diff options
-rw-r--r-- | src/afb-common.c | 6 | ||||
-rw-r--r-- | src/jobs.c | 265 | ||||
-rw-r--r-- | src/jobs.h | 5 | ||||
-rw-r--r-- | src/main.c | 19 |
4 files changed, 140 insertions, 155 deletions
diff --git a/src/afb-common.c b/src/afb-common.c index 47ba40f7..087a6285 100644 --- a/src/afb-common.c +++ b/src/afb-common.c @@ -27,6 +27,7 @@ #include "afb-common.h" #include "locale-root.h" +#include "jobs.h" static const char *default_locale = NULL; static struct locale_root *rootdir = NULL; @@ -42,7 +43,6 @@ struct sd_event *afb_common_get_thread_event_loop() } return result; } -*/ static void *sdopen(void **p, int (*f)(void **)) { @@ -55,6 +55,7 @@ static void *sdopen(void **p, int (*f)(void **)) } return *p; } +*/ static struct sd_bus *sdbusopen(struct sd_bus **p, int (*f)(struct sd_bus **)) { @@ -77,8 +78,7 @@ static struct sd_bus *sdbusopen(struct sd_bus **p, int (*f)(struct sd_bus **)) struct sd_event *afb_common_get_event_loop() { - static struct sd_event *result = NULL; - return sdopen((void*)&result, (void*)sd_event_new); + return jobs_get_sd_event(); } struct sd_bus *afb_common_get_user_bus() @@ -26,6 +26,8 @@ #include <errno.h> #include <assert.h> +#include <systemd/sd-event.h> + #include "jobs.h" #include "sig-monitor.h" #include "verbose.h" @@ -54,16 +56,25 @@ struct job unsigned dropped: 1; /**< is removed ? */ }; +/** Description of handled event loops */ +struct events +{ + struct events *next; + struct sd_event *event; + unsigned runs: 1; +}; + /** Description of threads */ struct thread { - struct thread *next; /**< next thread of the list */ - struct thread *upper; /**< upper same thread */ - struct job *job; /**< currently processed job */ - pthread_t tid; /**< the thread id */ - unsigned stop: 1; /**< stop requested */ - unsigned lowered: 1; /**< has a lower same thread */ - unsigned waits: 1; /**< is waiting? */ + struct thread *next; /**< next thread of the list */ + struct thread *upper; /**< upper same thread */ + struct job *job; /**< currently processed job */ + struct events *events; /**< currently processed job */ + pthread_t tid; /**< the thread id */ + unsigned stop: 1; /**< stop requested */ + unsigned lowered: 1; /**< has a lower same thread */ + unsigned waits: 1; /**< is waiting? */ }; /* synchronisation of threads */ @@ -75,6 +86,7 @@ static int allowed = 0; /** allowed count of threads */ static int started = 0; /** started count of threads */ static int waiting = 0; /** waiting count of threads */ static int remains = 0; /** allowed count of waiting jobs */ +static int nevents = 0; /** count of events */ /* list of threads */ static struct thread *threads; @@ -82,7 +94,7 @@ static _Thread_local struct thread *current; /* queue of pending jobs */ static struct job *first_job; -static struct job *first_events; +static struct events *first_events; static struct job *free_jobs; /** @@ -176,17 +188,29 @@ static void job_add2(struct job *job1, struct job *job2) /** * Get the next job to process or NULL if none. - * @param job the head of the list to search. * @return the first job that isn't blocked or NULL */ -static inline struct job *job_get(struct job *job) +static inline struct job *job_get() { + struct job *job = first_job; while (job && job->blocked) job = job->next; return job; } /** + * Get the next events to process or NULL if none. + * @return the first events that isn't running or NULL + */ +static inline struct events *events_get() +{ + struct events *events = first_events; + while (events && events->runs) + events = events->next; + return events; +} + +/** * Releases the processed 'job': removes it * from the list of jobs and unblock the first * pending job of the same group if any. @@ -222,48 +246,6 @@ static inline void job_release(struct job *job) } /** - * Releases the events 'job': removes it - * from the list of events. - * @param job the event to release - */ -static inline void events_release(struct job *job) -{ - struct job *ijob, **pjob; - - /* first unqueue the job */ - pjob = &first_events; - ijob = first_events; - while (ijob != job) { - pjob = &ijob->next; - ijob = ijob->next; - } - *pjob = job->next; - - /* recycle the job */ - job->next = free_jobs; - free_jobs = job; -} - -/** - * Get the events of 'key' if existing. - * @param key the key to search - * @return the found events or NULL if none existing has key - */ -static inline struct job *events_of_key(void *key) -{ - struct job *job; - - if (!key) - job = NULL; - else { - job = first_events; - while (job && (job->dropped || job->group != key)) - job = job->next; - } - return job; -} - -/** * Monitored normal callback for a job. * This function is called by the monitor * to run the job when the safe environment @@ -295,16 +277,34 @@ static void job_cancel(int signum, void *arg) } /** + * Monitored normal callback for events. + * This function is called by the monitor + * to run the event loop when the safe environment + * is set. + * @param signum 0 on normal flow or the number + * of the signal that interrupted the normal + * flow + * @param arg the events to run + */ +static void events_call(int signum, void *arg) +{ + struct events *events = arg; + if (!signum) + sd_event_run(events->event, (uint64_t) -1); +} + +/** * Main processing loop of threads processing jobs. * The loop must be called with the mutex locked * and it returns with the mutex locked. * @param me the description of the thread to use * TODO: how are timeout handled when reentering? */ -static void thread_run(struct thread *me) +static void thread_run(volatile struct thread *me) { struct thread **prv; struct job *job; + struct events *events; /* initialize description of itself and link it in the list */ me->tid = pthread_self(); @@ -316,12 +316,13 @@ static void thread_run(struct thread *me) current->lowered = 1; else sig_monitor_init_timeouts(); - current = me; + current = (struct thread*)me; me->next = threads; - threads = me; + threads = (struct thread*)me; started++; /* loop until stopped */ + me->events = NULL; while (!me->stop) { /* get a job */ job = job_get(first_job); @@ -338,18 +339,25 @@ static void thread_run(struct thread *me) /* release the run job */ job_release(job); + + /* release event if any */ + events = me->events; + if (events) { + events->runs = 0; + me->events = NULL; + } } else { /* no job, check events */ - job = job_get(first_events); - if (job) { + events = events_get(); + if (events) { /* run the events */ - job->blocked = 1; + events->runs = 1; + me->events = events; pthread_mutex_unlock(&mutex); - sig_monitor(job->timeout, job_call, job); + sig_monitor(0, events_call, events); pthread_mutex_lock(&mutex); - job->blocked = 0; - if (job->dropped) - events_release(job); + events->runs = 0; + me->events = NULL; } else { /* no job and not events */ waiting++; @@ -813,76 +821,6 @@ void jobs_terminate() } /** - * Adds the events waiter/dispatcher to the list of events waiters/dispatchers - * to monitor. - * @param key A key to register the events waiter/dispatcher (see - * 'jobs_del_events') - * @param timeout Timeout in second of the function or 0 if none - * @param events The callback, the first argument is 0 for normal - * flow or the signal number when normal flow failed - * @param closure The closure to give to the callback as secondd argument - * @return 0 in case of success or -1 in case of error - */ -int jobs_add_events(void *key, int timeout, void (*events)(int signum, void*), void *closure) -{ - struct job *job; - - pthread_mutex_lock(&mutex); - - /* look at an already existsing events for same key */ - job = events_of_key(key); - if (job) { - pthread_mutex_unlock(&mutex); - ERROR("events of key %p already exist", key); - errno = EEXIST; - return -1; - } - - /* creates the job */ - job = job_create(key, timeout, (job_cb_t)events, closure, NULL, NULL); - if (!job) { - pthread_mutex_unlock(&mutex); - ERROR("Can't create events, out of memory"); - errno = ENOMEM; - return -1; - } - - /* adds the loop */ - job->next = first_events; - first_events = job; - - /* signal the loop */ - if (waiting) - pthread_cond_signal(&cond); - pthread_mutex_unlock(&mutex); - return 0; -} - -/** - * Removes the events of 'key' - * @param key The key of the events to remove - * @return 0 in case of success or -1 in case of error - */ -int jobs_del_events(void *key) -{ - struct job *job; - - pthread_mutex_lock(&mutex); - job = events_of_key(key); - if (job) - if (job->blocked) - job->dropped = 1; - else - events_release(job); - pthread_mutex_unlock(&mutex); - if (!job) { - ERROR("events of key %p not found", key); - errno = ENOENT; - } - return -!job; -} - -/** * Adds the current thread to the pool of threads * processing the jobs. Returns normally when the threads are * terminated or immediately with an error if the thread is @@ -910,3 +848,66 @@ int jobs_add_me() } +struct sd_event *jobs_get_sd_event() +{ + struct events *events; + struct thread *me; + int rc; + + pthread_mutex_lock(&mutex); + + /* search events on stack */ + me = current; + while (me && !me->events) + me = me->upper; + if (me) + /* return the stacked events */ + events = me->events; + else { + /* search an available events */ + events = events_get(); + if (!events) { + /* not found, check if creation possible */ + if (nevents >= allowed) { + ERROR("not possible to add a new event"); + events = NULL; + } else { + events = malloc(sizeof *events); + if (events && (rc = sd_event_new(&events->event)) >= 0) { + if (nevents < started || start_one_thread() >= 0) { + events->runs = 0; + events->next = first_events; + first_events = events; + } else { + ERROR("can't start thread for events"); + sd_event_unref(events->event); + free(events); + events = NULL; + } + } else { + if (!events) + ERROR("out of memory"); + else { + free(events); + ERROR("creation of sd_event failed: %m"); + events = NULL; + errno = -rc; + } + } + } + } + if (events) { + /* */ + me = current; + if (me) { + events->runs = 1; + me->events = events; + } else { + WARNING("event returned for unknown thread!"); + } + } + } + pthread_mutex_unlock(&mutex); + return events ? events->event : NULL; +} + @@ -17,6 +17,8 @@ #pragma once +struct sd_event; + extern int jobs_queue0( void *group, int timeout, @@ -65,8 +67,7 @@ extern int jobs_invoke3( void *arg2, void *arg3); -extern int jobs_add_events(void *key, int timeout, void (*events)(int, void*), void *closure); -extern int jobs_del_events(void *key); +extern struct sd_event *jobs_get_sd_event(); extern int jobs_init(int allowed_count, int start_count, int waiter_count); extern int jobs_add_me(); @@ -394,17 +394,6 @@ static int execute_command() } /*--------------------------------------------------------- - | main event processing - +--------------------------------------------------------- */ - -static void main_event_wait_and_dispatch(int signum, void *closure) -{ - struct sd_event *event = closure; - if (signum == 0) - sd_event_run(event, 30000000); -} - -/*--------------------------------------------------------- | job for starting the daemon +--------------------------------------------------------- */ @@ -518,15 +507,9 @@ int main(int argc, char *argv[]) return 1; } - /* records the loop */ - if (jobs_add_events(NULL, 0, main_event_wait_and_dispatch, afb_common_get_event_loop()) < 0) { - ERROR("failed to set main_event_wait_and_dispatch"); - return 1; - } - /* queue the start job */ if (jobs_queue0(NULL, 0, start) < 0) { - ERROR("failed to set main_event_wait_and_dispatch"); + ERROR("failed to start runnning jobs"); return 1; } |