aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJosé Bollo <jose.bollo@iot.bzh>2019-01-06 10:34:12 +0100
committerJosé Bollo <jose.bollo@iot.bzh>2019-01-16 22:32:25 +0100
commite9c6530ee21bb3345741044e8bf87fc6a73252e9 (patch)
treeb0f5e5991c0a9808db885faa9fc8768ee9ab22ce
parentbb5dd7af3cb715e707f367ee7fd462de12c84f8e (diff)
jobs: Refactor event loop handling
This improves the arbitration of the single event loop across threads. Before introduction of using 'evenfd' there was several event loop. At the current time, there is only one. At the end, there will probably remain only one. Bug-AGL: SPEC-2089 Change-Id: Iac9db7cbe15b4c9c76e6e9a8f6e641ed2a9039e0 Signed-off-by: José Bollo <jose.bollo@iot.bzh>
-rw-r--r--src/jobs.c302
-rw-r--r--src/sig-monitor.c5
-rw-r--r--src/sig-monitor.h2
3 files changed, 184 insertions, 125 deletions
diff --git a/src/jobs.c b/src/jobs.c
index f2c9d52b..417f7eac 100644
--- a/src/jobs.c
+++ b/src/jobs.c
@@ -75,20 +75,20 @@ struct evloop
unsigned state; /**< encoded state */
int efd; /**< event notification */
struct sd_event *sdev; /**< the systemd event loop */
- pthread_cond_t cond; /**< condition */
struct fdev *fdev; /**< handling of events */
struct thread *holder; /**< holder of the evloop */
};
#define EVLOOP_STATE_WAIT 1U
#define EVLOOP_STATE_RUN 2U
-#define EVLOOP_STATE_LOCK 4U
/** Description of threads */
struct thread
{
struct thread *next; /**< next thread of the list */
struct thread *upper; /**< upper same thread */
+ struct thread *nholder;/**< next holder for evloop */
+ pthread_cond_t *cwhold;/**< condition wait for holding */
struct job *job; /**< currently processed job */
pthread_t tid; /**< the thread id */
volatile unsigned stop: 1; /**< stop requested */
@@ -96,7 +96,7 @@ struct thread
};
/**
- * Description of synchonous callback
+ * Description of synchronous callback
*/
struct sync
{
@@ -123,14 +123,13 @@ static int remains = 0; /** allowed count of waiting jobs */
/* list of threads */
static struct thread *threads;
static _Thread_local struct thread *current_thread;
-static _Thread_local struct evloop *current_evloop;
/* queue of pending jobs */
static struct job *first_job;
static struct job *free_jobs;
/* event loop */
-static struct evloop evloop[1];
+static struct evloop evloop;
#if defined(REMOVE_SYSTEMD_EVENT)
static struct fdev_epoll *fdevepoll;
@@ -300,13 +299,9 @@ static void evloop_run(int signum, void *arg)
{
int rc;
struct sd_event *se;
- struct evloop *el = arg;
if (!signum) {
- current_evloop = el;
- __atomic_store_n(&el->state, EVLOOP_STATE_LOCK|EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT, __ATOMIC_RELAXED);
- __atomic_store_n(&el->holder, current_thread, __ATOMIC_RELAXED);
- se = el->sdev;
+ se = evloop.sdev;
rc = sd_event_prepare(se);
if (rc < 0) {
errno = -rc;
@@ -320,8 +315,7 @@ static void evloop_run(int signum, void *arg)
ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(se));
}
}
- __atomic_and_fetch(&el->state, ~(EVLOOP_STATE_WAIT), __ATOMIC_RELAXED);
-
+ evloop.state = EVLOOP_STATE_RUN;
if (rc > 0) {
rc = sd_event_dispatch(se);
if (rc < 0) {
@@ -331,9 +325,93 @@ static void evloop_run(int signum, void *arg)
}
}
}
- __atomic_and_fetch(&el->state, ~(EVLOOP_STATE_WAIT|EVLOOP_STATE_RUN), __ATOMIC_RELAXED);
}
+/**
+ * Internal callback for evloop management.
+ * The effect of this function is hidden: it exits
+ * the waiting poll if any.
+ */
+static void evloop_on_efd_event()
+{
+ uint64_t x;
+ read(evloop.efd, &x, sizeof x);
+}
+
+/**
+ * wakeup the event loop if needed by sending
+ * an event.
+ */
+static void evloop_wakeup()
+{
+ uint64_t x;
+
+ if (evloop.state & EVLOOP_STATE_WAIT) {
+ x = 1;
+ write(evloop.efd, &x, sizeof x);
+ }
+}
+
+/**
+ * Release the currently held event loop
+ */
+static void evloop_release()
+{
+ struct thread *nh, *ct = current_thread;
+
+ if (evloop.holder == ct) {
+ nh = ct->nholder;
+ evloop.holder = nh;
+ if (nh)
+ pthread_cond_signal(nh->cwhold);
+ }
+}
+
+/**
+ * get the eventloop for the current thread
+ */
+static int evloop_get()
+{
+ struct thread *ct = current_thread;
+
+ if (evloop.holder)
+ return evloop.holder == ct;
+
+ ct->nholder = NULL;
+ evloop.holder = ct;
+ return 1;
+}
+
+/**
+ * acquire the eventloop for the current thread
+ */
+static void evloop_acquire()
+{
+ struct thread **pwait, *ct;
+ pthread_cond_t cond;
+
+ /* try to get the evloop */
+ if (!evloop_get()) {
+ /* failed, init waiting state */
+ ct = current_thread;
+ ct->nholder = NULL;
+ ct->cwhold = &cond;
+ pthread_cond_init(&cond, NULL);
+
+ /* queue current thread in holder list */
+ pwait = &evloop.holder;
+ while (*pwait)
+ pwait = &(*pwait)->nholder;
+ *pwait = ct;
+
+ /* wake up the evloop */
+ evloop_wakeup();
+
+ /* wait to acquire the evloop */
+ pthread_cond_wait(&cond, &mutex);
+ pthread_cond_destroy(&cond);
+ }
+}
#if defined(REMOVE_SYSTEMD_EVENT)
/**
@@ -363,9 +441,6 @@ static void thread_run(volatile struct thread *me)
{
struct thread **prv;
struct job *job;
-#if !defined(REMOVE_SYSTEMD_EVENT)
- struct evloop *el;
-#endif
/* initialize description of itself and link it in the list */
me->tid = pthread_self();
@@ -382,12 +457,8 @@ static void thread_run(volatile struct thread *me)
/* loop until stopped */
while (!me->stop) {
- /* release the event loop */
- if (current_evloop) {
- __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
- __atomic_store_n(&current_evloop->holder, NULL, __ATOMIC_RELAXED);
- current_evloop = NULL;
- }
+ /* release the current event loop */
+ evloop_release();
/* get a job */
job = job_get();
@@ -405,27 +476,31 @@ static void thread_run(volatile struct thread *me)
/* release the run job */
job_release(job);
#if !defined(REMOVE_SYSTEMD_EVENT)
- } else {
- /* no job, check events */
- el = &evloop[0];
- if (el->sdev && !__atomic_load_n(&el->state, __ATOMIC_RELAXED)) {
- /* run the events */
- __atomic_store_n(&el->state, EVLOOP_STATE_LOCK|EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT, __ATOMIC_RELAXED);
- __atomic_store_n(&el->holder, me, __ATOMIC_RELAXED);
- current_evloop = el;
- pthread_mutex_unlock(&mutex);
- sig_monitor(0, evloop_run, el);
- pthread_mutex_lock(&mutex);
- } else {
- /* no job and not events */
- running--;
- if (!running)
- ERROR("Entering job deep sleep! Check your bindings.");
- me->waits = 1;
- pthread_cond_wait(&cond, &mutex);
- me->waits = 0;
- running++;
+
+
+
+ /* no job, check event loop wait */
+ } else if (evloop_get()) {
+ if (evloop.state != 0) {
+ /* busy ? */
+ CRITICAL("Can't enter dispatch while in dispatch!");
+ abort();
}
+ /* run the events */
+ evloop.state = EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT;
+ pthread_mutex_unlock(&mutex);
+ sig_monitor(0, evloop_run, NULL);
+ pthread_mutex_lock(&mutex);
+ evloop.state = 0;
+ } else {
+ /* no job and no event loop */
+ running--;
+ if (!running)
+ ERROR("Entering job deep sleep! Check your bindings.");
+ me->waits = 1;
+ pthread_cond_wait(&cond, &mutex);
+ me->waits = 0;
+ running++;
#else
} else if (waitevt) {
/* no job and not events */
@@ -448,11 +523,7 @@ static void thread_run(volatile struct thread *me)
}
/* release the event loop */
- if (current_evloop) {
- __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
- __atomic_store_n(&el->holder, NULL, __ATOMIC_RELAXED);
- current_evloop = NULL;
- }
+ evloop_release();
/* unlink the current thread and cleanup */
prv = &threads;
@@ -659,41 +730,6 @@ int jobs_enter(
}
/**
- * Internal callback for evloop management.
- * The effect of this function is hidden: it exits
- * the waiting poll if any. Then it wakes up a thread
- * awaiting the evloop using signal.
- */
-static int on_evloop_efd(sd_event_source *s, int fd, uint32_t revents, void *userdata)
-{
- uint64_t x;
- struct evloop *evloop = userdata;
- read(evloop->efd, &x, sizeof x);
- pthread_mutex_lock(&mutex);
- pthread_cond_broadcast(&evloop->cond);
- pthread_mutex_unlock(&mutex);
- return 1;
-}
-
-/**
- * unlock the event loop if needed by sending
- * an event.
- * @param el the event loop to unlock
- * @param wait wait the unlocked state of the event loop
- */
-static void unlock_evloop(struct evloop *el, int wait)
-{
- /* wait for a modifiable event loop */
- while (__atomic_load_n(&el->state, __ATOMIC_RELAXED) & EVLOOP_STATE_WAIT) {
- uint64_t x = 1;
- write(el->efd, &x, sizeof x);
- if (!wait)
- break;
- pthread_cond_wait(&el->cond, &mutex);
- }
-}
-
-/**
* Unlocks the execution flow designed by 'jobloop'.
* @param jobloop indication of the flow to unlock
* @return 0 in case of success of -1 on error
@@ -701,7 +737,6 @@ static void unlock_evloop(struct evloop *el, int wait)
int jobs_leave(struct jobloop *jobloop)
{
struct thread *t;
- int i;
pthread_mutex_lock(&mutex);
t = threads;
@@ -713,15 +748,8 @@ int jobs_leave(struct jobloop *jobloop)
t->stop = 1;
if (t->waits)
pthread_cond_broadcast(&cond);
- else {
- i = (int)(sizeof evloop / sizeof *evloop);
- while(i) {
- if (evloop[--i].holder == t) {
- unlock_evloop(&evloop[i], 0);
- break;
- }
- }
- }
+ else
+ evloop_wakeup();
}
pthread_mutex_unlock(&mutex);
return -!t;
@@ -755,6 +783,18 @@ int jobs_call(
return do_sync(group, timeout, call_cb, &sync);
}
+/**
+ * Internal callback for evloop management.
+ * The effect of this function is hidden: it exits
+ * the waiting poll if any. Then it wakes up a thread
+ * awaiting the evloop using signal.
+ */
+static int on_evloop_efd(sd_event_source *s, int fd, uint32_t revents, void *userdata)
+{
+ evloop_on_efd_event();
+ return 1;
+}
+
/* temporary hack */
#if !defined(REMOVE_SYSTEMD_EVENT)
__attribute__((unused))
@@ -770,35 +810,33 @@ static void evloop_callback(void *arg, uint32_t event, struct fdev *fdev)
*/
static struct sd_event *get_sd_event_locked()
{
- struct evloop *el;
int rc;
/* creates the evloop on need */
- el = &evloop[0];
- if (!el->sdev) {
+ if (!evloop.sdev) {
/* start the creation */
- el->state = 0;
+ evloop.state = 0;
/* creates the eventfd for waking up polls */
- el->efd = eventfd(0, EFD_CLOEXEC);
- if (el->efd < 0) {
+ evloop.efd = eventfd(0, EFD_CLOEXEC|EFD_SEMAPHORE);
+ if (evloop.efd < 0) {
ERROR("can't make eventfd for events");
goto error1;
}
/* create the systemd event loop */
- rc = sd_event_new(&el->sdev);
+ rc = sd_event_new(&evloop.sdev);
if (rc < 0) {
ERROR("can't make new event loop");
goto error2;
}
/* put the eventfd in the event loop */
- rc = sd_event_add_io(el->sdev, NULL, el->efd, EPOLLIN, on_evloop_efd, el);
+ rc = sd_event_add_io(evloop.sdev, NULL, evloop.efd, EPOLLIN, on_evloop_efd, NULL);
if (rc < 0) {
ERROR("can't register eventfd");
#if !defined(REMOVE_SYSTEMD_EVENT)
- sd_event_unref(el->sdev);
- el->sdev = NULL;
+ sd_event_unref(evloop.sdev);
+ evloop.sdev = NULL;
error2:
- close(el->efd);
+ close(evloop.efd);
error1:
return NULL;
}
@@ -806,38 +844,27 @@ error1:
goto error3;
}
/* handle the event loop */
- el->fdev = fdev_epoll_add(get_fdevepoll(), sd_event_get_fd(el->sdev));
- if (!el->fdev) {
+ evloop.fdev = fdev_epoll_add(get_fdevepoll(), sd_event_get_fd(evloop.sdev));
+ if (!evloop.fdev) {
ERROR("can't create fdev");
error3:
- sd_event_unref(el->sdev);
+ sd_event_unref(evloop.sdev);
error2:
- close(el->efd);
+ close(evloop.efd);
error1:
- memset(el, 0, sizeof *el);
+ memset(&evloop, 0, sizeof evloop);
return NULL;
}
- fdev_set_autoclose(el->fdev, 0);
- fdev_set_events(el->fdev, EPOLLIN);
- fdev_set_callback(el->fdev, evloop_callback, el);
+ fdev_set_autoclose(evloop.fdev, 0);
+ fdev_set_events(evloop.fdev, EPOLLIN);
+ fdev_set_callback(evloop.fdev, evloop_callback, NULL);
#endif
}
- /* attach the event loop to the current thread */
- if (current_evloop != el) {
- if (current_evloop) {
- __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
- __atomic_store_n(&current_evloop->holder, NULL, __ATOMIC_RELAXED);
- }
- current_evloop = el;
- __atomic_or_fetch(&el->state, EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
- __atomic_store_n(&el->holder, current_thread, __ATOMIC_RELAXED);
- }
-
- /* wait for a modifiable event loop */
- unlock_evloop(el, 1);
+ /* acquire the event loop */
+ evloop_acquire();
- return el->sdev;
+ return evloop.sdev;
}
/**
@@ -847,11 +874,36 @@ error1:
struct sd_event *jobs_get_sd_event()
{
struct sd_event *result;
+ struct thread lt;
+
+ /* ensure an existing thread environment */
+ if (!current_thread) {
+ memset(&lt, 0, sizeof lt);
+ current_thread = &lt;
+ }
+ /* process */
pthread_mutex_lock(&mutex);
result = get_sd_event_locked();
pthread_mutex_unlock(&mutex);
+ /* release the faked thread environment if needed */
+ if (current_thread == &lt) {
+ /*
+ * Releasing it is needed because there is no way to guess
+ * when it has to be released really. But here is where it is
+ * hazardous: if the caller modifies the eventloop when it
+ * is waiting, there is no way to make the change effective.
+ * A workaround to achieve that goal is for the caller to
+ * require the event loop a second time after having modified it.
+ */
+ NOTICE("Requiring sd_event loop out of binder callbacks is hazardous!");
+ if (verbose_wants(Log_Level_Info))
+ sig_monitor_dumpstack();
+ evloop_release();
+ current_thread = NULL;
+ }
+
return result;
}
diff --git a/src/sig-monitor.c b/src/sig-monitor.c
index 15fe260d..9e13fa13 100644
--- a/src/sig-monitor.c
+++ b/src/sig-monitor.c
@@ -299,3 +299,8 @@ void sig_monitor(int timeout, void (*function)(int sig, void*), void *arg)
else
function(0, arg);
}
+
+void sig_monitor_dumpstack()
+{
+ return dumpstack(1, 0);
+}
diff --git a/src/sig-monitor.h b/src/sig-monitor.h
index 5fdac167..03c8ec49 100644
--- a/src/sig-monitor.h
+++ b/src/sig-monitor.h
@@ -25,3 +25,5 @@ extern int sig_monitor_enable();
extern void sig_monitor(int timeout, void (*function)(int sig, void*), void *arg);
+extern void sig_monitor_dumpstack();
+