summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJosé Bollo <jose.bollo@iot.bzh>2017-03-22 16:49:53 +0100
committerJosé Bollo <jose.bollo@iot.bzh>2017-03-22 16:49:53 +0100
commitfeccdb76f572a5fad947475c21b5b9aff696b04b (patch)
tree8e70e3eefef68febe02b5447105fdc6d857426b7
parentf5ce0df45f5ce9f0b57b250dfd44513d085f1e54 (diff)
Refactor of threading and signal monitor
The goal is to allow use of this facilities for things that are not 'afb_req'. Change-Id: I0d99c227934ed45136477bf6235bd1541d5f05cf Signed-off-by: José Bollo <jose.bollo@iot.bzh>
-rw-r--r--src/CMakeLists.txt5
-rw-r--r--src/afb-api-so.c8
-rw-r--r--src/afb-sig-handler.c125
-rw-r--r--src/afb-thread.c367
-rw-r--r--src/afb-thread.h10
-rw-r--r--src/jobs.c344
-rw-r--r--src/jobs.h44
-rw-r--r--src/main.c8
-rw-r--r--src/sig-monitor.c198
-rw-r--r--src/sig-monitor.h (renamed from src/afb-sig-handler.h)15
10 files changed, 626 insertions, 498 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 8d0121a5..6cbc6fa3 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -69,7 +69,6 @@ ADD_LIBRARY(afb-lib STATIC
afb-method.c
afb-msg-json.c
afb-session.c
- afb-sig-handler.c
afb-svc.c
afb-subcall.c
afb-thread.c
@@ -78,10 +77,12 @@ ADD_LIBRARY(afb-lib STATIC
afb-ws-json1.c
afb-ws.c
afb-wsj1.c
+ jobs.c
locale-root.c
+ sd-fds.c
+ sig-monitor.c
verbose.c
websock.c
- sd-fds.c
)
###########################################
diff --git a/src/afb-api-so.c b/src/afb-api-so.c
index 74f94f36..222fbbbd 100644
--- a/src/afb-api-so.c
+++ b/src/afb-api-so.c
@@ -38,7 +38,6 @@
#include "afb-context.h"
#include "afb-apis.h"
#include "afb-api-so.h"
-#include "afb-sig-handler.h"
#include "afb-thread.h"
#include "afb-evt.h"
#include "afb-svc.h"
@@ -194,12 +193,7 @@ static void call_cb(void *closure, struct afb_req req, struct afb_context *conte
if (!verb->name)
afb_req_fail_f(req, "unknown-verb", "verb %.*s unknown within api %s", (int)lenverb, strverb, desc->binding->v1.prefix);
else if (call_check(req, context, verb)) {
- if (0)
- /* not threaded */
- afb_sig_req_timeout(req, verb->callback, api_timeout);
- else
- /* threaded */
- afb_thread_call(req, verb->callback, api_timeout, desc);
+ afb_thread_req_call(req, verb->callback, api_timeout, desc);
}
}
diff --git a/src/afb-sig-handler.c b/src/afb-sig-handler.c
deleted file mode 100644
index bbf1531a..00000000
--- a/src/afb-sig-handler.c
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Copyright (C) 2015, 2016, 2017 "IoT.bzh"
- * Author "Fulup Ar Foll"
- * Author José Bollo <jose.bollo@iot.bzh>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#define _GNU_SOURCE
-
-#include <stdlib.h>
-#include <signal.h>
-#include <string.h>
-#include <setjmp.h>
-
-#include <afb/afb-req-itf.h>
-
-#include "afb-sig-handler.h"
-#include "afb-thread.h"
-#include "verbose.h"
-
-static _Thread_local sigjmp_buf *error_handler;
-
-static void on_signal_terminate (int signum)
-{
- ERROR("Terminating signal received %s", strsignal(signum));
- exit(1);
-}
-
-static void on_signal_error(int signum)
-{
- sigset_t sigset;
-
- // unlock signal to allow a new signal to come
- if (error_handler != NULL) {
- sigemptyset(&sigset);
- sigaddset(&sigset, signum);
- sigprocmask(SIG_UNBLOCK, &sigset, 0);
- longjmp(*error_handler, signum);
- }
- if (signum == SIGALRM)
- return;
- ERROR("Unmonitored signal received %s", strsignal(signum));
- exit(2);
-}
-
-static int install(void (*handler)(int), int *signals)
-{
- int result = 1;
- while(*signals > 0) {
- if (signal(*signals, handler) == SIG_ERR) {
- ERROR("failed to install signal handler for signal %s", strsignal(*signals));
- result = 0;
- }
- signals++;
- }
- return result;
-}
-
-int afb_sig_handler_init()
-{
- static int sigerr[] = { SIGALRM, SIGSEGV, SIGFPE, 0 };
- static int sigterm[] = { SIGINT, SIGABRT, 0 };
-
- return (install(on_signal_error, sigerr) & install(on_signal_terminate, sigterm)) - 1;
-}
-
-int afb_sig_req(struct afb_req req, void (*callback)(struct afb_req req))
-{
- volatile int signum;
- sigjmp_buf jmpbuf, *older;
-
- older = error_handler;
- signum = setjmp(jmpbuf);
- if (signum != 0)
- afb_req_fail_f(req, "aborted", "signal %s(%d) caught", strsignal(signum), signum);
- else {
- error_handler = &jmpbuf;
- callback(req);
- }
- error_handler = older;
- return signum;
-}
-
-int afb_sig_req_timeout(struct afb_req req, void (*callback)(struct afb_req req), int timeout)
-{
- int rc;
-
- if (timeout)
- afb_thread_timer_arm(timeout);
- rc = afb_sig_req(req, callback);
- afb_thread_timer_disarm();
- return rc;
-}
-
-void afb_sig_monitor(void (*function)(int sig, void*), void *closure, int timeout)
-{
- volatile int signum;
- sigjmp_buf jmpbuf, *older;
-
- older = error_handler;
- signum = setjmp(jmpbuf);
- if (signum != 0) {
- function(signum, closure);
- }
- else {
- error_handler = &jmpbuf;
- if (timeout)
- afb_thread_timer_arm(timeout);
- function(0, closure);
- }
- afb_thread_timer_disarm();
- error_handler = older;
-}
-
diff --git a/src/afb-thread.c b/src/afb-thread.c
index 790b86b0..67870ce6 100644
--- a/src/afb-thread.c
+++ b/src/afb-thread.c
@@ -17,363 +17,44 @@
#define _GNU_SOURCE
-#include <stdlib.h>
-#include <unistd.h>
-#include <signal.h>
-#include <time.h>
-#include <sys/syscall.h>
-#include <pthread.h>
-#include <errno.h>
-#include <assert.h>
+#include <string.h>
#include <afb/afb-req-itf.h>
#include "afb-thread.h"
-#include "afb-sig-handler.h"
+#include "jobs.h"
+#include "sig-monitor.h"
#include "verbose.h"
-/* control of threads */
-struct thread
+static void req_call(int signum, void *arg1, void *arg2, void *arg3)
{
- pthread_t tid; /* the thread id */
- unsigned stop: 1; /* stop request */
- unsigned ended: 1; /* ended status */
- unsigned works: 1; /* is it processing a job? */
-};
+ struct afb_req req = { .itf = arg1, .closure = arg2 };
+ void (*callback)(struct afb_req) = arg3;
-/* describes pending job */
-struct job
-{
- void (*callback)(struct afb_req req); /* processing callback */
- struct afb_req req; /* request to be processed */
- int timeout; /* timeout in second for processing the request */
- int blocked; /* is an other request blocking this one ? */
- void *group; /* group of the request */
- struct job *next; /* link to the next job enqueued */
-};
-
-/* synchronisation of threads */
-static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-
-/* queue of pending jobs */
-static struct job *first_job = NULL;
-
-/* count allowed, started and running threads */
-static int allowed = 0;
-static int started = 0;
-static int running = 0;
-static int remains = 0;
-
-/* list of threads */
-static struct thread *threads = NULL;
-
-/* local timers */
-static _Thread_local int thread_timer_set;
-static _Thread_local timer_t thread_timerid;
-
-/*
- * Creates a timer for the current thread
- *
- * Returns 0 in case of success
- */
-int afb_thread_timer_create()
-{
- int rc;
- struct sigevent sevp;
-
- if (thread_timer_set)
- rc = 0;
- else {
- sevp.sigev_notify = SIGEV_THREAD_ID;
- sevp.sigev_signo = SIGALRM;
- sevp.sigev_value.sival_ptr = NULL;
-#if defined(sigev_notify_thread_id)
- sevp.sigev_notify_thread_id = (pid_t)syscall(SYS_gettid);
-#else
- sevp._sigev_un._tid = (pid_t)syscall(SYS_gettid);
-#endif
- rc = timer_create(CLOCK_THREAD_CPUTIME_ID, &sevp, &thread_timerid);
- thread_timer_set = !rc;
- }
- return 0;
+ if (signum != 0)
+ afb_req_fail_f(req, "aborted", "signal %s(%d) caught", strsignal(signum), signum);
+ else
+ callback(req);
+ afb_req_unref(req);
}
-/*
- * Arms the alarm in timeout seconds for the current thread
- */
-int afb_thread_timer_arm(int timeout)
+void afb_thread_req_call(struct afb_req req, void (*callback)(struct afb_req req), int timeout, void *group)
{
int rc;
- struct itimerspec its;
-
- rc = afb_thread_timer_create();
- if (rc == 0) {
- its.it_interval.tv_sec = 0;
- its.it_interval.tv_nsec = 0;
- its.it_value.tv_sec = timeout;
- its.it_value.tv_nsec = 0;
- rc = timer_settime(thread_timerid, 0, &its, NULL);
- }
- return rc;
-}
-
-/*
- * Disarms the current alarm
- */
-void afb_thread_timer_disarm()
-{
- if (thread_timer_set)
- afb_thread_timer_arm(0);
-}
-
-/*
- * Delstroy any alarm resource for the current thread
- */
-void afb_thread_timer_delete()
-{
- if (thread_timer_set) {
- timer_delete(thread_timerid);
- thread_timer_set = 0;
- }
-}
-
-/* add the job to the list */
-static inline void job_add(struct job *job)
-{
- void *group = job->group;
- struct job *ijob, **pjob;
-
- pjob = &first_job;
- ijob = first_job;
- group = job->group;
- if (group == NULL)
- group = job;
- while (ijob) {
- if (ijob->group == group)
- job->blocked = 1;
- pjob = &ijob->next;
- ijob = ijob->next;
- }
- *pjob = job;
- job->next = NULL;
- remains--;
-}
-
-/* get the next job to process or NULL if none */
-static inline struct job *job_get()
-{
- struct job *job, **pjob;
- pjob = &first_job;
- job = first_job;
- while (job && job->blocked) {
- pjob = &job->next;
- job = job->next;
- }
- if (job) {
- *pjob = job->next;
- remains++;
- }
- return job;
-}
-
-/* unblock a group of job */
-static inline void job_unblock(void *group)
-{
- struct job *job;
-
- job = first_job;
- while (job) {
- if (job->group == group) {
- job->blocked = 0;
- break;
- }
- job = job->next;
- }
-}
-
-/* main loop of processing threads */
-static void *thread_main_loop(void *data)
-{
- struct thread *me = data;
- struct job *job, j;
-
- me->works = 0;
- me->ended = 0;
- afb_thread_timer_create();
- pthread_mutex_lock(&mutex);
- while (!me->stop) {
- /* get a job */
- job = job_get();
- if (job == NULL && first_job != NULL && running == 0) {
- /* sad situation!! should not happen */
- ERROR("threads are blocked!");
- job = first_job;
- first_job = job->next;
- }
- if (job == NULL) {
- /* no job... */
- pthread_cond_wait(&cond, &mutex);
- } else {
- /* run the job */
- running++;
- me->works = 1;
- pthread_mutex_unlock(&mutex);
- j = *job;
- free(job);
- afb_thread_timer_arm(j.timeout);
- afb_sig_req(j.req, j.callback);
- afb_thread_timer_disarm();
- afb_req_unref(j.req);
- pthread_mutex_lock(&mutex);
- if (j.group != NULL)
- job_unblock(j.group);
- me->works = 0;
- running--;
- }
-
- }
- me->ended = 1;
- pthread_mutex_unlock(&mutex);
- afb_thread_timer_delete();
- return me;
-}
-
-/* start a new thread */
-static int start_one_thread()
-{
- struct thread *t;
- int rc;
-
- assert(started < allowed);
-
- t = &threads[started++];
- t->stop = 0;
- rc = pthread_create(&t->tid, NULL, thread_main_loop, t);
- if (rc != 0) {
- started--;
- errno = rc;
- WARNING("not able to start thread: %m");
- rc = -1;
- }
- return rc;
-}
-
-/* process the 'request' with the 'callback' using a separate thread if available */
-void afb_thread_call(struct afb_req req, void (*callback)(struct afb_req req), int timeout, void *group)
-{
- const char *info;
- struct job *job;
- int rc;
-
- /* allocates the job */
- job = malloc(sizeof *job);
- if (job == NULL) {
- info = "out of memory";
- goto error;
- }
-
- /* start a thread if needed */
- pthread_mutex_lock(&mutex);
- if (remains == 0) {
- info = "too many jobs";
- goto error2;
- }
- if (started == running && started < allowed) {
- rc = start_one_thread();
- if (rc < 0 && started == 0) {
- /* failed to start threading */
- info = "can't start thread";
- goto error2;
- }
- }
-
- /* fills and queues the job */
- job->callback = callback;
- job->req = req;
- job->timeout = timeout;
- job->blocked = 0;
- job->group = group;
afb_req_addref(req);
- job_add(job);
- pthread_mutex_unlock(&mutex);
-
- /* signal an existing job */
- pthread_cond_signal(&cond);
- return;
-
-error2:
- pthread_mutex_unlock(&mutex);
- free(job);
-error:
- ERROR("can't process job with threads: %s", info);
- afb_req_fail(req, "internal-error", info);
-}
-
-/* initialise the threads */
-int afb_thread_init(int allowed_count, int start_count, int waiter_count)
-{
- threads = calloc(allowed_count, sizeof *threads);
- if (threads == NULL) {
- errno = ENOMEM;
- ERROR("can't allocate threads");
- return -1;
- }
-
- /* records the allowed count */
- allowed = allowed_count;
- started = 0;
- running = 0;
- remains = waiter_count;
-
- /* start at least one thread */
- pthread_mutex_lock(&mutex);
- while (started < start_count && start_one_thread() == 0);
- pthread_mutex_unlock(&mutex);
-
- /* end */
- return -(started != start_count);
-}
-
-/* terminate all the threads and all pending requests */
-void afb_thread_terminate()
-{
- int i, n;
- struct job *job;
-
- /* request all threads to stop */
- pthread_mutex_lock(&mutex);
- allowed = 0;
- n = started;
- for (i = 0 ; i < n ; i++)
- threads[i].stop = 1;
-
- /* wait until all thread are terminated */
- while (started != 0) {
- /* signal threads */
- pthread_mutex_unlock(&mutex);
- pthread_cond_broadcast(&cond);
- pthread_mutex_lock(&mutex);
-
- /* join the terminated threads */
- for (i = 0 ; i < n ; i++) {
- if (threads[i].tid && threads[i].ended) {
- pthread_join(threads[i].tid, NULL);
- threads[i].tid = 0;
- started--;
- }
+ if (0) {
+ /* no threading */
+ sig_monitor3(timeout, req_call, (void*)req.itf, req.closure, callback);
+ } else {
+ /* threading */
+ rc = jobs_queue3(group, timeout, req_call, (void*)req.itf, req.closure, callback);
+ if (rc < 0) {
+ /* TODO: allows or not to proccess it directly as when no threading? (see above) */
+ ERROR("can't process job with threads: %m");
+ afb_req_fail_f(req, "cancelled", "not able to pipe a job for the task");
+ afb_req_unref(req);
}
}
- pthread_mutex_unlock(&mutex);
- free(threads);
-
- /* cancel pending jobs */
- while (first_job) {
- job = first_job;
- first_job = job->next;
- afb_req_fail(job->req, "aborted", "termination of threading");
- afb_req_unref(job->req);
- free(job);
- }
}
+
diff --git a/src/afb-thread.h b/src/afb-thread.h
index 0559dfc0..4e44b55e 100644
--- a/src/afb-thread.h
+++ b/src/afb-thread.h
@@ -19,13 +19,5 @@
struct afb_req;
-extern void afb_thread_call(struct afb_req req, void (*callback)(struct afb_req req), int timeout, void *group);
-
-extern int afb_thread_init(int allowed_count, int start_count, int waiter_count);
-extern void afb_thread_terminate();
-
-extern int afb_thread_timer_create();
-extern int afb_thread_timer_arm(int timeout);
-extern void afb_thread_timer_disarm();
-extern void afb_thread_timer_delete();
+extern void afb_thread_req_call(struct afb_req req, void (*callback)(struct afb_req req), int timeout, void *group);
diff --git a/src/jobs.c b/src/jobs.c
new file mode 100644
index 00000000..c4f32244
--- /dev/null
+++ b/src/jobs.c
@@ -0,0 +1,344 @@
+/*
+ * Copyright (C) 2016, 2017 "IoT.bzh"
+ * Author José Bollo <jose.bollo@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define _GNU_SOURCE
+
+#include <stdlib.h>
+#include <unistd.h>
+#include <signal.h>
+#include <time.h>
+#include <sys/syscall.h>
+#include <pthread.h>
+#include <errno.h>
+#include <assert.h>
+
+#include "jobs.h"
+#include "sig-monitor.h"
+#include "verbose.h"
+
+/* control of threads */
+struct thread
+{
+ pthread_t tid; /* the thread id */
+ unsigned stop: 1; /* stop request */
+ unsigned ended: 1; /* ended status */
+ unsigned works: 1; /* is it processing a job? */
+};
+
+/* describes pending job */
+struct job
+{
+ struct job *next; /* link to the next job enqueued */
+ void *group; /* group of the request */
+ void (*callback)(int,void*,void*,void*); /* processing callback */
+ void *arg1; /* first arg */
+ void *arg2; /* second arg */
+ void *arg3; /* second arg */
+ int timeout; /* timeout in second for processing the request */
+ int blocked; /* is an other request blocking this one ? */
+};
+
+/* synchronisation of threads */
+static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
+
+/* queue of pending jobs */
+static struct job *first_job = NULL;
+
+/* count allowed, started and running threads */
+static int allowed = 0;
+static int started = 0;
+static int running = 0;
+static int remains = 0;
+
+/* list of threads */
+static struct thread *threads = NULL;
+
+/* add the job to the list */
+static inline void job_add(struct job *job)
+{
+ void *group = job->group;
+ struct job *ijob, **pjob;
+
+ pjob = &first_job;
+ ijob = first_job;
+ group = job->group ? : job;
+ while (ijob) {
+ if (ijob->group == group)
+ job->blocked = 1;
+ pjob = &ijob->next;
+ ijob = ijob->next;
+ }
+ *pjob = job;
+ job->next = NULL;
+ remains--;
+}
+
+/* get the next job to process or NULL if none */
+static inline struct job *job_get()
+{
+ struct job *job, **pjob;
+ pjob = &first_job;
+ job = first_job;
+ while (job && job->blocked) {
+ pjob = &job->next;
+ job = job->next;
+ }
+ if (job) {
+ *pjob = job->next;
+ remains++;
+ }
+ return job;
+}
+
+/* unblock a group of job */
+static inline void job_unblock(void *group)
+{
+ struct job *job;
+
+ job = first_job;
+ while (job) {
+ if (job->group == group) {
+ job->blocked = 0;
+ break;
+ }
+ job = job->next;
+ }
+}
+
+/* call the job */
+static inline void job_call(int signum, void *arg)
+{
+ struct job *job = arg;
+ job->callback(signum, job->arg1, job->arg2, job->arg3);
+}
+
+/* cancel the job */
+static inline void job_cancel(int signum, void *arg)
+{
+ struct job *job = arg;
+ job->callback(SIGABRT, job->arg1, job->arg2, job->arg3);
+}
+
+/* main loop of processing threads */
+static void *thread_main_loop(void *data)
+{
+ struct thread *me = data;
+ struct job *job;
+
+ me->works = 0;
+ me->ended = 0;
+ sig_monitor_init_timeouts();
+ pthread_mutex_lock(&mutex);
+ while (!me->stop) {
+ /* get a job */
+ job = job_get();
+ if (job == NULL && first_job != NULL && running == 0) {
+ /* sad situation!! should not happen */
+ ERROR("threads are blocked!");
+ job = first_job;
+ first_job = job->next;
+ }
+ if (job == NULL) {
+ /* no job... */
+ pthread_cond_wait(&cond, &mutex);
+ } else {
+ /* run the job */
+ running++;
+ me->works = 1;
+ pthread_mutex_unlock(&mutex);
+ sig_monitor(job->timeout, job_call, job);
+ pthread_mutex_lock(&mutex);
+ me->works = 0;
+ running--;
+ if (job->group != NULL)
+ job_unblock(job->group);
+ free(job);
+ }
+
+ }
+ me->ended = 1;
+ pthread_mutex_unlock(&mutex);
+ sig_monitor_clean_timeouts();
+ return me;
+}
+
+/* start a new thread */
+static int start_one_thread()
+{
+ struct thread *t;
+ int rc;
+
+ assert(started < allowed);
+
+ t = &threads[started++];
+ t->stop = 0;
+ rc = pthread_create(&t->tid, NULL, thread_main_loop, t);
+ if (rc != 0) {
+ started--;
+ errno = rc;
+ WARNING("not able to start thread: %m");
+ rc = -1;
+ }
+ return rc;
+}
+
+int jobs_queue(
+ void *group,
+ int timeout,
+ void (*callback)(int, void*),
+ void *arg)
+{
+ return jobs_queue3(group, timeout, (void(*)(int,void*,void*,void*))callback, arg, NULL, NULL);
+}
+
+int jobs_queue2(
+ void *group,
+ int timeout,
+ void (*callback)(int, void*, void*),
+ void *arg1,
+ void *arg2)
+{
+ return jobs_queue3(group, timeout, (void(*)(int,void*,void*,void*))callback, arg1, arg2, NULL);
+}
+
+/* queue the job to the 'callback' using a separate thread if available */
+int jobs_queue3(
+ void *group,
+ int timeout,
+ void (*callback)(int, void*, void *, void*),
+ void *arg1,
+ void *arg2,
+ void *arg3)
+{
+ const char *info;
+ struct job *job;
+ int rc;
+
+ /* allocates the job */
+ job = malloc(sizeof *job);
+ if (job == NULL) {
+ errno = ENOMEM;
+ info = "out of memory";
+ goto error;
+ }
+
+ /* start a thread if needed */
+ pthread_mutex_lock(&mutex);
+ if (remains == 0) {
+ errno = EBUSY;
+ info = "too many jobs";
+ goto error2;
+ }
+ if (started == running && started < allowed) {
+ rc = start_one_thread();
+ if (rc < 0 && started == 0) {
+ /* failed to start threading */
+ info = "can't start first thread";
+ goto error2;
+ }
+ }
+
+ /* fills and queues the job */
+ job->group = group;
+ job->timeout = timeout;
+ job->callback = callback;
+ job->arg1 = arg1;
+ job->arg2 = arg2;
+ job->arg3 = arg3;
+ job->blocked = 0;
+ job_add(job);
+ pthread_mutex_unlock(&mutex);
+
+ /* signal an existing job */
+ pthread_cond_signal(&cond);
+ return 0;
+
+error2:
+ pthread_mutex_unlock(&mutex);
+ free(job);
+error:
+ ERROR("can't process job with threads: %s, %m", info);
+ return -1;
+}
+
+/* initialise the threads */
+int jobs_init(int allowed_count, int start_count, int waiter_count)
+{
+ threads = calloc(allowed_count, sizeof *threads);
+ if (threads == NULL) {
+ errno = ENOMEM;
+ ERROR("can't allocate threads");
+ return -1;
+ }
+
+ /* records the allowed count */
+ allowed = allowed_count;
+ started = 0;
+ running = 0;
+ remains = waiter_count;
+
+ /* start at least one thread */
+ pthread_mutex_lock(&mutex);
+ while (started < start_count && start_one_thread() == 0);
+ pthread_mutex_unlock(&mutex);
+
+ /* end */
+ return -(started != start_count);
+}
+
+/* terminate all the threads and all pending requests */
+void jobs_terminate()
+{
+ int i, n;
+ struct job *job;
+
+ /* request all threads to stop */
+ pthread_mutex_lock(&mutex);
+ allowed = 0;
+ n = started;
+ for (i = 0 ; i < n ; i++)
+ threads[i].stop = 1;
+
+ /* wait until all thread are terminated */
+ while (started != 0) {
+ /* signal threads */
+ pthread_mutex_unlock(&mutex);
+ pthread_cond_broadcast(&cond);
+ pthread_mutex_lock(&mutex);
+
+ /* join the terminated threads */
+ for (i = 0 ; i < n ; i++) {
+ if (threads[i].tid && threads[i].ended) {
+ pthread_join(threads[i].tid, NULL);
+ threads[i].tid = 0;
+ started--;
+ }
+ }
+ }
+ pthread_mutex_unlock(&mutex);
+ free(threads);
+
+ /* cancel pending jobs */
+ while (first_job) {
+ job = first_job;
+ first_job = job->next;
+ sig_monitor(0, job_cancel, job);
+ free(job);
+ }
+}
+
diff --git a/src/jobs.h b/src/jobs.h
new file mode 100644
index 00000000..ef72e0c1
--- /dev/null
+++ b/src/jobs.h
@@ -0,0 +1,44 @@
+/*
+ * Copyright (C) 2016, 2017 "IoT.bzh"
+ * Author José Bollo <jose.bollo@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+extern int jobs_queue(
+ void *group,
+ int timeout,
+ void (*callback)(int signum, void* arg),
+ void *arg);
+
+extern int jobs_queue2(
+ void *group,
+ int timeout,
+ void (*callback)(int signum, void* arg1, void *arg2),
+ void *arg1,
+ void *arg2);
+
+extern int jobs_queue3(
+ void *group,
+ int timeout,
+ void (*callback)(int signum, void* arg1, void *arg2, void *arg3),
+ void *arg1,
+ void *arg2,
+ void *arg3);
+
+extern int jobs_init(int allowed_count, int start_count, int waiter_count);
+extern void jobs_terminate();
+
+
diff --git a/src/main.c b/src/main.c
index 05805e2d..40ad19c8 100644
--- a/src/main.c
+++ b/src/main.c
@@ -41,8 +41,8 @@
#include "afb-hsrv.h"
#include "afb-context.h"
#include "afb-hreq.h"
-#include "afb-sig-handler.h"
-#include "afb-thread.h"
+#include "sig-monitor.h"
+#include "jobs.h"
#include "afb-session.h"
#include "verbose.h"
#include "afb-common.h"
@@ -439,7 +439,7 @@ int main(int argc, char *argv[])
exit(1);
}
- if (afb_sig_handler_init() < 0) {
+ if (sig_monitor_init() < 0) {
ERROR("failed to initialise signal handlers");
return 1;
}
@@ -450,7 +450,7 @@ int main(int argc, char *argv[])
return 1;
}
- if (afb_thread_init(3, 1, 20) < 0) {
+ if (jobs_init(3, 1, 20) < 0) {
ERROR("failed to initialise threading");
return 1;
}
diff --git a/src/sig-monitor.c b/src/sig-monitor.c
new file mode 100644
index 00000000..d00f0f97
--- /dev/null
+++ b/src/sig-monitor.c
@@ -0,0 +1,198 @@
+/*
+ * Copyright (C) 2017 "IoT.bzh"
+ * Author José Bollo <jose.bollo@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define _GNU_SOURCE
+
+#include <stdlib.h>
+#include <signal.h>
+#include <string.h>
+#include <setjmp.h>
+#include <time.h>
+#include <unistd.h>
+#include <sys/syscall.h>
+
+#include "sig-monitor.h"
+#include "verbose.h"
+
+/* local handler */
+static _Thread_local sigjmp_buf *error_handler;
+
+/* local timers */
+static _Thread_local int thread_timer_set;
+static _Thread_local timer_t thread_timerid;
+
+/*
+ * Creates a timer for the current thread
+ *
+ * Returns 0 in case of success
+ */
+static inline int timeout_create()
+{
+ int rc;
+ struct sigevent sevp;
+
+ if (thread_timer_set)
+ rc = 0;
+ else {
+ sevp.sigev_notify = SIGEV_THREAD_ID;
+ sevp.sigev_signo = SIGALRM;
+ sevp.sigev_value.sival_ptr = NULL;
+#if defined(sigev_notify_thread_id)
+ sevp.sigev_notify_thread_id = (pid_t)syscall(SYS_gettid);
+#else
+ sevp._sigev_un._tid = (pid_t)syscall(SYS_gettid);
+#endif
+ rc = timer_create(CLOCK_THREAD_CPUTIME_ID, &sevp, &thread_timerid);
+ thread_timer_set = !rc;
+ }
+ return 0;
+}
+
+/*
+ * Arms the alarm in timeout seconds for the current thread
+ */
+static inline int timeout_arm(int timeout)
+{
+ int rc;
+ struct itimerspec its;
+
+ rc = timeout_create();
+ if (rc == 0) {
+ its.it_interval.tv_sec = 0;
+ its.it_interval.tv_nsec = 0;
+ its.it_value.tv_sec = timeout;
+ its.it_value.tv_nsec = 0;
+ rc = timer_settime(thread_timerid, 0, &its, NULL);
+ }
+
+ return rc;
+}
+
+/*
+ * Disarms the current alarm
+ */
+static inline void timeout_disarm()
+{
+ if (thread_timer_set)
+ timeout_arm(0);
+}
+
+/*
+ * Destroy any alarm resource for the current thread
+ */
+static inline void timeout_delete()
+{
+ if (thread_timer_set) {
+ timer_delete(thread_timerid);
+ thread_timer_set = 0;
+ }
+}
+
+
+/* Handles signals that terminate the process */
+static void on_signal_terminate (int signum)
+{
+ ERROR("Terminating signal %d received: %s", signum, strsignal(signum));
+ exit(1);
+}
+
+/* Handles monitored signals that can be continued */
+static void on_signal_error(int signum)
+{
+ sigset_t sigset;
+
+ // unlock signal to allow a new signal to come
+ if (error_handler != NULL) {
+ sigemptyset(&sigset);
+ sigaddset(&sigset, signum);
+ sigprocmask(SIG_UNBLOCK, &sigset, 0);
+ longjmp(*error_handler, signum);
+ }
+ if (signum == SIGALRM)
+ return;
+ ERROR("Unmonitored signal %d received: %s", signum, strsignal(signum));
+ exit(2);
+}
+
+/* install the handlers */
+static int install(void (*handler)(int), int *signals)
+{
+ int result = 1;
+ while(*signals > 0) {
+ if (signal(*signals, handler) == SIG_ERR) {
+ ERROR("failed to install signal handler for signal %s", strsignal(*signals));
+ result = 0;
+ }
+ signals++;
+ }
+ return result;
+}
+
+int sig_monitor_init()
+{
+ static int sigerr[] = { SIGALRM, SIGSEGV, SIGFPE, 0 };
+ static int sigterm[] = { SIGINT, SIGABRT, 0 };
+
+ return (install(on_signal_error, sigerr) & install(on_signal_terminate, sigterm)) - 1;
+}
+
+int sig_monitor_init_timeouts()
+{
+ return timeout_create();
+}
+
+void sig_monitor_clean_timeouts()
+{
+ timeout_delete();
+}
+
+void sig_monitor(int timeout, void (*function)(int sig, void*), void *arg)
+{
+ sig_monitor3(timeout, (void (*)(int,void*,void*,void*))function, arg, NULL, NULL);
+}
+
+void sig_monitor2(int timeout, void (*function)(int sig, void*, void*), void *arg1, void *arg2)
+{
+ sig_monitor3(timeout, (void (*)(int,void*,void*,void*))function, arg1, arg2, NULL);
+}
+
+void sig_monitor3(int timeout, void (*function)(int sig, void*, void*, void*), void *arg1, void *arg2, void *arg3)
+{
+ volatile int signum, signum2;
+ sigjmp_buf jmpbuf, *older;
+
+ older = error_handler;
+ signum = setjmp(jmpbuf);
+ if (signum == 0) {
+ error_handler = &jmpbuf;
+ if (timeout)
+ timeout_arm(timeout);
+ function(0, arg1, arg2, arg3);
+ } else {
+ signum2 = setjmp(jmpbuf);
+ if (signum2 == 0)
+ function(signum, arg1, arg2, arg3);
+ }
+ error_handler = older;
+ if (timeout)
+ timeout_disarm();
+}
+
+
+
+
+
diff --git a/src/afb-sig-handler.h b/src/sig-monitor.h
index 4f41324d..a3a28bc1 100644
--- a/src/afb-sig-handler.h
+++ b/src/sig-monitor.h
@@ -1,6 +1,5 @@
/*
- * Copyright (C) 2015, 2016, 2017 "IoT.bzh"
- * Author "Fulup Ar Foll"
+ * Copyright (C) 2017 "IoT.bzh"
* Author José Bollo <jose.bollo@iot.bzh>
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -18,11 +17,11 @@
#pragma once
-struct afb_req;
+extern int sig_monitor_init();
+extern void sig_monitor_clean_timeouts();
+extern int sig_monitor_init_timeouts();
-extern int afb_sig_handler_init();
-
-extern void afb_sig_monitor(void (*function)(int sig, void*), void *closure, int timeout);
-extern int afb_sig_req(struct afb_req req, void (*callback)(struct afb_req req));
-extern int afb_sig_req_timeout(struct afb_req req, void (*callback)(struct afb_req req), int timeout);
+extern void sig_monitor(int timeout, void (*function)(int sig, void*), void *arg);
+extern void sig_monitor2(int timeout, void (*function)(int sig, void*, void*), void *arg1, void *arg2);
+extern void sig_monitor3(int timeout, void (*function)(int sig, void*, void*, void*), void *arg1, void *arg2, void *arg3);