diff options
-rw-r--r-- | src/jobs.c | 292 | ||||
-rw-r--r-- | src/jobs.h | 3 | ||||
-rw-r--r-- | src/main.c | 28 |
3 files changed, 229 insertions, 94 deletions
@@ -30,13 +30,12 @@ #include "sig-monitor.h" #include "verbose.h" -/* control of threads */ +/** control of threads */ struct thread { - pthread_t tid; /* the thread id */ - unsigned stop: 1; /* stop request */ - unsigned ended: 1; /* ended status */ - unsigned works: 1; /* is it processing a job? */ + struct thread *next; /**< next thread of the list */ + pthread_t tid; /**< the thread id */ + unsigned stop: 1; /**< stop request */ }; /* describes pending job */ @@ -56,42 +55,54 @@ struct job static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; -/* queue of pending jobs */ -static struct job *first_job = NULL; - /* count allowed, started and running threads */ -static int allowed = 0; -static int started = 0; -static int running = 0; -static int remains = 0; +static int allowed = 0; /** allowed count of threads */ +static int started = 0; /** started count of threads */ +static int running = 0; /** running count of threads */ +static int remains = 0; /** remaining count of jobs that can be created */ /* list of threads */ -static struct thread *threads = NULL; +static struct thread *threads; -/* add the job to the list */ +/* queue of pending jobs */ +static struct job *first_job; +static struct job *first_evloop; +static struct job *free_jobs; + +/** + * Adds the 'job' at the end of the list of jobs, marking it + * as blocked if an other job with the same group is pending. + * @param job the job to add + */ static inline void job_add(struct job *job) { - void *group = job->group; + void *group; struct job *ijob, **pjob; pjob = &first_job; ijob = first_job; - group = job->group ? : job; + group = job->group ? : (void*)(intptr_t)1; while (ijob) { if (ijob->group == group) job->blocked = 1; pjob = &ijob->next; ijob = ijob->next; } - *pjob = job; job->next = NULL; + *pjob = job; remains--; } -/* get the next job to process or NULL if none */ +/** + * Get the next job to process or NULL if none. + * The returned job if any is removed from the list of + * jobs. + * @return the job to process + */ static inline struct job *job_get() { struct job *job, **pjob; + pjob = &first_job; job = first_job; while (job && job->blocked) { @@ -105,7 +116,10 @@ static inline struct job *job_get() return job; } -/* unblock a group of job */ +/** + * Unblock the first pending job of a group (if any) + * @param group the group to unblock + */ static inline void job_unblock(void *group) { struct job *job; @@ -120,74 +134,140 @@ static inline void job_unblock(void *group) } } -/* call the job */ -static inline void job_call(int signum, void *arg) +static struct job *job_create( + void *group, + int timeout, + void (*callback)(int, void*, void *, void*), + void *arg1, + void *arg2, + void *arg3) +{ + struct job *job; + + /* allocates the job */ + job = free_jobs; + if (!job) { + pthread_mutex_unlock(&mutex); + job = malloc(sizeof *job); + pthread_mutex_lock(&mutex); + if (!job) { + errno = -ENOMEM; + goto end; + } + } + job->group = group; + job->timeout = timeout; + job->callback = callback; + job->arg1 = arg1; + job->arg2 = arg2; + job->arg3 = arg3; + job->blocked = 0; +end: + return job; +} + +static inline void job_destroy(struct job *job) +{ + job->next = free_jobs; + free_jobs = job; +} + +static inline void job_release(struct job *job) +{ + if (job->group) + job_unblock(job->group); + job_destroy(job); +} + +/** monitored call to the job */ +static void job_call(int signum, void *arg) { struct job *job = arg; job->callback(signum, job->arg1, job->arg2, job->arg3); } -/* cancel the job */ -static inline void job_cancel(int signum, void *arg) +/** monitored cancel of the job */ +static void job_cancel(int signum, void *arg) { - struct job *job = arg; - job->callback(SIGABRT, job->arg1, job->arg2, job->arg3); + job_call(SIGABRT, arg); } /* main loop of processing threads */ static void *thread_main_loop(void *data) { - struct thread *me = data; + struct thread me, **prv; struct job *job; - me->works = 0; - me->ended = 0; + /* init */ + me.tid = pthread_self(); + me.stop = 0; sig_monitor_init_timeouts(); + + /* chain in */ pthread_mutex_lock(&mutex); - while (!me->stop) { + me.next = threads; + threads = &me; + + /* loop until stopped */ + running++; + while (!me.stop) { /* get a job */ job = job_get(); - if (job == NULL && first_job != NULL && running == 0) { + if (!job && first_job && running == 0) { /* sad situation!! should not happen */ ERROR("threads are blocked!"); job = first_job; first_job = job->next; } - if (job == NULL) { - /* no job... */ - pthread_cond_wait(&cond, &mutex); - } else { + if (job) { /* run the job */ - running++; - me->works = 1; pthread_mutex_unlock(&mutex); sig_monitor(job->timeout, job_call, job); pthread_mutex_lock(&mutex); - me->works = 0; - running--; - if (job->group != NULL) - job_unblock(job->group); - free(job); + job_release(job); + } else { + /* no job, check evloop */ + job = first_evloop; + if (job) { + /* evloop */ + first_evloop = job->next; + pthread_mutex_unlock(&mutex); + sig_monitor(job->timeout, job_call, job); + pthread_mutex_lock(&mutex); + job->next = first_evloop; + first_evloop = job; + } else { + /* no job and not evloop */ + running--; + pthread_cond_wait(&cond, &mutex); + running++; + } } - } - me->ended = 1; + running--; + + /* chain out */ + prv = &threads; + while (*prv != &me) + prv = &(*prv)->next; + *prv = me.next; pthread_mutex_unlock(&mutex); + + /* uninit and terminate */ sig_monitor_clean_timeouts(); - return me; + return NULL; } /* start a new thread */ static int start_one_thread() { - struct thread *t; + pthread_t tid; int rc; assert(started < allowed); - t = &threads[started++]; - t->stop = 0; - rc = pthread_create(&t->tid, NULL, thread_main_loop, t); + started++; + rc = pthread_create(&tid, NULL, thread_main_loop, NULL); if (rc != 0) { started--; errno = rc; @@ -229,16 +309,17 @@ int jobs_queue3( struct job *job; int rc; + pthread_mutex_lock(&mutex); + /* allocates the job */ - job = malloc(sizeof *job); - if (job == NULL) { + job = job_create(group, timeout, callback, arg1, arg2, arg3); + if (!job) { errno = ENOMEM; info = "out of memory"; goto error; } /* start a thread if needed */ - pthread_mutex_lock(&mutex); if (remains == 0) { errno = EBUSY; info = "too many jobs"; @@ -253,14 +334,7 @@ int jobs_queue3( } } - /* fills and queues the job */ - job->group = group; - job->timeout = timeout; - job->callback = callback; - job->arg1 = arg1; - job->arg2 = arg2; - job->arg3 = arg3; - job->blocked = 0; + /* queues the job */ job_add(job); pthread_mutex_unlock(&mutex); @@ -269,23 +343,16 @@ int jobs_queue3( return 0; error2: - pthread_mutex_unlock(&mutex); - free(job); + job_destroy(job); error: ERROR("can't process job with threads: %s, %m", info); + pthread_mutex_unlock(&mutex); return -1; } /* initialise the threads */ int jobs_init(int allowed_count, int start_count, int waiter_count) { - threads = calloc(allowed_count, sizeof *threads); - if (threads == NULL) { - errno = ENOMEM; - ERROR("can't allocate threads"); - return -1; - } - /* records the allowed count */ allowed = allowed_count; started = 0; @@ -304,34 +371,31 @@ int jobs_init(int allowed_count, int start_count, int waiter_count) /* terminate all the threads and all pending requests */ void jobs_terminate() { - int i, n; struct job *job; + pthread_t me, other; + struct thread *t; + + /* how am i? */ + me = pthread_self(); /* request all threads to stop */ pthread_mutex_lock(&mutex); allowed = 0; - n = started; - for (i = 0 ; i < n ; i++) - threads[i].stop = 1; - - /* wait until all thread are terminated */ - while (started != 0) { - /* signal threads */ + for(;;) { + /* search the next thread to stop */ + t = threads; + while (t && pthread_equal(t->tid, me)) + t = t->next; + if (!t) + break; + /* stop it */ + other = t->tid; + t->stop = 1; pthread_mutex_unlock(&mutex); pthread_cond_broadcast(&cond); + pthread_join(other, NULL); pthread_mutex_lock(&mutex); - - /* join the terminated threads */ - for (i = 0 ; i < n ; i++) { - if (threads[i].tid && threads[i].ended) { - pthread_join(threads[i].tid, NULL); - threads[i].tid = 0; - started--; - } - } } - pthread_mutex_unlock(&mutex); - free(threads); /* cancel pending jobs */ while (first_job) { @@ -342,3 +406,57 @@ void jobs_terminate() } } +int jobs_add_event_loop(void *key, int timeout, void (*evloop)(int signum, void*), void *closure) +{ + struct job *job; + + pthread_mutex_lock(&mutex); + job = job_create(key, timeout, (void (*)(int, void *, void *, void *))evloop, closure, NULL, NULL); + if (job) { + /* adds the loop */ + job->next = first_evloop; + first_evloop = job; + + /* signal the loop */ + pthread_cond_signal(&cond); + } + pthread_mutex_unlock(&mutex); + return -!job; +} + +int jobs_add_me() +{ + pthread_t me; + struct thread *t; + + /* how am i? */ + me = pthread_self(); + + /* request all threads to stop */ + pthread_mutex_lock(&mutex); + t = threads; + while (t) { + if (pthread_equal(t->tid, me)) { + pthread_mutex_unlock(&mutex); + ERROR("thread already running"); + errno = EINVAL; + return -1; + } + t = t->next; + } + + /* allowed... */ + allowed++; + pthread_mutex_unlock(&mutex); + + /* run */ + thread_main_loop(NULL); + + /* returns */ + pthread_mutex_lock(&mutex); + allowed--; + pthread_mutex_unlock(&mutex); + return 0; +} + + @@ -38,7 +38,10 @@ extern int jobs_queue3( void *arg2, void *arg3); +extern int jobs_add_event_loop(void *key, int timeout, void (*evloop)(int, void*), void *closure); + extern int jobs_init(int allowed_count, int start_count, int waiter_count); +extern int jobs_add_me(); extern void jobs_terminate(); @@ -394,6 +394,17 @@ static int execute_command() } /*--------------------------------------------------------- + | main event processing + +--------------------------------------------------------- */ + +static void main_evloop(int signum, void *closure) +{ + struct sd_event *evloop = closure; + if (signum == 0) + sd_event_run(evloop, 30000000); +} + +/*--------------------------------------------------------- | main | Parse option and launch action +--------------------------------------------------------- */ @@ -401,7 +412,6 @@ static int execute_command() int main(int argc, char *argv[]) { struct afb_hsrv *hsrv; - struct sd_event *eventloop; LOGAUTH("afb-daemon"); @@ -495,13 +505,17 @@ int main(int argc, char *argv[]) if (execute_command() < 0) exit(1); - // infinite loop - eventloop = afb_common_get_event_loop(); - sd_notify(1, "READY=1"); - for (;;) - sd_event_run(eventloop, 30000000); + /* records the loop */ + if (jobs_add_event_loop(NULL, 0, main_evloop, afb_common_get_event_loop()) < 0) { + ERROR("failed to set main_evloop"); + return 1; + } - WARNING("hoops returned from infinite loop [report bug]"); + /* ready */ + sd_notify(1, "READY=1"); + /* turn as processing thread */ + jobs_add_me(); + WARNING("hoops returned from jobs_add_me! [report bug]"); return 0; } |