summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/CMakeLists.txt1
-rw-r--r--src/afb-api-so.c51
-rw-r--r--src/afb-sig-handler.c63
-rw-r--r--src/afb-sig-handler.h4
-rw-r--r--src/afb-thread.c378
-rw-r--r--src/afb-thread.h31
-rw-r--r--src/main.c10
-rw-r--r--src/tests/test-thread.c99
-rwxr-xr-xsrc/tests/test-thread.sh4
9 files changed, 582 insertions, 59 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 9c13cc18..94e0a930 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -48,6 +48,7 @@ ADD_LIBRARY(afb-lib STATIC
afb-sig-handler.c
afb-svc.c
afb-subcall.c
+ afb-thread.c
afb-websock.c
afb-ws-client.c
afb-ws-json1.c
diff --git a/src/afb-api-so.c b/src/afb-api-so.c
index ef1bbcc2..4bb6087f 100644
--- a/src/afb-api-so.c
+++ b/src/afb-api-so.c
@@ -38,6 +38,7 @@
#include "afb-apis.h"
#include "afb-api-so.h"
#include "afb-sig-handler.h"
+#include "afb-thread.h"
#include "afb-evt.h"
#include "afb-svc.h"
#include "verbose.h"
@@ -139,33 +140,22 @@ static int afb_api_so_rootdir_open_locale(void *closure, const char *filename, i
return afb_common_rootdir_open_locale(filename, flags, locale);
}
-static void monitored_call(int signum, void *arg)
+static int call_check(struct afb_req req, struct afb_context *context, const struct afb_verb_desc_v1 *verb)
{
- struct monitoring *data = arg;
- if (signum != 0)
- afb_req_fail_f(data->req, "aborted", "signal %s(%d) caught", strsignal(signum), signum);
- else
- data->action(data->req);
-}
-
-static void call_check(struct afb_req req, struct afb_context *context, const struct afb_verb_desc_v1 *verb)
-{
- struct monitoring data;
-
int stag = (int)verb->session;
if ((stag & (AFB_SESSION_CREATE|AFB_SESSION_CLOSE|AFB_SESSION_RENEW|AFB_SESSION_CHECK|AFB_SESSION_LOA_EQ)) != 0) {
if (!afb_context_check(context)) {
afb_context_close(context);
afb_req_fail(req, "failed", "invalid token's identity");
- return;
+ return 0;
}
}
if ((stag & AFB_SESSION_CREATE) != 0) {
if (afb_context_check_loa(context, 1)) {
afb_req_fail(req, "failed", "invalid creation state");
- return;
+ return 0;
}
afb_context_change_loa(context, 1);
afb_context_refresh(context);
@@ -183,7 +173,7 @@ static void call_check(struct afb_req req, struct afb_context *context, const st
int loa = (stag >> AFB_SESSION_LOA_SHIFT) & AFB_SESSION_LOA_MASK;
if (!afb_context_check_loa(context, loa)) {
afb_req_fail(req, "failed", "invalid LOA");
- return;
+ return 0;
}
}
@@ -191,27 +181,30 @@ static void call_check(struct afb_req req, struct afb_context *context, const st
int loa = (stag >> AFB_SESSION_LOA_SHIFT) & AFB_SESSION_LOA_MASK;
if (afb_context_check_loa(context, loa + 1)) {
afb_req_fail(req, "failed", "invalid LOA");
- return;
+ return 0;
}
}
-
- data.req = req;
- data.action = verb->callback;
- afb_sig_monitor(monitored_call, &data, api_timeout);
+ return 1;
}
-static void call_cb(void *closure, struct afb_req req, struct afb_context *context, const char *verb, size_t lenverb)
+static void call_cb(void *closure, struct afb_req req, struct afb_context *context, const char *strverb, size_t lenverb)
{
- const struct afb_verb_desc_v1 *v;
+ const struct afb_verb_desc_v1 *verb;
struct api_so_desc *desc = closure;
- v = desc->binding->v1.verbs;
- while (v->name && (strncasecmp(v->name, verb, lenverb) || v->name[lenverb]))
- v++;
- if (v->name)
- call_check(req, context, v);
- else
- afb_req_fail_f(req, "unknown-verb", "verb %.*s unknown within api %s", (int)lenverb, verb, desc->binding->v1.prefix);
+ verb = desc->binding->v1.verbs;
+ while (verb->name && (strncasecmp(verb->name, strverb, lenverb) || verb->name[lenverb]))
+ verb++;
+ if (!verb->name)
+ afb_req_fail_f(req, "unknown-verb", "verb %.*s unknown within api %s", (int)lenverb, strverb, desc->binding->v1.prefix);
+ else if (call_check(req, context, verb)) {
+ if (0)
+ /* not threaded */
+ afb_sig_req_timeout(req, verb->callback, api_timeout);
+ else
+ /* threaded */
+ afb_thread_call(req, verb->callback, api_timeout, desc);
+ }
}
static int service_start_cb(void *closure, int share_session, int onneed)
diff --git a/src/afb-sig-handler.c b/src/afb-sig-handler.c
index ad56392c..25a54378 100644
--- a/src/afb-sig-handler.c
+++ b/src/afb-sig-handler.c
@@ -21,12 +21,12 @@
#include <stdlib.h>
#include <signal.h>
#include <string.h>
-#include <unistd.h>
-#include <time.h>
-#include <sys/syscall.h>
#include <setjmp.h>
+#include <afb/afb-req-itf.h>
+
#include "afb-sig-handler.h"
+#include "afb-thread.h"
#include "verbose.h"
static _Thread_local sigjmp_buf *error_handler;
@@ -75,15 +75,39 @@ int afb_sig_handler_init()
return (install(on_signal_error, sigerr) & install(on_signal_terminate, sigterm)) - 1;
}
+int afb_sig_req(struct afb_req req, void (*callback)(struct afb_req req))
+{
+ volatile int signum;
+ sigjmp_buf jmpbuf, *older;
+
+ older = error_handler;
+ signum = setjmp(jmpbuf);
+ if (signum != 0)
+ afb_req_fail_f(req, "aborted", "signal %s(%d) caught", strsignal(signum), signum);
+ else {
+ error_handler = &jmpbuf;
+ callback(req);
+ }
+ error_handler = older;
+ return signum;
+}
+
+int afb_sig_req_timeout(struct afb_req req, void (*callback)(struct afb_req req), int timeout)
+{
+ int rc;
+
+ if (timeout)
+ afb_thread_timer_arm(timeout);
+ rc = afb_sig_req(req, callback);
+ afb_thread_timer_disarm();
+ return rc;
+}
+
void afb_sig_monitor(void (*function)(int sig, void*), void *closure, int timeout)
{
- volatile int signum, timerset;
- timer_t timerid;
+ volatile int signum;
sigjmp_buf jmpbuf, *older;
- struct sigevent sevp;
- struct itimerspec its;
- timerset = 0;
older = error_handler;
signum = setjmp(jmpbuf);
if (signum != 0) {
@@ -91,28 +115,11 @@ void afb_sig_monitor(void (*function)(int sig, void*), void *closure, int timeou
}
else {
error_handler = &jmpbuf;
- if (timeout > 0) {
- timerset = 1; /* TODO: check statuses */
- sevp.sigev_notify = SIGEV_THREAD_ID;
- sevp.sigev_signo = SIGALRM;
- sevp.sigev_value.sival_ptr = NULL;
-#if defined(sigev_notify_thread_id)
- sevp.sigev_notify_thread_id = (pid_t)syscall(SYS_gettid);
-#else
- sevp._sigev_un._tid = (pid_t)syscall(SYS_gettid);
-#endif
- timer_create(CLOCK_THREAD_CPUTIME_ID, &sevp, &timerid);
- its.it_interval.tv_sec = 0;
- its.it_interval.tv_nsec = 0;
- its.it_value.tv_sec = timeout;
- its.it_value.tv_nsec = 0;
- timer_settime(timerid, 0, &its, NULL);
- }
-
+ if (timeout)
+ afb_thread_timer_arm(timeout);
function(0, closure);
}
- if (timerset)
- timer_delete(timerid);
+ afb_thread_timer_disarm();
error_handler = older;
}
diff --git a/src/afb-sig-handler.h b/src/afb-sig-handler.h
index 3c9626cb..6fa000e9 100644
--- a/src/afb-sig-handler.h
+++ b/src/afb-sig-handler.h
@@ -18,7 +18,11 @@
#pragma once
+struct afb_req;
+
extern int afb_sig_handler_init();
extern void afb_sig_monitor(void (*function)(int sig, void*), void *closure, int timeout);
+extern int afb_sig_req(struct afb_req req, void (*callback)(struct afb_req req));
+extern int afb_sig_req_timeout(struct afb_req req, void (*callback)(struct afb_req req), int timeout);
diff --git a/src/afb-thread.c b/src/afb-thread.c
new file mode 100644
index 00000000..5ce09300
--- /dev/null
+++ b/src/afb-thread.c
@@ -0,0 +1,378 @@
+/*
+ * Copyright (C) 2016 "IoT.bzh"
+ * Author José Bollo <jose.bollo@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define _GNU_SOURCE
+
+#include <stdlib.h>
+#include <unistd.h>
+#include <signal.h>
+#include <time.h>
+#include <sys/syscall.h>
+#include <pthread.h>
+#include <errno.h>
+#include <assert.h>
+
+#include <afb/afb-req-itf.h>
+
+#include "afb-thread.h"
+#include "afb-sig-handler.h"
+#include "verbose.h"
+
+/* control of threads */
+struct thread
+{
+ pthread_t tid; /* the thread id */
+ unsigned stop: 1; /* stop request */
+ unsigned ended: 1; /* ended status */
+ unsigned works: 1; /* is it processing a job? */
+};
+
+/* describes pending job */
+struct job
+{
+ void (*callback)(struct afb_req req); /* processing callback */
+ struct afb_req req; /* request to be processed */
+ int timeout; /* timeout in second for processing the request */
+ int blocked; /* is an other request blocking this one ? */
+ void *group; /* group of the request */
+ struct job *next; /* link to the next job enqueued */
+};
+
+/* synchronisation of threads */
+static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
+
+/* queue of pending jobs */
+static struct job *first_job = NULL;
+
+/* count allowed, started and running threads */
+static int allowed = 0;
+static int started = 0;
+static int running = 0;
+static int remains = 0;
+
+/* list of threads */
+static struct thread *threads = NULL;
+
+/* local timers */
+static _Thread_local int thread_timer_set;
+static _Thread_local timer_t thread_timerid;
+
+/*
+ * Creates a timer for the current thread
+ *
+ * Returns 0 in case of success
+ */
+int afb_thread_timer_create()
+{
+ int rc;
+ struct sigevent sevp;
+
+ if (thread_timer_set)
+ rc = 0;
+ else {
+ sevp.sigev_notify = SIGEV_THREAD_ID;
+ sevp.sigev_signo = SIGALRM;
+ sevp.sigev_value.sival_ptr = NULL;
+#if defined(sigev_notify_thread_id)
+ sevp.sigev_notify_thread_id = (pid_t)syscall(SYS_gettid);
+#else
+ sevp._sigev_un._tid = (pid_t)syscall(SYS_gettid);
+#endif
+ rc = timer_create(CLOCK_THREAD_CPUTIME_ID, &sevp, &thread_timerid);
+ thread_timer_set = !rc;
+ }
+ return 0;
+}
+
+/*
+ * Arms the alarm in timeout seconds for the current thread
+ */
+int afb_thread_timer_arm(int timeout)
+{
+ int rc;
+ struct itimerspec its;
+
+ rc = afb_thread_timer_create();
+ if (rc == 0) {
+ its.it_interval.tv_sec = 0;
+ its.it_interval.tv_nsec = 0;
+ its.it_value.tv_sec = timeout;
+ its.it_value.tv_nsec = 0;
+ rc = timer_settime(thread_timerid, 0, &its, NULL);
+ }
+
+ return rc;
+}
+
+/*
+ * Disarms the current alarm
+ */
+void afb_thread_timer_disarm()
+{
+ if (thread_timer_set)
+ afb_thread_timer_arm(0);
+}
+
+/*
+ * Delstroy any alarm resource for the current thread
+ */
+void afb_thread_timer_delete()
+{
+ if (thread_timer_set) {
+ timer_delete(thread_timerid);
+ thread_timer_set = 0;
+ }
+}
+
+/* add the job to the list */
+static inline void job_add(struct job *job)
+{
+ void *group = job->group;
+ struct job *ijob, **pjob;
+
+ pjob = &first_job;
+ ijob = first_job;
+ group = job->group;
+ if (group == NULL)
+ group = job;
+ while (ijob) {
+ if (ijob->group == group)
+ job->blocked = 1;
+ pjob = &ijob->next;
+ ijob = ijob->next;
+ }
+ *pjob = job;
+ job->next = NULL;
+ remains--;
+}
+
+/* get the next job to process or NULL if none */
+static inline struct job *job_get()
+{
+ struct job *job, **pjob;
+ pjob = &first_job;
+ job = first_job;
+ while (job && job->blocked) {
+ pjob = &job->next;
+ job = job->next;
+ }
+ if (job) {
+ *pjob = job->next;
+ remains++;
+ }
+ return job;
+}
+
+/* unblock a group of job */
+static inline void job_unblock(void *group)
+{
+ struct job *job;
+
+ job = first_job;
+ while (job) {
+ if (job->group == group) {
+ job->blocked = 0;
+ break;
+ }
+ job = job->next;
+ }
+}
+
+/* main loop of processing threads */
+static void *thread_main_loop(void *data)
+{
+ struct thread *me = data;
+ struct job *job, j;
+
+ me->works = 0;
+ me->ended = 0;
+ afb_thread_timer_create();
+ pthread_mutex_lock(&mutex);
+ while (!me->stop) {
+ /* get a job */
+ job = job_get();
+ if (job == NULL && first_job != NULL && running == 0) {
+ /* sad situation!! should not happen */
+ ERROR("threads are blocked!");
+ job = first_job;
+ first_job = job->next;
+ }
+ if (job == NULL) {
+ /* no job... */
+ pthread_cond_wait(&cond, &mutex);
+ } else {
+ /* run the job */
+ running++;
+ me->works = 1;
+ pthread_mutex_unlock(&mutex);
+ j = *job;
+ free(job);
+ afb_thread_timer_arm(j.timeout);
+ afb_sig_req(j.req, j.callback);
+ afb_thread_timer_disarm();
+ afb_req_unref(j.req);
+ pthread_mutex_lock(&mutex);
+ if (j.group != NULL)
+ job_unblock(j.group);
+ me->works = 0;
+ running--;
+ }
+
+ }
+ me->ended = 1;
+ pthread_mutex_unlock(&mutex);
+ afb_thread_timer_delete();
+ return me;
+}
+
+/* start a new thread */
+static int start_one_thread()
+{
+ struct thread *t;
+ int rc;
+
+ assert(started < allowed);
+
+ t = &threads[started++];
+ t->stop = 0;
+ rc = pthread_create(&t->tid, NULL, thread_main_loop, t);
+ if (rc != 0) {
+ started--;
+ errno = rc;
+ WARNING("not able to start thread: %m");
+ rc = -1;
+ }
+ return rc;
+}
+
+/* process the 'request' with the 'callback' using a separate thread if available */
+void afb_thread_call(struct afb_req req, void (*callback)(struct afb_req req), int timeout, void *group)
+{
+ const char *info;
+ struct job *job;
+ int rc;
+
+ /* allocates the job */
+ job = malloc(sizeof *job);
+ if (job == NULL) {
+ info = "out of memory";
+ goto error;
+ }
+
+ /* start a thread if needed */
+ pthread_mutex_lock(&mutex);
+ if (remains == 0) {
+ info = "too many jobs";
+ goto error2;
+ }
+ if (started == running && started < allowed) {
+ rc = start_one_thread();
+ if (rc < 0 && started == 0) {
+ /* failed to start threading */
+ info = "can't start thread";
+ goto error2;
+ }
+ }
+
+ /* fills and queues the job */
+ job->callback = callback;
+ job->req = req;
+ job->timeout = timeout;
+ job->group = group;
+ afb_req_addref(req);
+ job_add(job);
+ pthread_mutex_unlock(&mutex);
+
+ /* signal an existing job */
+ pthread_cond_signal(&cond);
+ return;
+
+error2:
+ pthread_mutex_unlock(&mutex);
+ free(job);
+error:
+ ERROR("can't process job with threads: %s", info);
+ afb_req_fail(req, "internal-error", info);
+}
+
+/* initialise the threads */
+int afb_thread_init(int allowed_count, int start_count, int waiter_count)
+{
+ threads = calloc(allowed_count, sizeof *threads);
+ if (threads == NULL) {
+ errno = ENOMEM;
+ ERROR("can't allocate threads");
+ return -1;
+ }
+
+ /* records the allowed count */
+ allowed = allowed_count;
+ started = 0;
+ running = 0;
+ remains = waiter_count;
+
+ /* start at least one thread */
+ pthread_mutex_lock(&mutex);
+ while (started < start_count && start_one_thread() == 0);
+ pthread_mutex_unlock(&mutex);
+
+ /* end */
+ return -(started != start_count);
+}
+
+/* terminate all the threads and all pending requests */
+void afb_thread_terminate()
+{
+ int i, n;
+ struct job *job;
+
+ /* request all threads to stop */
+ pthread_mutex_lock(&mutex);
+ allowed = 0;
+ n = started;
+ for (i = 0 ; i < n ; i++)
+ threads[i].stop = 1;
+
+ /* wait until all thread are terminated */
+ while (started != 0) {
+ /* signal threads */
+ pthread_mutex_unlock(&mutex);
+ pthread_cond_broadcast(&cond);
+ pthread_mutex_lock(&mutex);
+
+ /* join the terminated threads */
+ for (i = 0 ; i < n ; i++) {
+ if (threads[i].tid && threads[i].ended) {
+ pthread_join(threads[i].tid, NULL);
+ threads[i].tid = 0;
+ started--;
+ }
+ }
+ }
+ pthread_mutex_unlock(&mutex);
+ free(threads);
+
+ /* cancel pending jobs */
+ while (first_job) {
+ job = first_job;
+ first_job = job->next;
+ afb_req_fail(job->req, "aborted", "termination of threading");
+ afb_req_unref(job->req);
+ free(job);
+ }
+}
diff --git a/src/afb-thread.h b/src/afb-thread.h
new file mode 100644
index 00000000..e25f90ec
--- /dev/null
+++ b/src/afb-thread.h
@@ -0,0 +1,31 @@
+/*
+ * Copyright (C) 2016 "IoT.bzh"
+ * Author José Bollo <jose.bollo@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+struct afb_req;
+
+extern void afb_thread_call(struct afb_req req, void (*callback)(struct afb_req req), int timeout, void *group);
+
+extern int afb_thread_init(int allowed_count, int start_count, int waiter_count);
+extern void afb_thread_terminate();
+
+extern int afb_thread_timer_create();
+extern int afb_thread_timer_arm(int timeout);
+extern void afb_thread_timer_disarm();
+extern void afb_thread_timer_delete();
+
diff --git a/src/main.c b/src/main.c
index 5871d125..38598189 100644
--- a/src/main.c
+++ b/src/main.c
@@ -39,6 +39,7 @@
#include "afb-context.h"
#include "afb-hreq.h"
#include "afb-sig-handler.h"
+#include "afb-thread.h"
#include "session.h"
#include "verbose.h"
#include "afb-common.h"
@@ -650,12 +651,17 @@ int main(int argc, char *argv[]) {
}
if (afb_sig_handler_init() < 0) {
- ERROR("main fail to initialise signal handlers");
+ ERROR("failed to initialise signal handlers");
return 1;
}
if (afb_common_rootdir_set(config->rootdir) < 0) {
- ERROR("main fail to set common root directory");
+ ERROR("failed to set common root directory");
+ return 1;
+ }
+
+ if (afb_thread_init(3, 1, 20) < 0) {
+ ERROR("failed to initialise threading");
return 1;
}
diff --git a/src/tests/test-thread.c b/src/tests/test-thread.c
new file mode 100644
index 00000000..cc676bd0
--- /dev/null
+++ b/src/tests/test-thread.c
@@ -0,0 +1,99 @@
+#define _GNU_SOURCE
+#include <stdio.h>
+#include <pthread.h>
+#include <time.h>
+#include <unistd.h>
+#include <sys/syscall.h>
+
+#include <afb/afb-req-itf.h>
+#include "../afb-thread.h"
+
+struct foo {
+ int value;
+ int refcount;
+};
+
+void addref(void *closure)
+{
+ struct foo *foo = closure;
+ foo->refcount++;
+}
+
+void unref(void *closure)
+{
+ struct foo *foo = closure;
+ if(!--foo->refcount) {
+ printf("%06d FREE\n", foo->value);
+ free(foo);
+ }
+}
+
+void fail(void *closure, const char *status, const char *info)
+{
+ struct foo *foo = closure;
+ printf("%06d ERROR %s\n", foo->value, status);
+}
+
+struct afb_req_itf itf = {
+ .json = NULL,
+ .get = NULL,
+
+ .success = NULL,
+ .fail = fail,
+
+ .raw = NULL,
+ .send = NULL,
+
+ .context_get = NULL,
+ .context_set = NULL,
+
+ .addref = addref,
+ .unref = unref,
+
+ .session_close = NULL,
+ .session_set_LOA = NULL,
+
+ .subscribe = NULL,
+ .unsubscribe = NULL,
+
+ .subcall = NULL
+};
+
+void process(struct afb_req req)
+{
+ struct timespec ts;
+ struct foo *foo = req.closure;
+ printf("%06d PROCESS T%d\n", foo->value, (int)syscall(SYS_gettid));
+ ts.tv_sec = 0;
+ ts.tv_nsec = foo->value * 1000;
+// nanosleep(&ts, NULL);
+}
+
+int main()
+{
+ int i;
+ struct foo *foo;
+ struct afb_req req;
+ struct timespec ts;
+
+ req.itf = &itf;
+ afb_thread_init(4, 1);
+ for (i = 0 ; i < 10000 ; i++) {
+ req.closure = foo = malloc(sizeof *foo);
+ foo->value = i;
+ foo->refcount = 1;
+ afb_thread_call(req, process, 5, (&ts) + (i % 4));
+ unref(foo);
+ ts.tv_sec = 0;
+ ts.tv_nsec = 1000000;
+// nanosleep(&ts, NULL);
+ }
+ ts.tv_sec = 1;
+ ts.tv_nsec = 0;
+ nanosleep(&ts, NULL);
+ afb_thread_terminate();
+}
+
+
+
+
diff --git a/src/tests/test-thread.sh b/src/tests/test-thread.sh
new file mode 100755
index 00000000..6d1b3a53
--- /dev/null
+++ b/src/tests/test-thread.sh
@@ -0,0 +1,4 @@
+#!/bin/sh
+
+cc test-thread.c ../afb-thread.c ../verbose.c ../afb-sig-handler.c -o test-thread -lrt -lpthread -I../../include
+./test-thread