aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJosé Bollo <jose.bollo@iot.bzh>2017-03-28 17:46:32 +0200
committerJosé Bollo <jose.bollo@iot.bzh>2017-03-28 17:46:32 +0200
commit162436f4ffbbf63d867735f7de5b78dcd684f890 (patch)
tree3c7a1f4b37829da57b1b9be1ecf027d753101a89
parent48827b7b9862ab5961f938f38a8667e15421a50c (diff)
Refactor job to allow synchronous calls
The family of methods "jobs_invoke" make synchronous job activation. It waits for the completion of the job while still dispatching it internally and providing the calling thread for processing any job queued. Change-Id: Id36a30789cc51245a7bbfca42f0122cf4ea623b2 Signed-off-by: José Bollo <jose.bollo@iot.bzh>
-rw-r--r--src/jobs.c436
-rw-r--r--src/jobs.h29
-rw-r--r--src/tests/test-thread.c22
-rwxr-xr-xsrc/tests/test-thread.sh2
4 files changed, 349 insertions, 140 deletions
diff --git a/src/jobs.c b/src/jobs.c
index 2910f0a3..f7acebf4 100644
--- a/src/jobs.c
+++ b/src/jobs.c
@@ -30,14 +30,6 @@
#include "sig-monitor.h"
#include "verbose.h"
-/** control of threads */
-struct thread
-{
- struct thread *next; /**< next thread of the list */
- pthread_t tid; /**< the thread id */
- unsigned stop: 1; /**< stop request */
-};
-
/* describes pending job */
struct job
{
@@ -51,6 +43,17 @@ struct job
int blocked; /* is an other request blocking this one ? */
};
+/** control of threads */
+struct thread
+{
+ struct thread *next; /**< next thread of the list */
+ struct thread *upper; /**< upper same thread */
+ struct job *job; /**< currently processed job */
+ pthread_t tid; /**< the thread id */
+ unsigned stop: 1; /**< stop requested */
+ unsigned lowered: 1; /**< has a lower same thread */
+};
+
/* synchronisation of threads */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
@@ -63,6 +66,7 @@ static int remains = 0; /** remaining count of jobs that can be created */
/* list of threads */
static struct thread *threads;
+static _Thread_local struct thread *current;
/* queue of pending jobs */
static struct job *first_job;
@@ -70,70 +74,15 @@ static struct job *first_evloop;
static struct job *free_jobs;
/**
- * Adds the 'job' at the end of the list of jobs, marking it
- * as blocked if an other job with the same group is pending.
- * @param job the job to add
- */
-static inline void job_add(struct job *job)
-{
- void *group;
- struct job *ijob, **pjob;
-
- pjob = &first_job;
- ijob = first_job;
- group = job->group ? : (void*)(intptr_t)1;
- while (ijob) {
- if (ijob->group == group)
- job->blocked = 1;
- pjob = &ijob->next;
- ijob = ijob->next;
- }
- job->next = NULL;
- *pjob = job;
- remains--;
-}
-
-/**
- * Get the next job to process or NULL if none.
- * The returned job if any is removed from the list of
- * jobs.
- * @return the job to process
- */
-static inline struct job *job_get()
-{
- struct job *job, **pjob;
-
- pjob = &first_job;
- job = first_job;
- while (job && job->blocked) {
- pjob = &job->next;
- job = job->next;
- }
- if (job) {
- *pjob = job->next;
- remains++;
- }
- return job;
-}
-
-/**
- * Unblock the first pending job of a group (if any)
- * @param group the group to unblock
+ * Create a new job with the given parameters
+ * @param group the group of the job
+ * @param timeout the timeout of the job (0 if none)
+ * @param callback the function that achieves the job
+ * @param arg1 the first argument of the callback
+ * @param arg2 the second argument of the callback
+ * @param arg3 the third argument of the callback
+ * @return the created job unblock or NULL when no more memory
*/
-static inline void job_unblock(void *group)
-{
- struct job *job;
-
- job = first_job;
- while (job) {
- if (job->group == group) {
- job->blocked = 0;
- break;
- }
- job = job->next;
- }
-}
-
static struct job *job_create(
void *group,
int timeout,
@@ -144,11 +93,12 @@ static struct job *job_create(
{
struct job *job;
- /* allocates the job */
+ /* try recyle existing job */
job = free_jobs;
if (job)
free_jobs = job->next;
else {
+ /* allocation without blocking */
pthread_mutex_unlock(&mutex);
job = malloc(sizeof *job);
pthread_mutex_lock(&mutex);
@@ -157,6 +107,7 @@ static struct job *job_create(
goto end;
}
}
+ /* initialises the job */
job->group = group;
job->timeout = timeout;
job->callback = callback;
@@ -168,17 +119,95 @@ end:
return job;
}
-static inline void job_destroy(struct job *job)
+/**
+ * Adds 'job1' and 'job2' at the end of the list of jobs, marking it
+ * as blocked if an other job with the same group is pending.
+ * @param job1 the first job to add
+ * @param job2 the second job to add or NULL
+ */
+static void job_add2(struct job *job1, struct job *job2)
{
- job->next = free_jobs;
- free_jobs = job;
+ void *group1, *group2, *group;
+ struct job *ijob, **pjob;
+
+ /* prepare to add */
+ group1 = job1->group;
+ job1->next = job2;
+ if (!job2)
+ group2 = NULL;
+ else {
+ job2->next = NULL;
+ group2 = job2->group;
+ if (group2 && group2 == group1)
+ job2->blocked = 1;
+ }
+
+ /* search end and blackers */
+ pjob = &first_job;
+ ijob = first_job;
+ while (ijob) {
+ group = ijob->group;
+ if (group) {
+ if (group == group1)
+ job1->blocked = 1;
+ if (group == group2)
+ job2->blocked = 1;
+ }
+ pjob = &ijob->next;
+ ijob = ijob->next;
+ }
+
+ /* queue the jobs */
+ *pjob = job1;
}
+/**
+ * Get the next job to process or NULL if none.
+ * The returned job if any isn't removed from
+ * the list of jobs.
+ * @return the job to process
+ */
+static inline struct job *job_get()
+{
+ struct job *job;
+
+ job = first_job;
+ while (job && job->blocked)
+ job = job->next;
+ return job;
+}
+
+/**
+ * Releases the processed 'job'
+ * @param job the job to release
+ */
static inline void job_release(struct job *job)
{
- if (job->group)
- job_unblock(job->group);
- job_destroy(job);
+ struct job *ijob, **pjob;
+ void *group;
+
+ /* first unqueue the job */
+ pjob = &first_job;
+ ijob = first_job;
+ while (ijob != job) {
+ pjob = &ijob->next;
+ ijob = ijob->next;
+ }
+ *pjob = job->next;
+
+ /* then unblock jobs of the same group */
+ group = job->group;
+ if (group) {
+ ijob = job->next;
+ while (ijob && ijob->group != group)
+ ijob = ijob->next;
+ if (ijob)
+ ijob->blocked = 0;
+ }
+
+ /* recycle the job */
+ job->next = free_jobs;
+ free_jobs = job;
}
/** monitored call to the job */
@@ -195,24 +224,27 @@ static void job_cancel(int signum, void *arg)
}
/* main loop of processing threads */
-static void *thread_main_loop(void *data)
+static void thread_run(struct thread *me)
{
- struct thread me, **prv;
+ struct thread **prv;
struct job *job;
/* init */
- me.tid = pthread_self();
- me.stop = 0;
- sig_monitor_init_timeouts();
-
- /* chain in */
- pthread_mutex_lock(&mutex);
- me.next = threads;
- threads = &me;
+ me->tid = pthread_self();
+ me->stop = 0;
+ me->lowered = 0;
+ me->upper = current;
+ if (current)
+ current->lowered = 1;
+ else
+ sig_monitor_init_timeouts();
+ current = me;
+ me->next = threads;
+ threads = me;
/* loop until stopped */
running++;
- while (!me.stop) {
+ while (!me->stop) {
/* get a job */
job = job_get();
if (!job && first_job && running == 0) {
@@ -223,6 +255,9 @@ static void *thread_main_loop(void *data)
}
if (job) {
/* run the job */
+ remains++;
+ job->blocked = 1;
+ me->job = job;
pthread_mutex_unlock(&mutex);
sig_monitor(job->timeout, job_call, job);
pthread_mutex_lock(&mutex);
@@ -248,15 +283,27 @@ static void *thread_main_loop(void *data)
}
running--;
- /* chain out */
+ /* uninit */
prv = &threads;
- while (*prv != &me)
+ while (*prv != me)
prv = &(*prv)->next;
- *prv = me.next;
+ *prv = me->next;
+ current = me->upper;
+ if (current)
+ current->lowered = 0;
+ else
+ sig_monitor_clean_timeouts();
pthread_mutex_unlock(&mutex);
+}
- /* uninit and terminate */
- sig_monitor_clean_timeouts();
+/* main loop of processing threads */
+static void *thread_create(void *data)
+{
+ struct thread me;
+
+ pthread_mutex_lock(&mutex);
+ thread_run(&me);
+ pthread_mutex_unlock(&mutex);
return NULL;
}
@@ -269,7 +316,7 @@ static int start_one_thread()
assert(started < allowed);
started++;
- rc = pthread_create(&tid, NULL, thread_main_loop, NULL);
+ rc = pthread_create(&tid, NULL, thread_create, NULL);
if (rc != 0) {
started--;
errno = rc;
@@ -279,6 +326,27 @@ static int start_one_thread()
return rc;
}
+static int start_one_thread_if_needed()
+{
+ int rc;
+
+ if (started == running && started < allowed) {
+ /* all threads are busy and a new can be started */
+ rc = start_one_thread();
+ if (rc < 0 && started == 0)
+ return rc; /* no thread available */
+ }
+ return 0;
+}
+
+int jobs_queue0(
+ void *group,
+ int timeout,
+ void (*callback)(int signum))
+{
+ return jobs_queue3(group, timeout, (void(*)(int,void*,void*,void*))callback, NULL, NULL, NULL);
+}
+
int jobs_queue(
void *group,
int timeout,
@@ -321,23 +389,24 @@ int jobs_queue3(
goto error;
}
- /* start a thread if needed */
+ /* check availability */
if (remains == 0) {
errno = EBUSY;
info = "too many jobs";
goto error2;
}
- if (started == running && started < allowed) {
- rc = start_one_thread();
- if (rc < 0 && started == 0) {
- /* failed to start threading */
- info = "can't start first thread";
- goto error2;
- }
+
+ /* start a thread if needed */
+ rc = start_one_thread_if_needed();
+ if (rc < 0) {
+ /* failed to start threading */
+ info = "can't start first thread";
+ goto error2;
}
/* queues the job */
- job_add(job);
+ remains--;
+ job_add2(job, NULL);
pthread_mutex_unlock(&mutex);
/* signal an existing job */
@@ -345,7 +414,8 @@ int jobs_queue3(
return 0;
error2:
- job_destroy(job);
+ job->next = free_jobs;
+ free_jobs = job;
error:
ERROR("can't process job with threads: %s, %m", info);
pthread_mutex_unlock(&mutex);
@@ -370,10 +440,96 @@ int jobs_init(int allowed_count, int start_count, int waiter_count)
return -(started != start_count);
}
+int jobs_invoke0(
+ int timeout,
+ void (*callback)(int signum))
+{
+ return jobs_invoke3(timeout, (void(*)(int,void*,void*,void*))callback, NULL, NULL, NULL);
+}
+
+int jobs_invoke(
+ int timeout,
+ void (*callback)(int, void*),
+ void *arg)
+{
+ return jobs_invoke3(timeout, (void(*)(int,void*,void*,void*))callback, arg, NULL, NULL);
+}
+
+int jobs_invoke2(
+ int timeout,
+ void (*callback)(int, void*, void*),
+ void *arg1,
+ void *arg2)
+{
+ return jobs_invoke3(timeout, (void(*)(int,void*,void*,void*))callback, arg1, arg2, NULL);
+}
+
+static void unlock_invoker(int signum, void *arg1, void *arg2, void *arg3)
+{
+ struct thread *t = arg1;
+ pthread_mutex_lock(&mutex);
+ t->stop = 1;
+ pthread_mutex_unlock(&mutex);
+}
+
+/* invoke the job to the 'callback' using a separate thread if available */
+int jobs_invoke3(
+ int timeout,
+ void (*callback)(int, void*, void *, void*),
+ void *arg1,
+ void *arg2,
+ void *arg3)
+{
+ const char *info;
+ struct job *job1, *job2;
+ int rc;
+ struct thread me;
+
+ pthread_mutex_lock(&mutex);
+
+ /* allocates the job */
+ job1 = job_create(&me, timeout, callback, arg1, arg2, arg3);
+ job2 = job_create(&me, 0, unlock_invoker, &me, NULL, NULL);
+ if (!job1 || !job2) {
+ errno = ENOMEM;
+ info = "out of memory";
+ goto error;
+ }
+
+ /* start a thread if needed */
+ rc = start_one_thread_if_needed();
+ if (rc < 0) {
+ /* failed to start threading */
+ info = "can't start first thread";
+ goto error;
+ }
+
+ /* queues the job */
+ job_add2(job1, job2);
+
+ /* run untill stopped */
+ thread_run(&me);
+ pthread_mutex_unlock(&mutex);
+ return 0;
+
+error:
+ if (job1) {
+ job1->next = free_jobs;
+ free_jobs = job1;
+ }
+ if (job2) {
+ job2->next = free_jobs;
+ free_jobs = job2;
+ }
+ ERROR("can't process job with threads: %s, %m", info);
+ pthread_mutex_unlock(&mutex);
+ return -1;
+}
+
/* terminate all the threads and all pending requests */
-void jobs_terminate(int wait)
+void jobs_terminate()
{
- struct job *job;
+ struct job *job, *head, *tail;
pthread_t me, other;
struct thread *t;
@@ -399,13 +555,36 @@ void jobs_terminate(int wait)
pthread_mutex_lock(&mutex);
}
- /* cancel pending jobs */
- while (first_job) {
- job = first_job;
- first_job = job->next;
- sig_monitor(0, job_cancel, job);
- free(job);
+ /* cancel pending jobs of other threads */
+ head = first_job;
+ first_job = NULL;
+ tail = NULL;
+ while (head) {
+ /* unlink the job */
+ job = head;
+ head = job->next;
+
+ /* search if job is stacked for current */
+ t = current;
+ while (t && t->job != job)
+ t = t->upper;
+ if (t) {
+ /* yes, relink it at end */
+ if (tail)
+ tail->next = job;
+ else
+ first_job = job;
+ tail = job;
+ job->next = NULL;
+ } else {
+ /* no cancel the job */
+ pthread_mutex_unlock(&mutex);
+ sig_monitor(0, job_cancel, job);
+ free(job);
+ pthread_mutex_lock(&mutex);
+ }
}
+ pthread_mutex_unlock(&mutex);
}
int jobs_add_event_loop(void *key, int timeout, void (*evloop)(int signum, void*), void *closure)
@@ -428,34 +607,19 @@ int jobs_add_event_loop(void *key, int timeout, void (*evloop)(int signum, void*
int jobs_add_me()
{
- pthread_t me;
- struct thread *t;
+ struct thread me;
- /* how am i? */
- me = pthread_self();
-
- /* request all threads to stop */
- pthread_mutex_lock(&mutex);
- t = threads;
- while (t) {
- if (pthread_equal(t->tid, me)) {
- pthread_mutex_unlock(&mutex);
- ERROR("thread already running");
- errno = EINVAL;
- return -1;
- }
- t = t->next;
+ /* check whether already running */
+ if (current) {
+ ERROR("thread already running");
+ errno = EINVAL;
+ return -1;
}
/* allowed... */
- allowed++;
- pthread_mutex_unlock(&mutex);
-
- /* run */
- thread_main_loop(NULL);
-
- /* returns */
pthread_mutex_lock(&mutex);
+ allowed++;
+ thread_run(&me);
allowed--;
pthread_mutex_unlock(&mutex);
return 0;
diff --git a/src/jobs.h b/src/jobs.h
index 95565b86..cf433892 100644
--- a/src/jobs.h
+++ b/src/jobs.h
@@ -17,6 +17,11 @@
#pragma once
+extern int jobs_queue0(
+ void *group,
+ int timeout,
+ void (*callback)(int signum));
+
extern int jobs_queue(
void *group,
int timeout,
@@ -38,10 +43,32 @@ extern int jobs_queue3(
void *arg2,
void *arg3);
+extern int jobs_invoke0(
+ int timeout,
+ void (*callback)(int signum));
+
+extern int jobs_invoke(
+ int timeout,
+ void (*callback)(int signum, void* arg),
+ void *arg);
+
+extern int jobs_invoke2(
+ int timeout,
+ void (*callback)(int signum, void* arg1, void *arg2),
+ void *arg1,
+ void *arg2);
+
+extern int jobs_invoke3(
+ int timeout,
+ void (*callback)(int signum, void* arg1, void *arg2, void *arg3),
+ void *arg1,
+ void *arg2,
+ void *arg3);
+
extern int jobs_add_event_loop(void *key, int timeout, void (*evloop)(int, void*), void *closure);
extern int jobs_init(int allowed_count, int start_count, int waiter_count);
extern int jobs_add_me();
-extern void jobs_terminate(int wait);
+extern void jobs_terminate();
diff --git a/src/tests/test-thread.c b/src/tests/test-thread.c
index 2f0e1853..3ed31401 100644
--- a/src/tests/test-thread.c
+++ b/src/tests/test-thread.c
@@ -24,7 +24,7 @@ void unref(void *closure)
{
struct foo *foo = closure;
if(!--foo->refcount) {
- printf("%06d FREE\n", foo->value);
+ /* printf("%06d FREE\n", foo->value); */
free(foo);
}
}
@@ -32,7 +32,7 @@ void unref(void *closure)
void fail(void *closure, const char *status, const char *info)
{
struct foo *foo = closure;
- printf("%06d ERROR %s\n", foo->value, status);
+ printf("%06d ABORT T%d %s\n", foo->value, (int)syscall(SYS_gettid), status);
}
struct afb_req_itf itf = {
@@ -70,6 +70,17 @@ void process(struct afb_req req)
// nanosleep(&ts, NULL);
}
+void terminate(int signum)
+{
+ printf("---------------- TERMINATE T%d (%d)\n", (int)syscall(SYS_gettid), signum);
+#if 0
+ jobs_terminate();
+#else
+ jobs_invoke0(0, jobs_terminate);
+#endif
+ exit(0);
+}
+
int main()
{
int i;
@@ -85,6 +96,12 @@ int main()
foo->refcount = 1;
afb_thread_req_call(req, process, 5, (&ts) + (i % 7));
unref(foo);
+ if (i == 5000)
+#if 1
+ jobs_invoke0(0, terminate);
+#else
+ jobs_queue0(NULL, 0, terminate);
+#endif
ts.tv_sec = 0;
ts.tv_nsec = 1000000;
// nanosleep(&ts, NULL);
@@ -93,6 +110,7 @@ int main()
ts.tv_nsec = 0;
nanosleep(&ts, NULL);
jobs_terminate();
+ return 0;
}
diff --git a/src/tests/test-thread.sh b/src/tests/test-thread.sh
index d5186d19..fe73516f 100755
--- a/src/tests/test-thread.sh
+++ b/src/tests/test-thread.sh
@@ -1,4 +1,4 @@
#!/bin/sh
cc test-thread.c ../afb-thread.c ../verbose.c ../sig-monitor.c ../jobs.c -o test-thread -lrt -lpthread -I../../include -g -O2
-gdb ./test-thread
+./test-thread