summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJosé Bollo <jose.bollo@iot.bzh>2017-11-03 13:04:30 +0100
committerJosé Bollo <jose.bollo@iot.bzh>2017-11-03 13:04:30 +0100
commit674b01ab9887d7c16b674e32365c7ab1386ecc77 (patch)
tree883628fd82af4193e1e42453060d0ecb1019aa80
parente47e5c8339c3496015f667b7f71a6d673807141d (diff)
jobs: Improve event loop integration
The previous implmentation was buggy. This changes make the event loop a thread global variable. A thread now refuses to run an event loop if it is in dispatching state. Change-Id: Ic29792b87c1cae201958feb96d93678f6d37ac8d Signed-off-by: José Bollo <jose.bollo@iot.bzh>
-rw-r--r--src/jobs.c122
1 files changed, 64 insertions, 58 deletions
diff --git a/src/jobs.c b/src/jobs.c
index 93e864f2..b7d16112 100644
--- a/src/jobs.c
+++ b/src/jobs.c
@@ -64,8 +64,11 @@ struct events
struct events *next;
struct sd_event *event;
uint64_t timeout;
- unsigned used: 1;
- unsigned runs: 1;
+ enum {
+ Available,
+ Modifiable,
+ Locked
+ } state;
};
/** Description of threads */
@@ -74,10 +77,8 @@ struct thread
struct thread *next; /**< next thread of the list */
struct thread *upper; /**< upper same thread */
struct job *job; /**< currently processed job */
- struct events *events; /**< currently processed job */
pthread_t tid; /**< the thread id */
unsigned stop: 1; /**< stop requested */
- unsigned lowered: 1; /**< has a lower same thread */
unsigned waits: 1; /**< is waiting? */
};
@@ -109,7 +110,8 @@ static int nevents = 0; /** count of events */
/* list of threads */
static struct thread *threads;
-static _Thread_local struct thread *current;
+static _Thread_local struct thread *current_thread;
+static _Thread_local struct events *current_events;
/* queue of pending jobs */
static struct job *first_job;
@@ -204,7 +206,7 @@ static inline struct job *job_get()
static inline struct events *events_get()
{
struct events *events = first_events;
- while (events && events->used)
+ while (events && events->state != Available)
events = events->next;
return events;
}
@@ -272,9 +274,34 @@ static void job_cancel(int signum, void *arg)
*/
static void events_call(int signum, void *arg)
{
+ int rc;
+ struct sd_event *se;
struct events *events = arg;
- if (!signum)
- sd_event_run(events->event, events->timeout);
+
+ if (!signum) {
+ se = events->event;
+ rc = sd_event_prepare(se);
+ if (rc < 0) {
+ errno = -rc;
+ ERROR("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(events->event));
+ } else {
+ if (rc == 0) {
+ rc = sd_event_wait(se, events->timeout);
+ if (rc < 0) {
+ errno = -rc;
+ ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(events->event));
+ }
+ }
+
+ if (rc > 0) {
+ rc = sd_event_dispatch(se);
+ if (rc < 0) {
+ errno = -rc;
+ ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(events->event));
+ }
+ }
+ }
+ }
}
/**
@@ -286,7 +313,7 @@ static void events_call(int signum, void *arg)
*/
static void thread_run(volatile struct thread *me)
{
- struct thread **prv, *thr;
+ struct thread **prv;
struct job *job;
struct events *events;
uint64_t evto;
@@ -294,22 +321,18 @@ static void thread_run(volatile struct thread *me)
/* initialize description of itself and link it in the list */
me->tid = pthread_self();
me->stop = 0;
- me->lowered = 0;
me->waits = 0;
- me->upper = current;
- if (current) {
- current->lowered = 1;
+ me->upper = current_thread;
+ if (current_thread) {
evto = EVENT_TIMEOUT_CHILD;
- me->events = current->events;
} else {
started++;
sig_monitor_init_timeouts();
evto = EVENT_TIMEOUT_TOP;
- me->events = NULL;
}
me->next = threads;
threads = (struct thread*)me;
- current = (struct thread*)me;
+ current_thread = (struct thread*)me;
/* loop until stopped */
while (!me->stop) {
@@ -326,37 +349,32 @@ static void thread_run(volatile struct thread *me)
sig_monitor(job->timeout, job->callback, job->arg);
pthread_mutex_lock(&mutex);
- /* release the run job */
- job_release(job);
-
/* release event if any */
- events = me->events;
- if (events) {
- events->used = 0;
- me->events = NULL;
+ events = current_events;
+ if (events && events->state == Modifiable) {
+ current_events = NULL;
+ events->state = Available;
}
+
+ /* release the run job */
+ job_release(job);
} else {
/* no job, check events */
- events = me->events;
- if (!events || events->runs)
+ events = current_events;
+ if (!events)
events = events_get();
+ else if (events->state == Locked)
+ events = 0;
if (events) {
/* run the events */
- events->used = 1;
- events->runs = 1;
+ events->state = Locked;
events->timeout = evto;
- me->events = events;
+ current_events = events;
pthread_mutex_unlock(&mutex);
sig_monitor(0, events_call, events);
pthread_mutex_lock(&mutex);
- events->used = 0;
- events->runs = 0;
- me->events = NULL;
- thr = me->upper;
- while (thr && thr->events == events) {
- thr->events = NULL;
- thr = thr->upper;
- }
+ current_events = NULL;
+ events->state = Available;
} else {
/* no job and not events */
waiting++;
@@ -373,10 +391,8 @@ static void thread_run(volatile struct thread *me)
while (*prv != me)
prv = &(*prv)->next;
*prv = me->next;
- current = me->upper;
- if (current) {
- current->lowered = 0;
- } else {
+ current_thread = me->upper;
+ if (!current_thread) {
sig_monitor_clean_timeouts();
started--;
}
@@ -631,19 +647,13 @@ int jobs_call(
struct sd_event *jobs_get_sd_event()
{
struct events *events;
- struct thread *me;
int rc;
pthread_mutex_lock(&mutex);
/* search events on stack */
- me = current;
- while (me && !me->events)
- me = me->upper;
- if (me)
- /* return the stacked events */
- events = me->events;
- else {
+ events = current_events;
+ if (!events) {
/* search an available events */
events = events_get();
if (!events) {
@@ -655,8 +665,7 @@ struct sd_event *jobs_get_sd_event()
events = malloc(sizeof *events);
if (events && (rc = sd_event_new(&events->event)) >= 0) {
if (nevents < started || start_one_thread() >= 0) {
- events->used = 0;
- events->runs = 0;
+ events->state = Available;
events->next = first_events;
first_events = events;
} else {
@@ -679,13 +688,10 @@ struct sd_event *jobs_get_sd_event()
}
}
if (events) {
- me = current;
- if (me) {
- events->used = 1;
- me->events = events;
- } else {
+ events->state = Modifiable;
+ if (!current_thread)
WARNING("event returned for unknown thread!");
- }
+ current_events = events;
}
}
pthread_mutex_unlock(&mutex);
@@ -715,7 +721,7 @@ int jobs_start(int allowed_count, int start_count, int waiter_count, void (*star
pthread_mutex_lock(&mutex);
/* check whether already running */
- if (current || allowed) {
+ if (current_thread || allowed) {
ERROR("thread already started");
errno = EINVAL;
goto error;
@@ -822,7 +828,7 @@ void jobs_terminate()
head = job->next;
/* search if job is stacked for current */
- t = current;
+ t = current_thread;
while (t && t->job != job)
t = t->upper;
if (t) {