summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJosé Bollo <jose.bollo@iot.bzh>2019-01-06 10:34:12 +0100
committerJosé Bollo <jose.bollo@iot.bzh>2019-02-05 09:58:02 +0100
commitfec8057d9779ea8e45682aa2a97b88d96eccf03c (patch)
tree3c393639779a2286b9aadc200baa66ae125bbf30
parentec04e7de2d8068c804acff096e88e81b9cde70e7 (diff)
jobs: Refactor event loop handling
This improves the arbitration of the single event loop across threads. Before introduction of using 'evenfd' there was several event loop. At the current time, there is only one. At the end, there will probably remain only one. Bug-AGL: SPEC-2089 Change-Id: I1b40a11eee495cccfd3ed7a25ae6889042abd6bb Signed-off-by: José Bollo <jose.bollo@iot.bzh>
-rw-r--r--src/jobs.c302
-rw-r--r--src/sig-monitor.c5
-rw-r--r--src/sig-monitor.h2
3 files changed, 184 insertions, 125 deletions
diff --git a/src/jobs.c b/src/jobs.c
index f2c9d52b..417f7eac 100644
--- a/src/jobs.c
+++ b/src/jobs.c
@@ -75,20 +75,20 @@ struct evloop
unsigned state; /**< encoded state */
int efd; /**< event notification */
struct sd_event *sdev; /**< the systemd event loop */
- pthread_cond_t cond; /**< condition */
struct fdev *fdev; /**< handling of events */
struct thread *holder; /**< holder of the evloop */
};
#define EVLOOP_STATE_WAIT 1U
#define EVLOOP_STATE_RUN 2U
-#define EVLOOP_STATE_LOCK 4U
/** Description of threads */
struct thread
{
struct thread *next; /**< next thread of the list */
struct thread *upper; /**< upper same thread */
+ struct thread *nholder;/**< next holder for evloop */
+ pthread_cond_t *cwhold;/**< condition wait for holding */
struct job *job; /**< currently processed job */
pthread_t tid; /**< the thread id */
volatile unsigned stop: 1; /**< stop requested */
@@ -96,7 +96,7 @@ struct thread
};
/**
- * Description of synchonous callback
+ * Description of synchronous callback
*/
struct sync
{
@@ -123,14 +123,13 @@ static int remains = 0; /** allowed count of waiting jobs */
/* list of threads */
static struct thread *threads;
static _Thread_local struct thread *current_thread;
-static _Thread_local struct evloop *current_evloop;
/* queue of pending jobs */
static struct job *first_job;
static struct job *free_jobs;
/* event loop */
-static struct evloop evloop[1];
+static struct evloop evloop;
#if defined(REMOVE_SYSTEMD_EVENT)
static struct fdev_epoll *fdevepoll;
@@ -300,13 +299,9 @@ static void evloop_run(int signum, void *arg)
{
int rc;
struct sd_event *se;
- struct evloop *el = arg;
if (!signum) {
- current_evloop = el;
- __atomic_store_n(&el->state, EVLOOP_STATE_LOCK|EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT, __ATOMIC_RELAXED);
- __atomic_store_n(&el->holder, current_thread, __ATOMIC_RELAXED);
- se = el->sdev;
+ se = evloop.sdev;
rc = sd_event_prepare(se);
if (rc < 0) {
errno = -rc;
@@ -320,8 +315,7 @@ static void evloop_run(int signum, void *arg)
ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(se));
}
}
- __atomic_and_fetch(&el->state, ~(EVLOOP_STATE_WAIT), __ATOMIC_RELAXED);
-
+ evloop.state = EVLOOP_STATE_RUN;
if (rc > 0) {
rc = sd_event_dispatch(se);
if (rc < 0) {
@@ -331,9 +325,93 @@ static void evloop_run(int signum, void *arg)
}
}
}
- __atomic_and_fetch(&el->state, ~(EVLOOP_STATE_WAIT|EVLOOP_STATE_RUN), __ATOMIC_RELAXED);
}
+/**
+ * Internal callback for evloop management.
+ * The effect of this function is hidden: it exits
+ * the waiting poll if any.
+ */
+static void evloop_on_efd_event()
+{
+ uint64_t x;
+ read(evloop.efd, &x, sizeof x);
+}
+
+/**
+ * wakeup the event loop if needed by sending
+ * an event.
+ */
+static void evloop_wakeup()
+{
+ uint64_t x;
+
+ if (evloop.state & EVLOOP_STATE_WAIT) {
+ x = 1;
+ write(evloop.efd, &x, sizeof x);
+ }
+}
+
+/**
+ * Release the currently held event loop
+ */
+static void evloop_release()
+{
+ struct thread *nh, *ct = current_thread;
+
+ if (evloop.holder == ct) {
+ nh = ct->nholder;
+ evloop.holder = nh;
+ if (nh)
+ pthread_cond_signal(nh->cwhold);
+ }
+}
+
+/**
+ * get the eventloop for the current thread
+ */
+static int evloop_get()
+{
+ struct thread *ct = current_thread;
+
+ if (evloop.holder)
+ return evloop.holder == ct;
+
+ ct->nholder = NULL;
+ evloop.holder = ct;
+ return 1;
+}
+
+/**
+ * acquire the eventloop for the current thread
+ */
+static void evloop_acquire()
+{
+ struct thread **pwait, *ct;
+ pthread_cond_t cond;
+
+ /* try to get the evloop */
+ if (!evloop_get()) {
+ /* failed, init waiting state */
+ ct = current_thread;
+ ct->nholder = NULL;
+ ct->cwhold = &cond;
+ pthread_cond_init(&cond, NULL);
+
+ /* queue current thread in holder list */
+ pwait = &evloop.holder;
+ while (*pwait)
+ pwait = &(*pwait)->nholder;
+ *pwait = ct;
+
+ /* wake up the evloop */
+ evloop_wakeup();
+
+ /* wait to acquire the evloop */
+ pthread_cond_wait(&cond, &mutex);
+ pthread_cond_destroy(&cond);
+ }
+}
#if defined(REMOVE_SYSTEMD_EVENT)
/**
@@ -363,9 +441,6 @@ static void thread_run(volatile struct thread *me)
{
struct thread **prv;
struct job *job;
-#if !defined(REMOVE_SYSTEMD_EVENT)
- struct evloop *el;
-#endif
/* initialize description of itself and link it in the list */
me->tid = pthread_self();
@@ -382,12 +457,8 @@ static void thread_run(volatile struct thread *me)
/* loop until stopped */
while (!me->stop) {
- /* release the event loop */
- if (current_evloop) {
- __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
- __atomic_store_n(&current_evloop->holder, NULL, __ATOMIC_RELAXED);
- current_evloop = NULL;
- }
+ /* release the current event loop */
+ evloop_release();
/* get a job */
job = job_get();
@@ -405,27 +476,31 @@ static void thread_run(volatile struct thread *me)
/* release the run job */
job_release(job);
#if !defined(REMOVE_SYSTEMD_EVENT)
- } else {
- /* no job, check events */
- el = &evloop[0];
- if (el->sdev && !__atomic_load_n(&el->state, __ATOMIC_RELAXED)) {
- /* run the events */
- __atomic_store_n(&el->state, EVLOOP_STATE_LOCK|EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT, __ATOMIC_RELAXED);
- __atomic_store_n(&el->holder, me, __ATOMIC_RELAXED);
- current_evloop = el;
- pthread_mutex_unlock(&mutex);
- sig_monitor(0, evloop_run, el);
- pthread_mutex_lock(&mutex);
- } else {
- /* no job and not events */
- running--;
- if (!running)
- ERROR("Entering job deep sleep! Check your bindings.");
- me->waits = 1;
- pthread_cond_wait(&cond, &mutex);
- me->waits = 0;
- running++;
+
+
+
+ /* no job, check event loop wait */
+ } else if (evloop_get()) {
+ if (evloop.state != 0) {
+ /* busy ? */
+ CRITICAL("Can't enter dispatch while in dispatch!");
+ abort();
}
+ /* run the events */
+ evloop.state = EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT;
+ pthread_mutex_unlock(&mutex);
+ sig_monitor(0, evloop_run, NULL);
+ pthread_mutex_lock(&mutex);
+ evloop.state = 0;
+ } else {
+ /* no job and no event loop */
+ running--;
+ if (!running)
+ ERROR("Entering job deep sleep! Check your bindings.");
+ me->waits = 1;
+ pthread_cond_wait(&cond, &mutex);
+ me->waits = 0;
+ running++;
#else
} else if (waitevt) {
/* no job and not events */
@@ -448,11 +523,7 @@ static void thread_run(volatile struct thread *me)
}
/* release the event loop */
- if (current_evloop) {
- __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
- __atomic_store_n(&el->holder, NULL, __ATOMIC_RELAXED);
- current_evloop = NULL;
- }
+ evloop_release();
/* unlink the current thread and cleanup */
prv = &threads;
@@ -659,41 +730,6 @@ int jobs_enter(
}
/**
- * Internal callback for evloop management.
- * The effect of this function is hidden: it exits
- * the waiting poll if any. Then it wakes up a thread
- * awaiting the evloop using signal.
- */
-static int on_evloop_efd(sd_event_source *s, int fd, uint32_t revents, void *userdata)
-{
- uint64_t x;
- struct evloop *evloop = userdata;
- read(evloop->efd, &x, sizeof x);
- pthread_mutex_lock(&mutex);
- pthread_cond_broadcast(&evloop->cond);
- pthread_mutex_unlock(&mutex);
- return 1;
-}
-
-/**
- * unlock the event loop if needed by sending
- * an event.
- * @param el the event loop to unlock
- * @param wait wait the unlocked state of the event loop
- */
-static void unlock_evloop(struct evloop *el, int wait)
-{
- /* wait for a modifiable event loop */
- while (__atomic_load_n(&el->state, __ATOMIC_RELAXED) & EVLOOP_STATE_WAIT) {
- uint64_t x = 1;
- write(el->efd, &x, sizeof x);
- if (!wait)
- break;
- pthread_cond_wait(&el->cond, &mutex);
- }
-}
-
-/**
* Unlocks the execution flow designed by 'jobloop'.
* @param jobloop indication of the flow to unlock
* @return 0 in case of success of -1 on error
@@ -701,7 +737,6 @@ static void unlock_evloop(struct evloop *el, int wait)
int jobs_leave(struct jobloop *jobloop)
{
struct thread *t;
- int i;
pthread_mutex_lock(&mutex);
t = threads;
@@ -713,15 +748,8 @@ int jobs_leave(struct jobloop *jobloop)
t->stop = 1;
if (t->waits)
pthread_cond_broadcast(&cond);
- else {
- i = (int)(sizeof evloop / sizeof *evloop);
- while(i) {
- if (evloop[--i].holder == t) {
- unlock_evloop(&evloop[i], 0);
- break;
- }
- }
- }
+ else
+ evloop_wakeup();
}
pthread_mutex_unlock(&mutex);
return -!t;
@@ -755,6 +783,18 @@ int jobs_call(
return do_sync(group, timeout, call_cb, &sync);
}
+/**
+ * Internal callback for evloop management.
+ * The effect of this function is hidden: it exits
+ * the waiting poll if any. Then it wakes up a thread
+ * awaiting the evloop using signal.
+ */
+static int on_evloop_efd(sd_event_source *s, int fd, uint32_t revents, void *userdata)
+{
+ evloop_on_efd_event();
+ return 1;
+}
+
/* temporary hack */
#if !defined(REMOVE_SYSTEMD_EVENT)
__attribute__((unused))
@@ -770,35 +810,33 @@ static void evloop_callback(void *arg, uint32_t event, struct fdev *fdev)
*/
static struct sd_event *get_sd_event_locked()
{
- struct evloop *el;
int rc;
/* creates the evloop on need */
- el = &evloop[0];
- if (!el->sdev) {
+ if (!evloop.sdev) {
/* start the creation */
- el->state = 0;
+ evloop.state = 0;
/* creates the eventfd for waking up polls */
- el->efd = eventfd(0, EFD_CLOEXEC);
- if (el->efd < 0) {
+ evloop.efd = eventfd(0, EFD_CLOEXEC|EFD_SEMAPHORE);
+ if (evloop.efd < 0) {
ERROR("can't make eventfd for events");
goto error1;
}
/* create the systemd event loop */
- rc = sd_event_new(&el->sdev);
+ rc = sd_event_new(&evloop.sdev);
if (rc < 0) {
ERROR("can't make new event loop");
goto error2;
}
/* put the eventfd in the event loop */
- rc = sd_event_add_io(el->sdev, NULL, el->efd, EPOLLIN, on_evloop_efd, el);
+ rc = sd_event_add_io(evloop.sdev, NULL, evloop.efd, EPOLLIN, on_evloop_efd, NULL);
if (rc < 0) {
ERROR("can't register eventfd");
#if !defined(REMOVE_SYSTEMD_EVENT)
- sd_event_unref(el->sdev);
- el->sdev = NULL;
+ sd_event_unref(evloop.sdev);
+ evloop.sdev = NULL;
error2:
- close(el->efd);
+ close(evloop.efd);
error1:
return NULL;
}
@@ -806,38 +844,27 @@ error1:
goto error3;
}
/* handle the event loop */
- el->fdev = fdev_epoll_add(get_fdevepoll(), sd_event_get_fd(el->sdev));
- if (!el->fdev) {
+ evloop.fdev = fdev_epoll_add(get_fdevepoll(), sd_event_get_fd(evloop.sdev));
+ if (!evloop.fdev) {
ERROR("can't create fdev");
error3:
- sd_event_unref(el->sdev);
+ sd_event_unref(evloop.sdev);
error2:
- close(el->efd);
+ close(evloop.efd);
error1:
- memset(el, 0, sizeof *el);
+ memset(&evloop, 0, sizeof evloop);
return NULL;
}
- fdev_set_autoclose(el->fdev, 0);
- fdev_set_events(el->fdev, EPOLLIN);
- fdev_set_callback(el->fdev, evloop_callback, el);
+ fdev_set_autoclose(evloop.fdev, 0);
+ fdev_set_events(evloop.fdev, EPOLLIN);
+ fdev_set_callback(evloop.fdev, evloop_callback, NULL);
#endif
}
- /* attach the event loop to the current thread */
- if (current_evloop != el) {
- if (current_evloop) {
- __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
- __atomic_store_n(&current_evloop->holder, NULL, __ATOMIC_RELAXED);
- }
- current_evloop = el;
- __atomic_or_fetch(&el->state, EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
- __atomic_store_n(&el->holder, current_thread, __ATOMIC_RELAXED);
- }
-
- /* wait for a modifiable event loop */
- unlock_evloop(el, 1);
+ /* acquire the event loop */
+ evloop_acquire();
- return el->sdev;
+ return evloop.sdev;
}
/**
@@ -847,11 +874,36 @@ error1:
struct sd_event *jobs_get_sd_event()
{
struct sd_event *result;
+ struct thread lt;
+
+ /* ensure an existing thread environment */
+ if (!current_thread) {
+ memset(&lt, 0, sizeof lt);
+ current_thread = &lt;
+ }
+ /* process */
pthread_mutex_lock(&mutex);
result = get_sd_event_locked();
pthread_mutex_unlock(&mutex);
+ /* release the faked thread environment if needed */
+ if (current_thread == &lt) {
+ /*
+ * Releasing it is needed because there is no way to guess
+ * when it has to be released really. But here is where it is
+ * hazardous: if the caller modifies the eventloop when it
+ * is waiting, there is no way to make the change effective.
+ * A workaround to achieve that goal is for the caller to
+ * require the event loop a second time after having modified it.
+ */
+ NOTICE("Requiring sd_event loop out of binder callbacks is hazardous!");
+ if (verbose_wants(Log_Level_Info))
+ sig_monitor_dumpstack();
+ evloop_release();
+ current_thread = NULL;
+ }
+
return result;
}
diff --git a/src/sig-monitor.c b/src/sig-monitor.c
index 15fe260d..9e13fa13 100644
--- a/src/sig-monitor.c
+++ b/src/sig-monitor.c
@@ -299,3 +299,8 @@ void sig_monitor(int timeout, void (*function)(int sig, void*), void *arg)
else
function(0, arg);
}
+
+void sig_monitor_dumpstack()
+{
+ return dumpstack(1, 0);
+}
diff --git a/src/sig-monitor.h b/src/sig-monitor.h
index 5fdac167..03c8ec49 100644
--- a/src/sig-monitor.h
+++ b/src/sig-monitor.h
@@ -25,3 +25,5 @@ extern int sig_monitor_enable();
extern void sig_monitor(int timeout, void (*function)(int sig, void*), void *arg);
+extern void sig_monitor_dumpstack();
+