From 98a5bca16007a7c4740c4326ef83768d034aed3e Mon Sep 17 00:00:00 2001 From: José Bollo Date: Wed, 3 Aug 2016 20:04:08 +0200 Subject: Threads: handles request with threads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This implementation handles all requests with threads. Later implementation could add a mechanism to choose what request will be handled by threads. Each API receive its requests in serial order without reentrancy. Here again, this can change in the future if a choice is possible to allow reentrant calls. The signal/event are not processed using threads in this version. It may change in the future. Signed-off-by: José Bollo --- src/CMakeLists.txt | 1 + src/afb-api-so.c | 51 +++---- src/afb-sig-handler.c | 63 ++++---- src/afb-sig-handler.h | 4 + src/afb-thread.c | 378 +++++++++++++++++++++++++++++++++++++++++++++++ src/afb-thread.h | 31 ++++ src/main.c | 10 +- src/tests/test-thread.c | 99 +++++++++++++ src/tests/test-thread.sh | 4 + 9 files changed, 582 insertions(+), 59 deletions(-) create mode 100644 src/afb-thread.c create mode 100644 src/afb-thread.h create mode 100644 src/tests/test-thread.c create mode 100755 src/tests/test-thread.sh diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 9c13cc18..94e0a930 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -48,6 +48,7 @@ ADD_LIBRARY(afb-lib STATIC afb-sig-handler.c afb-svc.c afb-subcall.c + afb-thread.c afb-websock.c afb-ws-client.c afb-ws-json1.c diff --git a/src/afb-api-so.c b/src/afb-api-so.c index ef1bbcc2..4bb6087f 100644 --- a/src/afb-api-so.c +++ b/src/afb-api-so.c @@ -38,6 +38,7 @@ #include "afb-apis.h" #include "afb-api-so.h" #include "afb-sig-handler.h" +#include "afb-thread.h" #include "afb-evt.h" #include "afb-svc.h" #include "verbose.h" @@ -139,33 +140,22 @@ static int afb_api_so_rootdir_open_locale(void *closure, const char *filename, i return afb_common_rootdir_open_locale(filename, flags, locale); } -static void monitored_call(int signum, void *arg) +static int call_check(struct afb_req req, struct afb_context *context, const struct afb_verb_desc_v1 *verb) { - struct monitoring *data = arg; - if (signum != 0) - afb_req_fail_f(data->req, "aborted", "signal %s(%d) caught", strsignal(signum), signum); - else - data->action(data->req); -} - -static void call_check(struct afb_req req, struct afb_context *context, const struct afb_verb_desc_v1 *verb) -{ - struct monitoring data; - int stag = (int)verb->session; if ((stag & (AFB_SESSION_CREATE|AFB_SESSION_CLOSE|AFB_SESSION_RENEW|AFB_SESSION_CHECK|AFB_SESSION_LOA_EQ)) != 0) { if (!afb_context_check(context)) { afb_context_close(context); afb_req_fail(req, "failed", "invalid token's identity"); - return; + return 0; } } if ((stag & AFB_SESSION_CREATE) != 0) { if (afb_context_check_loa(context, 1)) { afb_req_fail(req, "failed", "invalid creation state"); - return; + return 0; } afb_context_change_loa(context, 1); afb_context_refresh(context); @@ -183,7 +173,7 @@ static void call_check(struct afb_req req, struct afb_context *context, const st int loa = (stag >> AFB_SESSION_LOA_SHIFT) & AFB_SESSION_LOA_MASK; if (!afb_context_check_loa(context, loa)) { afb_req_fail(req, "failed", "invalid LOA"); - return; + return 0; } } @@ -191,27 +181,30 @@ static void call_check(struct afb_req req, struct afb_context *context, const st int loa = (stag >> AFB_SESSION_LOA_SHIFT) & AFB_SESSION_LOA_MASK; if (afb_context_check_loa(context, loa + 1)) { afb_req_fail(req, "failed", "invalid LOA"); - return; + return 0; } } - - data.req = req; - data.action = verb->callback; - afb_sig_monitor(monitored_call, &data, api_timeout); + return 1; } -static void call_cb(void *closure, struct afb_req req, struct afb_context *context, const char *verb, size_t lenverb) +static void call_cb(void *closure, struct afb_req req, struct afb_context *context, const char *strverb, size_t lenverb) { - const struct afb_verb_desc_v1 *v; + const struct afb_verb_desc_v1 *verb; struct api_so_desc *desc = closure; - v = desc->binding->v1.verbs; - while (v->name && (strncasecmp(v->name, verb, lenverb) || v->name[lenverb])) - v++; - if (v->name) - call_check(req, context, v); - else - afb_req_fail_f(req, "unknown-verb", "verb %.*s unknown within api %s", (int)lenverb, verb, desc->binding->v1.prefix); + verb = desc->binding->v1.verbs; + while (verb->name && (strncasecmp(verb->name, strverb, lenverb) || verb->name[lenverb])) + verb++; + if (!verb->name) + afb_req_fail_f(req, "unknown-verb", "verb %.*s unknown within api %s", (int)lenverb, strverb, desc->binding->v1.prefix); + else if (call_check(req, context, verb)) { + if (0) + /* not threaded */ + afb_sig_req_timeout(req, verb->callback, api_timeout); + else + /* threaded */ + afb_thread_call(req, verb->callback, api_timeout, desc); + } } static int service_start_cb(void *closure, int share_session, int onneed) diff --git a/src/afb-sig-handler.c b/src/afb-sig-handler.c index ad56392c..25a54378 100644 --- a/src/afb-sig-handler.c +++ b/src/afb-sig-handler.c @@ -21,12 +21,12 @@ #include #include #include -#include -#include -#include #include +#include + #include "afb-sig-handler.h" +#include "afb-thread.h" #include "verbose.h" static _Thread_local sigjmp_buf *error_handler; @@ -75,15 +75,39 @@ int afb_sig_handler_init() return (install(on_signal_error, sigerr) & install(on_signal_terminate, sigterm)) - 1; } +int afb_sig_req(struct afb_req req, void (*callback)(struct afb_req req)) +{ + volatile int signum; + sigjmp_buf jmpbuf, *older; + + older = error_handler; + signum = setjmp(jmpbuf); + if (signum != 0) + afb_req_fail_f(req, "aborted", "signal %s(%d) caught", strsignal(signum), signum); + else { + error_handler = &jmpbuf; + callback(req); + } + error_handler = older; + return signum; +} + +int afb_sig_req_timeout(struct afb_req req, void (*callback)(struct afb_req req), int timeout) +{ + int rc; + + if (timeout) + afb_thread_timer_arm(timeout); + rc = afb_sig_req(req, callback); + afb_thread_timer_disarm(); + return rc; +} + void afb_sig_monitor(void (*function)(int sig, void*), void *closure, int timeout) { - volatile int signum, timerset; - timer_t timerid; + volatile int signum; sigjmp_buf jmpbuf, *older; - struct sigevent sevp; - struct itimerspec its; - timerset = 0; older = error_handler; signum = setjmp(jmpbuf); if (signum != 0) { @@ -91,28 +115,11 @@ void afb_sig_monitor(void (*function)(int sig, void*), void *closure, int timeou } else { error_handler = &jmpbuf; - if (timeout > 0) { - timerset = 1; /* TODO: check statuses */ - sevp.sigev_notify = SIGEV_THREAD_ID; - sevp.sigev_signo = SIGALRM; - sevp.sigev_value.sival_ptr = NULL; -#if defined(sigev_notify_thread_id) - sevp.sigev_notify_thread_id = (pid_t)syscall(SYS_gettid); -#else - sevp._sigev_un._tid = (pid_t)syscall(SYS_gettid); -#endif - timer_create(CLOCK_THREAD_CPUTIME_ID, &sevp, &timerid); - its.it_interval.tv_sec = 0; - its.it_interval.tv_nsec = 0; - its.it_value.tv_sec = timeout; - its.it_value.tv_nsec = 0; - timer_settime(timerid, 0, &its, NULL); - } - + if (timeout) + afb_thread_timer_arm(timeout); function(0, closure); } - if (timerset) - timer_delete(timerid); + afb_thread_timer_disarm(); error_handler = older; } diff --git a/src/afb-sig-handler.h b/src/afb-sig-handler.h index 3c9626cb..6fa000e9 100644 --- a/src/afb-sig-handler.h +++ b/src/afb-sig-handler.h @@ -18,7 +18,11 @@ #pragma once +struct afb_req; + extern int afb_sig_handler_init(); extern void afb_sig_monitor(void (*function)(int sig, void*), void *closure, int timeout); +extern int afb_sig_req(struct afb_req req, void (*callback)(struct afb_req req)); +extern int afb_sig_req_timeout(struct afb_req req, void (*callback)(struct afb_req req), int timeout); diff --git a/src/afb-thread.c b/src/afb-thread.c new file mode 100644 index 00000000..5ce09300 --- /dev/null +++ b/src/afb-thread.c @@ -0,0 +1,378 @@ +/* + * Copyright (C) 2016 "IoT.bzh" + * Author José Bollo + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define _GNU_SOURCE + +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "afb-thread.h" +#include "afb-sig-handler.h" +#include "verbose.h" + +/* control of threads */ +struct thread +{ + pthread_t tid; /* the thread id */ + unsigned stop: 1; /* stop request */ + unsigned ended: 1; /* ended status */ + unsigned works: 1; /* is it processing a job? */ +}; + +/* describes pending job */ +struct job +{ + void (*callback)(struct afb_req req); /* processing callback */ + struct afb_req req; /* request to be processed */ + int timeout; /* timeout in second for processing the request */ + int blocked; /* is an other request blocking this one ? */ + void *group; /* group of the request */ + struct job *next; /* link to the next job enqueued */ +}; + +/* synchronisation of threads */ +static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; + +/* queue of pending jobs */ +static struct job *first_job = NULL; + +/* count allowed, started and running threads */ +static int allowed = 0; +static int started = 0; +static int running = 0; +static int remains = 0; + +/* list of threads */ +static struct thread *threads = NULL; + +/* local timers */ +static _Thread_local int thread_timer_set; +static _Thread_local timer_t thread_timerid; + +/* + * Creates a timer for the current thread + * + * Returns 0 in case of success + */ +int afb_thread_timer_create() +{ + int rc; + struct sigevent sevp; + + if (thread_timer_set) + rc = 0; + else { + sevp.sigev_notify = SIGEV_THREAD_ID; + sevp.sigev_signo = SIGALRM; + sevp.sigev_value.sival_ptr = NULL; +#if defined(sigev_notify_thread_id) + sevp.sigev_notify_thread_id = (pid_t)syscall(SYS_gettid); +#else + sevp._sigev_un._tid = (pid_t)syscall(SYS_gettid); +#endif + rc = timer_create(CLOCK_THREAD_CPUTIME_ID, &sevp, &thread_timerid); + thread_timer_set = !rc; + } + return 0; +} + +/* + * Arms the alarm in timeout seconds for the current thread + */ +int afb_thread_timer_arm(int timeout) +{ + int rc; + struct itimerspec its; + + rc = afb_thread_timer_create(); + if (rc == 0) { + its.it_interval.tv_sec = 0; + its.it_interval.tv_nsec = 0; + its.it_value.tv_sec = timeout; + its.it_value.tv_nsec = 0; + rc = timer_settime(thread_timerid, 0, &its, NULL); + } + + return rc; +} + +/* + * Disarms the current alarm + */ +void afb_thread_timer_disarm() +{ + if (thread_timer_set) + afb_thread_timer_arm(0); +} + +/* + * Delstroy any alarm resource for the current thread + */ +void afb_thread_timer_delete() +{ + if (thread_timer_set) { + timer_delete(thread_timerid); + thread_timer_set = 0; + } +} + +/* add the job to the list */ +static inline void job_add(struct job *job) +{ + void *group = job->group; + struct job *ijob, **pjob; + + pjob = &first_job; + ijob = first_job; + group = job->group; + if (group == NULL) + group = job; + while (ijob) { + if (ijob->group == group) + job->blocked = 1; + pjob = &ijob->next; + ijob = ijob->next; + } + *pjob = job; + job->next = NULL; + remains--; +} + +/* get the next job to process or NULL if none */ +static inline struct job *job_get() +{ + struct job *job, **pjob; + pjob = &first_job; + job = first_job; + while (job && job->blocked) { + pjob = &job->next; + job = job->next; + } + if (job) { + *pjob = job->next; + remains++; + } + return job; +} + +/* unblock a group of job */ +static inline void job_unblock(void *group) +{ + struct job *job; + + job = first_job; + while (job) { + if (job->group == group) { + job->blocked = 0; + break; + } + job = job->next; + } +} + +/* main loop of processing threads */ +static void *thread_main_loop(void *data) +{ + struct thread *me = data; + struct job *job, j; + + me->works = 0; + me->ended = 0; + afb_thread_timer_create(); + pthread_mutex_lock(&mutex); + while (!me->stop) { + /* get a job */ + job = job_get(); + if (job == NULL && first_job != NULL && running == 0) { + /* sad situation!! should not happen */ + ERROR("threads are blocked!"); + job = first_job; + first_job = job->next; + } + if (job == NULL) { + /* no job... */ + pthread_cond_wait(&cond, &mutex); + } else { + /* run the job */ + running++; + me->works = 1; + pthread_mutex_unlock(&mutex); + j = *job; + free(job); + afb_thread_timer_arm(j.timeout); + afb_sig_req(j.req, j.callback); + afb_thread_timer_disarm(); + afb_req_unref(j.req); + pthread_mutex_lock(&mutex); + if (j.group != NULL) + job_unblock(j.group); + me->works = 0; + running--; + } + + } + me->ended = 1; + pthread_mutex_unlock(&mutex); + afb_thread_timer_delete(); + return me; +} + +/* start a new thread */ +static int start_one_thread() +{ + struct thread *t; + int rc; + + assert(started < allowed); + + t = &threads[started++]; + t->stop = 0; + rc = pthread_create(&t->tid, NULL, thread_main_loop, t); + if (rc != 0) { + started--; + errno = rc; + WARNING("not able to start thread: %m"); + rc = -1; + } + return rc; +} + +/* process the 'request' with the 'callback' using a separate thread if available */ +void afb_thread_call(struct afb_req req, void (*callback)(struct afb_req req), int timeout, void *group) +{ + const char *info; + struct job *job; + int rc; + + /* allocates the job */ + job = malloc(sizeof *job); + if (job == NULL) { + info = "out of memory"; + goto error; + } + + /* start a thread if needed */ + pthread_mutex_lock(&mutex); + if (remains == 0) { + info = "too many jobs"; + goto error2; + } + if (started == running && started < allowed) { + rc = start_one_thread(); + if (rc < 0 && started == 0) { + /* failed to start threading */ + info = "can't start thread"; + goto error2; + } + } + + /* fills and queues the job */ + job->callback = callback; + job->req = req; + job->timeout = timeout; + job->group = group; + afb_req_addref(req); + job_add(job); + pthread_mutex_unlock(&mutex); + + /* signal an existing job */ + pthread_cond_signal(&cond); + return; + +error2: + pthread_mutex_unlock(&mutex); + free(job); +error: + ERROR("can't process job with threads: %s", info); + afb_req_fail(req, "internal-error", info); +} + +/* initialise the threads */ +int afb_thread_init(int allowed_count, int start_count, int waiter_count) +{ + threads = calloc(allowed_count, sizeof *threads); + if (threads == NULL) { + errno = ENOMEM; + ERROR("can't allocate threads"); + return -1; + } + + /* records the allowed count */ + allowed = allowed_count; + started = 0; + running = 0; + remains = waiter_count; + + /* start at least one thread */ + pthread_mutex_lock(&mutex); + while (started < start_count && start_one_thread() == 0); + pthread_mutex_unlock(&mutex); + + /* end */ + return -(started != start_count); +} + +/* terminate all the threads and all pending requests */ +void afb_thread_terminate() +{ + int i, n; + struct job *job; + + /* request all threads to stop */ + pthread_mutex_lock(&mutex); + allowed = 0; + n = started; + for (i = 0 ; i < n ; i++) + threads[i].stop = 1; + + /* wait until all thread are terminated */ + while (started != 0) { + /* signal threads */ + pthread_mutex_unlock(&mutex); + pthread_cond_broadcast(&cond); + pthread_mutex_lock(&mutex); + + /* join the terminated threads */ + for (i = 0 ; i < n ; i++) { + if (threads[i].tid && threads[i].ended) { + pthread_join(threads[i].tid, NULL); + threads[i].tid = 0; + started--; + } + } + } + pthread_mutex_unlock(&mutex); + free(threads); + + /* cancel pending jobs */ + while (first_job) { + job = first_job; + first_job = job->next; + afb_req_fail(job->req, "aborted", "termination of threading"); + afb_req_unref(job->req); + free(job); + } +} diff --git a/src/afb-thread.h b/src/afb-thread.h new file mode 100644 index 00000000..e25f90ec --- /dev/null +++ b/src/afb-thread.h @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2016 "IoT.bzh" + * Author José Bollo + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +struct afb_req; + +extern void afb_thread_call(struct afb_req req, void (*callback)(struct afb_req req), int timeout, void *group); + +extern int afb_thread_init(int allowed_count, int start_count, int waiter_count); +extern void afb_thread_terminate(); + +extern int afb_thread_timer_create(); +extern int afb_thread_timer_arm(int timeout); +extern void afb_thread_timer_disarm(); +extern void afb_thread_timer_delete(); + diff --git a/src/main.c b/src/main.c index 5871d125..38598189 100644 --- a/src/main.c +++ b/src/main.c @@ -39,6 +39,7 @@ #include "afb-context.h" #include "afb-hreq.h" #include "afb-sig-handler.h" +#include "afb-thread.h" #include "session.h" #include "verbose.h" #include "afb-common.h" @@ -650,12 +651,17 @@ int main(int argc, char *argv[]) { } if (afb_sig_handler_init() < 0) { - ERROR("main fail to initialise signal handlers"); + ERROR("failed to initialise signal handlers"); return 1; } if (afb_common_rootdir_set(config->rootdir) < 0) { - ERROR("main fail to set common root directory"); + ERROR("failed to set common root directory"); + return 1; + } + + if (afb_thread_init(3, 1, 20) < 0) { + ERROR("failed to initialise threading"); return 1; } diff --git a/src/tests/test-thread.c b/src/tests/test-thread.c new file mode 100644 index 00000000..cc676bd0 --- /dev/null +++ b/src/tests/test-thread.c @@ -0,0 +1,99 @@ +#define _GNU_SOURCE +#include +#include +#include +#include +#include + +#include +#include "../afb-thread.h" + +struct foo { + int value; + int refcount; +}; + +void addref(void *closure) +{ + struct foo *foo = closure; + foo->refcount++; +} + +void unref(void *closure) +{ + struct foo *foo = closure; + if(!--foo->refcount) { + printf("%06d FREE\n", foo->value); + free(foo); + } +} + +void fail(void *closure, const char *status, const char *info) +{ + struct foo *foo = closure; + printf("%06d ERROR %s\n", foo->value, status); +} + +struct afb_req_itf itf = { + .json = NULL, + .get = NULL, + + .success = NULL, + .fail = fail, + + .raw = NULL, + .send = NULL, + + .context_get = NULL, + .context_set = NULL, + + .addref = addref, + .unref = unref, + + .session_close = NULL, + .session_set_LOA = NULL, + + .subscribe = NULL, + .unsubscribe = NULL, + + .subcall = NULL +}; + +void process(struct afb_req req) +{ + struct timespec ts; + struct foo *foo = req.closure; + printf("%06d PROCESS T%d\n", foo->value, (int)syscall(SYS_gettid)); + ts.tv_sec = 0; + ts.tv_nsec = foo->value * 1000; +// nanosleep(&ts, NULL); +} + +int main() +{ + int i; + struct foo *foo; + struct afb_req req; + struct timespec ts; + + req.itf = &itf; + afb_thread_init(4, 1); + for (i = 0 ; i < 10000 ; i++) { + req.closure = foo = malloc(sizeof *foo); + foo->value = i; + foo->refcount = 1; + afb_thread_call(req, process, 5, (&ts) + (i % 4)); + unref(foo); + ts.tv_sec = 0; + ts.tv_nsec = 1000000; +// nanosleep(&ts, NULL); + } + ts.tv_sec = 1; + ts.tv_nsec = 0; + nanosleep(&ts, NULL); + afb_thread_terminate(); +} + + + + diff --git a/src/tests/test-thread.sh b/src/tests/test-thread.sh new file mode 100755 index 00000000..6d1b3a53 --- /dev/null +++ b/src/tests/test-thread.sh @@ -0,0 +1,4 @@ +#!/bin/sh + +cc test-thread.c ../afb-thread.c ../verbose.c ../afb-sig-handler.c -o test-thread -lrt -lpthread -I../../include +./test-thread -- cgit 1.2.3-korg