aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJosé Bollo <jose.bollo@iot.bzh>2017-03-30 13:33:11 +0200
committerJosé Bollo <jose.bollo@iot.bzh>2017-03-30 13:33:11 +0200
commit89c44a872117fb8f64d38cbccf8f36776f2623f6 (patch)
tree89b26e67440c1f6f77fc6c1fcdeccf3b1ea1fb37
parent391ada39d89f9f90d186aed8e1d825be9c17a328 (diff)
Fix concurrency issues on event manager
Having only one event manager is not possible in multithreading due to the way that systemd has to manage timer events. We observed that timers were not armed when set in a thread because event was polling in an other thread. This patch provides more than one event manager and at most as many as the number of threads avalaible to start. Change-Id: Iaeab353b7bc79ce61361ab73c7b197a9e69a6109 Signed-off-by: José Bollo <jose.bollo@iot.bzh>
-rw-r--r--src/afb-common.c6
-rw-r--r--src/jobs.c265
-rw-r--r--src/jobs.h5
-rw-r--r--src/main.c19
4 files changed, 140 insertions, 155 deletions
diff --git a/src/afb-common.c b/src/afb-common.c
index 47ba40f7..087a6285 100644
--- a/src/afb-common.c
+++ b/src/afb-common.c
@@ -27,6 +27,7 @@
#include "afb-common.h"
#include "locale-root.h"
+#include "jobs.h"
static const char *default_locale = NULL;
static struct locale_root *rootdir = NULL;
@@ -42,7 +43,6 @@ struct sd_event *afb_common_get_thread_event_loop()
}
return result;
}
-*/
static void *sdopen(void **p, int (*f)(void **))
{
@@ -55,6 +55,7 @@ static void *sdopen(void **p, int (*f)(void **))
}
return *p;
}
+*/
static struct sd_bus *sdbusopen(struct sd_bus **p, int (*f)(struct sd_bus **))
{
@@ -77,8 +78,7 @@ static struct sd_bus *sdbusopen(struct sd_bus **p, int (*f)(struct sd_bus **))
struct sd_event *afb_common_get_event_loop()
{
- static struct sd_event *result = NULL;
- return sdopen((void*)&result, (void*)sd_event_new);
+ return jobs_get_sd_event();
}
struct sd_bus *afb_common_get_user_bus()
diff --git a/src/jobs.c b/src/jobs.c
index 03fe4254..8ffd6b61 100644
--- a/src/jobs.c
+++ b/src/jobs.c
@@ -26,6 +26,8 @@
#include <errno.h>
#include <assert.h>
+#include <systemd/sd-event.h>
+
#include "jobs.h"
#include "sig-monitor.h"
#include "verbose.h"
@@ -54,16 +56,25 @@ struct job
unsigned dropped: 1; /**< is removed ? */
};
+/** Description of handled event loops */
+struct events
+{
+ struct events *next;
+ struct sd_event *event;
+ unsigned runs: 1;
+};
+
/** Description of threads */
struct thread
{
- struct thread *next; /**< next thread of the list */
- struct thread *upper; /**< upper same thread */
- struct job *job; /**< currently processed job */
- pthread_t tid; /**< the thread id */
- unsigned stop: 1; /**< stop requested */
- unsigned lowered: 1; /**< has a lower same thread */
- unsigned waits: 1; /**< is waiting? */
+ struct thread *next; /**< next thread of the list */
+ struct thread *upper; /**< upper same thread */
+ struct job *job; /**< currently processed job */
+ struct events *events; /**< currently processed job */
+ pthread_t tid; /**< the thread id */
+ unsigned stop: 1; /**< stop requested */
+ unsigned lowered: 1; /**< has a lower same thread */
+ unsigned waits: 1; /**< is waiting? */
};
/* synchronisation of threads */
@@ -75,6 +86,7 @@ static int allowed = 0; /** allowed count of threads */
static int started = 0; /** started count of threads */
static int waiting = 0; /** waiting count of threads */
static int remains = 0; /** allowed count of waiting jobs */
+static int nevents = 0; /** count of events */
/* list of threads */
static struct thread *threads;
@@ -82,7 +94,7 @@ static _Thread_local struct thread *current;
/* queue of pending jobs */
static struct job *first_job;
-static struct job *first_events;
+static struct events *first_events;
static struct job *free_jobs;
/**
@@ -176,17 +188,29 @@ static void job_add2(struct job *job1, struct job *job2)
/**
* Get the next job to process or NULL if none.
- * @param job the head of the list to search.
* @return the first job that isn't blocked or NULL
*/
-static inline struct job *job_get(struct job *job)
+static inline struct job *job_get()
{
+ struct job *job = first_job;
while (job && job->blocked)
job = job->next;
return job;
}
/**
+ * Get the next events to process or NULL if none.
+ * @return the first events that isn't running or NULL
+ */
+static inline struct events *events_get()
+{
+ struct events *events = first_events;
+ while (events && events->runs)
+ events = events->next;
+ return events;
+}
+
+/**
* Releases the processed 'job': removes it
* from the list of jobs and unblock the first
* pending job of the same group if any.
@@ -222,48 +246,6 @@ static inline void job_release(struct job *job)
}
/**
- * Releases the events 'job': removes it
- * from the list of events.
- * @param job the event to release
- */
-static inline void events_release(struct job *job)
-{
- struct job *ijob, **pjob;
-
- /* first unqueue the job */
- pjob = &first_events;
- ijob = first_events;
- while (ijob != job) {
- pjob = &ijob->next;
- ijob = ijob->next;
- }
- *pjob = job->next;
-
- /* recycle the job */
- job->next = free_jobs;
- free_jobs = job;
-}
-
-/**
- * Get the events of 'key' if existing.
- * @param key the key to search
- * @return the found events or NULL if none existing has key
- */
-static inline struct job *events_of_key(void *key)
-{
- struct job *job;
-
- if (!key)
- job = NULL;
- else {
- job = first_events;
- while (job && (job->dropped || job->group != key))
- job = job->next;
- }
- return job;
-}
-
-/**
* Monitored normal callback for a job.
* This function is called by the monitor
* to run the job when the safe environment
@@ -295,16 +277,34 @@ static void job_cancel(int signum, void *arg)
}
/**
+ * Monitored normal callback for events.
+ * This function is called by the monitor
+ * to run the event loop when the safe environment
+ * is set.
+ * @param signum 0 on normal flow or the number
+ * of the signal that interrupted the normal
+ * flow
+ * @param arg the events to run
+ */
+static void events_call(int signum, void *arg)
+{
+ struct events *events = arg;
+ if (!signum)
+ sd_event_run(events->event, (uint64_t) -1);
+}
+
+/**
* Main processing loop of threads processing jobs.
* The loop must be called with the mutex locked
* and it returns with the mutex locked.
* @param me the description of the thread to use
* TODO: how are timeout handled when reentering?
*/
-static void thread_run(struct thread *me)
+static void thread_run(volatile struct thread *me)
{
struct thread **prv;
struct job *job;
+ struct events *events;
/* initialize description of itself and link it in the list */
me->tid = pthread_self();
@@ -316,12 +316,13 @@ static void thread_run(struct thread *me)
current->lowered = 1;
else
sig_monitor_init_timeouts();
- current = me;
+ current = (struct thread*)me;
me->next = threads;
- threads = me;
+ threads = (struct thread*)me;
started++;
/* loop until stopped */
+ me->events = NULL;
while (!me->stop) {
/* get a job */
job = job_get(first_job);
@@ -338,18 +339,25 @@ static void thread_run(struct thread *me)
/* release the run job */
job_release(job);
+
+ /* release event if any */
+ events = me->events;
+ if (events) {
+ events->runs = 0;
+ me->events = NULL;
+ }
} else {
/* no job, check events */
- job = job_get(first_events);
- if (job) {
+ events = events_get();
+ if (events) {
/* run the events */
- job->blocked = 1;
+ events->runs = 1;
+ me->events = events;
pthread_mutex_unlock(&mutex);
- sig_monitor(job->timeout, job_call, job);
+ sig_monitor(0, events_call, events);
pthread_mutex_lock(&mutex);
- job->blocked = 0;
- if (job->dropped)
- events_release(job);
+ events->runs = 0;
+ me->events = NULL;
} else {
/* no job and not events */
waiting++;
@@ -813,76 +821,6 @@ void jobs_terminate()
}
/**
- * Adds the events waiter/dispatcher to the list of events waiters/dispatchers
- * to monitor.
- * @param key A key to register the events waiter/dispatcher (see
- * 'jobs_del_events')
- * @param timeout Timeout in second of the function or 0 if none
- * @param events The callback, the first argument is 0 for normal
- * flow or the signal number when normal flow failed
- * @param closure The closure to give to the callback as secondd argument
- * @return 0 in case of success or -1 in case of error
- */
-int jobs_add_events(void *key, int timeout, void (*events)(int signum, void*), void *closure)
-{
- struct job *job;
-
- pthread_mutex_lock(&mutex);
-
- /* look at an already existsing events for same key */
- job = events_of_key(key);
- if (job) {
- pthread_mutex_unlock(&mutex);
- ERROR("events of key %p already exist", key);
- errno = EEXIST;
- return -1;
- }
-
- /* creates the job */
- job = job_create(key, timeout, (job_cb_t)events, closure, NULL, NULL);
- if (!job) {
- pthread_mutex_unlock(&mutex);
- ERROR("Can't create events, out of memory");
- errno = ENOMEM;
- return -1;
- }
-
- /* adds the loop */
- job->next = first_events;
- first_events = job;
-
- /* signal the loop */
- if (waiting)
- pthread_cond_signal(&cond);
- pthread_mutex_unlock(&mutex);
- return 0;
-}
-
-/**
- * Removes the events of 'key'
- * @param key The key of the events to remove
- * @return 0 in case of success or -1 in case of error
- */
-int jobs_del_events(void *key)
-{
- struct job *job;
-
- pthread_mutex_lock(&mutex);
- job = events_of_key(key);
- if (job)
- if (job->blocked)
- job->dropped = 1;
- else
- events_release(job);
- pthread_mutex_unlock(&mutex);
- if (!job) {
- ERROR("events of key %p not found", key);
- errno = ENOENT;
- }
- return -!job;
-}
-
-/**
* Adds the current thread to the pool of threads
* processing the jobs. Returns normally when the threads are
* terminated or immediately with an error if the thread is
@@ -910,3 +848,66 @@ int jobs_add_me()
}
+struct sd_event *jobs_get_sd_event()
+{
+ struct events *events;
+ struct thread *me;
+ int rc;
+
+ pthread_mutex_lock(&mutex);
+
+ /* search events on stack */
+ me = current;
+ while (me && !me->events)
+ me = me->upper;
+ if (me)
+ /* return the stacked events */
+ events = me->events;
+ else {
+ /* search an available events */
+ events = events_get();
+ if (!events) {
+ /* not found, check if creation possible */
+ if (nevents >= allowed) {
+ ERROR("not possible to add a new event");
+ events = NULL;
+ } else {
+ events = malloc(sizeof *events);
+ if (events && (rc = sd_event_new(&events->event)) >= 0) {
+ if (nevents < started || start_one_thread() >= 0) {
+ events->runs = 0;
+ events->next = first_events;
+ first_events = events;
+ } else {
+ ERROR("can't start thread for events");
+ sd_event_unref(events->event);
+ free(events);
+ events = NULL;
+ }
+ } else {
+ if (!events)
+ ERROR("out of memory");
+ else {
+ free(events);
+ ERROR("creation of sd_event failed: %m");
+ events = NULL;
+ errno = -rc;
+ }
+ }
+ }
+ }
+ if (events) {
+ /* */
+ me = current;
+ if (me) {
+ events->runs = 1;
+ me->events = events;
+ } else {
+ WARNING("event returned for unknown thread!");
+ }
+ }
+ }
+ pthread_mutex_unlock(&mutex);
+ return events ? events->event : NULL;
+}
+
diff --git a/src/jobs.h b/src/jobs.h
index 3c0746ce..0eceef3d 100644
--- a/src/jobs.h
+++ b/src/jobs.h
@@ -17,6 +17,8 @@
#pragma once
+struct sd_event;
+
extern int jobs_queue0(
void *group,
int timeout,
@@ -65,8 +67,7 @@ extern int jobs_invoke3(
void *arg2,
void *arg3);
-extern int jobs_add_events(void *key, int timeout, void (*events)(int, void*), void *closure);
-extern int jobs_del_events(void *key);
+extern struct sd_event *jobs_get_sd_event();
extern int jobs_init(int allowed_count, int start_count, int waiter_count);
extern int jobs_add_me();
diff --git a/src/main.c b/src/main.c
index 3809400f..9c2f3c56 100644
--- a/src/main.c
+++ b/src/main.c
@@ -394,17 +394,6 @@ static int execute_command()
}
/*---------------------------------------------------------
- | main event processing
- +--------------------------------------------------------- */
-
-static void main_event_wait_and_dispatch(int signum, void *closure)
-{
- struct sd_event *event = closure;
- if (signum == 0)
- sd_event_run(event, 30000000);
-}
-
-/*---------------------------------------------------------
| job for starting the daemon
+--------------------------------------------------------- */
@@ -518,15 +507,9 @@ int main(int argc, char *argv[])
return 1;
}
- /* records the loop */
- if (jobs_add_events(NULL, 0, main_event_wait_and_dispatch, afb_common_get_event_loop()) < 0) {
- ERROR("failed to set main_event_wait_and_dispatch");
- return 1;
- }
-
/* queue the start job */
if (jobs_queue0(NULL, 0, start) < 0) {
- ERROR("failed to set main_event_wait_and_dispatch");
+ ERROR("failed to start runnning jobs");
return 1;
}