summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJosé Bollo <jose.bollo@iot.bzh>2017-03-27 22:49:44 +0200
committerJosé Bollo <jose.bollo@iot.bzh>2017-03-27 22:49:44 +0200
commitf9fc4077cc0eb167f3e65f54cc27717c79beee92 (patch)
tree46482fa5b4ddd6912867c83047f9eeedb5b6f0e3
parentc3d27b0ac659776f4065bc04b94f12c6b7e40084 (diff)
Make main thread used for common jobs
This commit make the main thread behaving like any other threads. The main loop is shared across threads, the first one without job taking it. The main event loop now have the lowest priority. It is activated only when no job is queued. This has the good effect to not try to overfill the queue of jobs. Change-Id: I07cecc9d94a02134c63bc2a814db56e171ab719e Signed-off-by: José Bollo <jose.bollo@iot.bzh>
-rw-r--r--src/jobs.c292
-rw-r--r--src/jobs.h3
-rw-r--r--src/main.c28
3 files changed, 229 insertions, 94 deletions
diff --git a/src/jobs.c b/src/jobs.c
index c4f32244..1be6ec7b 100644
--- a/src/jobs.c
+++ b/src/jobs.c
@@ -30,13 +30,12 @@
#include "sig-monitor.h"
#include "verbose.h"
-/* control of threads */
+/** control of threads */
struct thread
{
- pthread_t tid; /* the thread id */
- unsigned stop: 1; /* stop request */
- unsigned ended: 1; /* ended status */
- unsigned works: 1; /* is it processing a job? */
+ struct thread *next; /**< next thread of the list */
+ pthread_t tid; /**< the thread id */
+ unsigned stop: 1; /**< stop request */
};
/* describes pending job */
@@ -56,42 +55,54 @@ struct job
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-/* queue of pending jobs */
-static struct job *first_job = NULL;
-
/* count allowed, started and running threads */
-static int allowed = 0;
-static int started = 0;
-static int running = 0;
-static int remains = 0;
+static int allowed = 0; /** allowed count of threads */
+static int started = 0; /** started count of threads */
+static int running = 0; /** running count of threads */
+static int remains = 0; /** remaining count of jobs that can be created */
/* list of threads */
-static struct thread *threads = NULL;
+static struct thread *threads;
-/* add the job to the list */
+/* queue of pending jobs */
+static struct job *first_job;
+static struct job *first_evloop;
+static struct job *free_jobs;
+
+/**
+ * Adds the 'job' at the end of the list of jobs, marking it
+ * as blocked if an other job with the same group is pending.
+ * @param job the job to add
+ */
static inline void job_add(struct job *job)
{
- void *group = job->group;
+ void *group;
struct job *ijob, **pjob;
pjob = &first_job;
ijob = first_job;
- group = job->group ? : job;
+ group = job->group ? : (void*)(intptr_t)1;
while (ijob) {
if (ijob->group == group)
job->blocked = 1;
pjob = &ijob->next;
ijob = ijob->next;
}
- *pjob = job;
job->next = NULL;
+ *pjob = job;
remains--;
}
-/* get the next job to process or NULL if none */
+/**
+ * Get the next job to process or NULL if none.
+ * The returned job if any is removed from the list of
+ * jobs.
+ * @return the job to process
+ */
static inline struct job *job_get()
{
struct job *job, **pjob;
+
pjob = &first_job;
job = first_job;
while (job && job->blocked) {
@@ -105,7 +116,10 @@ static inline struct job *job_get()
return job;
}
-/* unblock a group of job */
+/**
+ * Unblock the first pending job of a group (if any)
+ * @param group the group to unblock
+ */
static inline void job_unblock(void *group)
{
struct job *job;
@@ -120,74 +134,140 @@ static inline void job_unblock(void *group)
}
}
-/* call the job */
-static inline void job_call(int signum, void *arg)
+static struct job *job_create(
+ void *group,
+ int timeout,
+ void (*callback)(int, void*, void *, void*),
+ void *arg1,
+ void *arg2,
+ void *arg3)
+{
+ struct job *job;
+
+ /* allocates the job */
+ job = free_jobs;
+ if (!job) {
+ pthread_mutex_unlock(&mutex);
+ job = malloc(sizeof *job);
+ pthread_mutex_lock(&mutex);
+ if (!job) {
+ errno = -ENOMEM;
+ goto end;
+ }
+ }
+ job->group = group;
+ job->timeout = timeout;
+ job->callback = callback;
+ job->arg1 = arg1;
+ job->arg2 = arg2;
+ job->arg3 = arg3;
+ job->blocked = 0;
+end:
+ return job;
+}
+
+static inline void job_destroy(struct job *job)
+{
+ job->next = free_jobs;
+ free_jobs = job;
+}
+
+static inline void job_release(struct job *job)
+{
+ if (job->group)
+ job_unblock(job->group);
+ job_destroy(job);
+}
+
+/** monitored call to the job */
+static void job_call(int signum, void *arg)
{
struct job *job = arg;
job->callback(signum, job->arg1, job->arg2, job->arg3);
}
-/* cancel the job */
-static inline void job_cancel(int signum, void *arg)
+/** monitored cancel of the job */
+static void job_cancel(int signum, void *arg)
{
- struct job *job = arg;
- job->callback(SIGABRT, job->arg1, job->arg2, job->arg3);
+ job_call(SIGABRT, arg);
}
/* main loop of processing threads */
static void *thread_main_loop(void *data)
{
- struct thread *me = data;
+ struct thread me, **prv;
struct job *job;
- me->works = 0;
- me->ended = 0;
+ /* init */
+ me.tid = pthread_self();
+ me.stop = 0;
sig_monitor_init_timeouts();
+
+ /* chain in */
pthread_mutex_lock(&mutex);
- while (!me->stop) {
+ me.next = threads;
+ threads = &me;
+
+ /* loop until stopped */
+ running++;
+ while (!me.stop) {
/* get a job */
job = job_get();
- if (job == NULL && first_job != NULL && running == 0) {
+ if (!job && first_job && running == 0) {
/* sad situation!! should not happen */
ERROR("threads are blocked!");
job = first_job;
first_job = job->next;
}
- if (job == NULL) {
- /* no job... */
- pthread_cond_wait(&cond, &mutex);
- } else {
+ if (job) {
/* run the job */
- running++;
- me->works = 1;
pthread_mutex_unlock(&mutex);
sig_monitor(job->timeout, job_call, job);
pthread_mutex_lock(&mutex);
- me->works = 0;
- running--;
- if (job->group != NULL)
- job_unblock(job->group);
- free(job);
+ job_release(job);
+ } else {
+ /* no job, check evloop */
+ job = first_evloop;
+ if (job) {
+ /* evloop */
+ first_evloop = job->next;
+ pthread_mutex_unlock(&mutex);
+ sig_monitor(job->timeout, job_call, job);
+ pthread_mutex_lock(&mutex);
+ job->next = first_evloop;
+ first_evloop = job;
+ } else {
+ /* no job and not evloop */
+ running--;
+ pthread_cond_wait(&cond, &mutex);
+ running++;
+ }
}
-
}
- me->ended = 1;
+ running--;
+
+ /* chain out */
+ prv = &threads;
+ while (*prv != &me)
+ prv = &(*prv)->next;
+ *prv = me.next;
pthread_mutex_unlock(&mutex);
+
+ /* uninit and terminate */
sig_monitor_clean_timeouts();
- return me;
+ return NULL;
}
/* start a new thread */
static int start_one_thread()
{
- struct thread *t;
+ pthread_t tid;
int rc;
assert(started < allowed);
- t = &threads[started++];
- t->stop = 0;
- rc = pthread_create(&t->tid, NULL, thread_main_loop, t);
+ started++;
+ rc = pthread_create(&tid, NULL, thread_main_loop, NULL);
if (rc != 0) {
started--;
errno = rc;
@@ -229,16 +309,17 @@ int jobs_queue3(
struct job *job;
int rc;
+ pthread_mutex_lock(&mutex);
+
/* allocates the job */
- job = malloc(sizeof *job);
- if (job == NULL) {
+ job = job_create(group, timeout, callback, arg1, arg2, arg3);
+ if (!job) {
errno = ENOMEM;
info = "out of memory";
goto error;
}
/* start a thread if needed */
- pthread_mutex_lock(&mutex);
if (remains == 0) {
errno = EBUSY;
info = "too many jobs";
@@ -253,14 +334,7 @@ int jobs_queue3(
}
}
- /* fills and queues the job */
- job->group = group;
- job->timeout = timeout;
- job->callback = callback;
- job->arg1 = arg1;
- job->arg2 = arg2;
- job->arg3 = arg3;
- job->blocked = 0;
+ /* queues the job */
job_add(job);
pthread_mutex_unlock(&mutex);
@@ -269,23 +343,16 @@ int jobs_queue3(
return 0;
error2:
- pthread_mutex_unlock(&mutex);
- free(job);
+ job_destroy(job);
error:
ERROR("can't process job with threads: %s, %m", info);
+ pthread_mutex_unlock(&mutex);
return -1;
}
/* initialise the threads */
int jobs_init(int allowed_count, int start_count, int waiter_count)
{
- threads = calloc(allowed_count, sizeof *threads);
- if (threads == NULL) {
- errno = ENOMEM;
- ERROR("can't allocate threads");
- return -1;
- }
-
/* records the allowed count */
allowed = allowed_count;
started = 0;
@@ -304,34 +371,31 @@ int jobs_init(int allowed_count, int start_count, int waiter_count)
/* terminate all the threads and all pending requests */
void jobs_terminate()
{
- int i, n;
struct job *job;
+ pthread_t me, other;
+ struct thread *t;
+
+ /* how am i? */
+ me = pthread_self();
/* request all threads to stop */
pthread_mutex_lock(&mutex);
allowed = 0;
- n = started;
- for (i = 0 ; i < n ; i++)
- threads[i].stop = 1;
-
- /* wait until all thread are terminated */
- while (started != 0) {
- /* signal threads */
+ for(;;) {
+ /* search the next thread to stop */
+ t = threads;
+ while (t && pthread_equal(t->tid, me))
+ t = t->next;
+ if (!t)
+ break;
+ /* stop it */
+ other = t->tid;
+ t->stop = 1;
pthread_mutex_unlock(&mutex);
pthread_cond_broadcast(&cond);
+ pthread_join(other, NULL);
pthread_mutex_lock(&mutex);
-
- /* join the terminated threads */
- for (i = 0 ; i < n ; i++) {
- if (threads[i].tid && threads[i].ended) {
- pthread_join(threads[i].tid, NULL);
- threads[i].tid = 0;
- started--;
- }
- }
}
- pthread_mutex_unlock(&mutex);
- free(threads);
/* cancel pending jobs */
while (first_job) {
@@ -342,3 +406,57 @@ void jobs_terminate()
}
}
+int jobs_add_event_loop(void *key, int timeout, void (*evloop)(int signum, void*), void *closure)
+{
+ struct job *job;
+
+ pthread_mutex_lock(&mutex);
+ job = job_create(key, timeout, (void (*)(int, void *, void *, void *))evloop, closure, NULL, NULL);
+ if (job) {
+ /* adds the loop */
+ job->next = first_evloop;
+ first_evloop = job;
+
+ /* signal the loop */
+ pthread_cond_signal(&cond);
+ }
+ pthread_mutex_unlock(&mutex);
+ return -!job;
+}
+
+int jobs_add_me()
+{
+ pthread_t me;
+ struct thread *t;
+
+ /* how am i? */
+ me = pthread_self();
+
+ /* request all threads to stop */
+ pthread_mutex_lock(&mutex);
+ t = threads;
+ while (t) {
+ if (pthread_equal(t->tid, me)) {
+ pthread_mutex_unlock(&mutex);
+ ERROR("thread already running");
+ errno = EINVAL;
+ return -1;
+ }
+ t = t->next;
+ }
+
+ /* allowed... */
+ allowed++;
+ pthread_mutex_unlock(&mutex);
+
+ /* run */
+ thread_main_loop(NULL);
+
+ /* returns */
+ pthread_mutex_lock(&mutex);
+ allowed--;
+ pthread_mutex_unlock(&mutex);
+ return 0;
+}
+
+
diff --git a/src/jobs.h b/src/jobs.h
index ef72e0c1..b6f277c5 100644
--- a/src/jobs.h
+++ b/src/jobs.h
@@ -38,7 +38,10 @@ extern int jobs_queue3(
void *arg2,
void *arg3);
+extern int jobs_add_event_loop(void *key, int timeout, void (*evloop)(int, void*), void *closure);
+
extern int jobs_init(int allowed_count, int start_count, int waiter_count);
+extern int jobs_add_me();
extern void jobs_terminate();
diff --git a/src/main.c b/src/main.c
index d31f1f52..6f210d92 100644
--- a/src/main.c
+++ b/src/main.c
@@ -394,6 +394,17 @@ static int execute_command()
}
/*---------------------------------------------------------
+ | main event processing
+ +--------------------------------------------------------- */
+
+static void main_evloop(int signum, void *closure)
+{
+ struct sd_event *evloop = closure;
+ if (signum == 0)
+ sd_event_run(evloop, 30000000);
+}
+
+/*---------------------------------------------------------
| main
| Parse option and launch action
+--------------------------------------------------------- */
@@ -401,7 +412,6 @@ static int execute_command()
int main(int argc, char *argv[])
{
struct afb_hsrv *hsrv;
- struct sd_event *eventloop;
LOGAUTH("afb-daemon");
@@ -495,13 +505,17 @@ int main(int argc, char *argv[])
if (execute_command() < 0)
exit(1);
- // infinite loop
- eventloop = afb_common_get_event_loop();
- sd_notify(1, "READY=1");
- for (;;)
- sd_event_run(eventloop, 30000000);
+ /* records the loop */
+ if (jobs_add_event_loop(NULL, 0, main_evloop, afb_common_get_event_loop()) < 0) {
+ ERROR("failed to set main_evloop");
+ return 1;
+ }
- WARNING("hoops returned from infinite loop [report bug]");
+ /* ready */
+ sd_notify(1, "READY=1");
+ /* turn as processing thread */
+ jobs_add_me();
+ WARNING("hoops returned from jobs_add_me! [report bug]");
return 0;
}