aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJosé Bollo <jose.bollo@iot.bzh>2018-03-23 18:58:37 +0100
committerJosé Bollo <jose.bollo@iot.bzh>2018-06-13 17:14:13 +0200
commitcab05ba72abeb335b456b0820e053c94b91986e7 (patch)
tree7052b78f77089bb4a3a42e0f215cd444d14f72eb
parente7e42ce9196e865a2dfd5a460932b0357885e603 (diff)
jobs: Switch to fdev_epoll instead of sd_event
Use by default an epoll of jobs for handling HTTP and websockets. The sd_event loop remains available (to be checked). Change-Id: Iaaad005c9880ba1818936a93c8626490666c9eec Signed-off-by: José Bollo <jose.bollo@iot.bzh>
-rw-r--r--src/afb-fdev.c27
-rw-r--r--src/jobs.c114
-rw-r--r--src/jobs.h2
3 files changed, 117 insertions, 26 deletions
diff --git a/src/afb-fdev.c b/src/afb-fdev.c
index 943c2542..1c36091e 100644
--- a/src/afb-fdev.c
+++ b/src/afb-fdev.c
@@ -15,8 +15,19 @@
* limitations under the License.
*/
-#include "afb-systemd.h"
#include "fdev.h"
+
+#if defined(WITH_SYSTEMD_EVENT)
+# define USE_SYSTEMD 1
+# define USE_EPOLL 0
+#else
+# define USE_SYSTEMD 0
+# define USE_EPOLL 1
+#endif
+
+#if USE_SYSTEMD
+
+#include "afb-systemd.h"
#include "fdev-systemd.h"
struct fdev *afb_fdev_create(int fd)
@@ -24,3 +35,17 @@ struct fdev *afb_fdev_create(int fd)
return fdev_systemd_create(afb_systemd_get_event_loop(), fd);
}
+#endif
+
+#if USE_EPOLL
+
+#include "jobs.h"
+#include "fdev-epoll.h"
+
+struct fdev *afb_fdev_create(int fd)
+{
+ return fdev_epoll_add(jobs_get_fdev_epoll(), fd);
+}
+
+#endif
+
diff --git a/src/jobs.c b/src/jobs.c
index 42f2fbe7..9c81700a 100644
--- a/src/jobs.c
+++ b/src/jobs.c
@@ -27,6 +27,7 @@
#include <stdint.h>
#include <unistd.h>
#include <signal.h>
+#include <string.h>
#include <time.h>
#include <sys/syscall.h>
#include <pthread.h>
@@ -35,11 +36,13 @@
#include <sys/eventfd.h>
#include <systemd/sd-event.h>
+#include "fdev.h"
#if HAS_WATCHDOG
#include <systemd/sd-daemon.h>
#endif
#include "jobs.h"
+#include "fdev-epoll.h"
#include "sig-monitor.h"
#include "verbose.h"
@@ -75,6 +78,7 @@ struct evloop
int efd; /**< event notification */
struct sd_event *sdev; /**< the systemd event loop */
pthread_cond_t cond; /**< condition */
+ struct fdev *fdev; /**< handling of events */
};
#define EVLOOP_STATE_WAIT 1U
@@ -128,6 +132,8 @@ static struct job *free_jobs;
/* event loop */
static struct evloop evloop[1];
+static struct fdev_epoll *fdevepoll;
+static int waitevt;
/**
* Create a new job with the given parameters
@@ -262,6 +268,21 @@ static void job_cancel(int signum, void *arg)
}
/**
+ * Gets a fdev_epoll item.
+ * @return a fdev_epoll or NULL in case of error
+ */
+static struct fdev_epoll *get_fdevepoll()
+{
+ struct fdev_epoll *result;
+
+ result = fdevepoll;
+ if (!result)
+ result = fdevepoll = fdev_epoll_create();
+
+ return result;
+}
+
+/**
* Monitored normal callback for events.
* This function is called by the monitor
* to run the event loop when the safe environment
@@ -278,6 +299,8 @@ static void evloop_run(int signum, void *arg)
struct evloop *el = arg;
if (!signum) {
+ current_evloop = el;
+ __atomic_store_n(&el->state, EVLOOP_STATE_LOCK|EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT, __ATOMIC_RELAXED);
se = el->sdev;
rc = sd_event_prepare(se);
if (rc < 0) {
@@ -307,6 +330,21 @@ static void evloop_run(int signum, void *arg)
/**
+ * Monitored normal loop for waiting events.
+ * @param signum 0 on normal flow or the number
+ * of the signal that interrupted the normal
+ * flow
+ * @param arg the events to run
+ */
+static void monitored_wait_and_dispatch(int signum, void *arg)
+{
+ struct fdev_epoll *fdev_epoll = arg;
+ if (!signum) {
+ fdev_epoll_wait_and_dispatch(fdev_epoll, -1);
+ }
+}
+
+/**
* Main processing loop of threads processing jobs.
* The loop must be called with the mutex locked
* and it returns with the mutex locked.
@@ -317,7 +355,6 @@ static void thread_run(volatile struct thread *me)
{
struct thread **prv;
struct job *job;
- struct evloop *el;
/* initialize description of itself and link it in the list */
me->tid = pthread_self();
@@ -336,7 +373,7 @@ static void thread_run(volatile struct thread *me)
while (!me->stop) {
/* release the event loop */
if (current_evloop) {
- __atomic_sub_fetch(&current_evloop->state, EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
+ __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
current_evloop = NULL;
}
@@ -355,32 +392,28 @@ static void thread_run(volatile struct thread *me)
/* release the run job */
job_release(job);
+ } else if (waitevt) {
+ /* no job and not events */
+ running--;
+ if (!running)
+ ERROR("Entering job deep sleep! Check your bindings.");
+ me->waits = 1;
+ pthread_cond_wait(&cond, &mutex);
+ me->waits = 0;
+ running++;
} else {
- /* no job, check events */
- el = &evloop[0];
- if (el->sdev && !__atomic_load_n(&el->state, __ATOMIC_RELAXED)) {
- /* run the events */
- __atomic_store_n(&el->state, EVLOOP_STATE_LOCK|EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT, __ATOMIC_RELAXED);
- current_evloop = el;
- pthread_mutex_unlock(&mutex);
- sig_monitor(0, evloop_run, el);
- pthread_mutex_lock(&mutex);
- } else {
- /* no job and not events */
- running--;
- if (!running)
- ERROR("Entering job deep sleep! Check your bindings.");
- me->waits = 1;
- pthread_cond_wait(&cond, &mutex);
- me->waits = 0;
- running++;
- }
+ /* wait for events */
+ waitevt = 1;
+ pthread_mutex_unlock(&mutex);
+ sig_monitor(0, monitored_wait_and_dispatch, get_fdevepoll());
+ pthread_mutex_lock(&mutex);
+ waitevt = 0;
}
}
/* release the event loop */
if (current_evloop) {
- __atomic_sub_fetch(&current_evloop->state, EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
+ __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
current_evloop = NULL;
}
@@ -657,6 +690,12 @@ static int on_evloop_efd(sd_event_source *s, int fd, uint32_t revents, void *use
return 1;
}
+/* temporary hack */
+static void evloop_callback(void *arg, uint32_t event, struct fdev *fdev)
+{
+ sig_monitor(0, evloop_run, arg);
+}
+
/**
* Gets a sd_event item for the current thread.
* @return a sd_event or NULL in case of error
@@ -688,21 +727,31 @@ static struct sd_event *get_sd_event_locked()
rc = sd_event_add_io(el->sdev, NULL, el->efd, EPOLLIN, on_evloop_efd, el);
if (rc < 0) {
ERROR("can't register eventfd");
+ goto error3;
+ }
+ /* handle the event loop */
+ el->fdev = fdev_epoll_add(get_fdevepoll(), sd_event_get_fd(el->sdev));
+ if (!el->fdev) {
+ ERROR("can't create fdev");
+error3:
sd_event_unref(el->sdev);
- el->sdev = NULL;
error2:
close(el->efd);
error1:
+ memset(el, 0, sizeof *el);
return NULL;
}
+ fdev_set_autoclose(el->fdev, 0);
+ fdev_set_events(el->fdev, EPOLLIN);
+ fdev_set_callback(el->fdev, evloop_callback, el);
}
/* attach the event loop to the current thread */
if (current_evloop != el) {
if (current_evloop)
- __atomic_sub_fetch(&current_evloop->state, EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
+ __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
current_evloop = el;
- __atomic_add_fetch(&el->state, EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
+ __atomic_or_fetch(&el->state, EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
}
/* wait for a modifiable event loop */
@@ -731,6 +780,21 @@ struct sd_event *jobs_get_sd_event()
}
/**
+ * Gets the fdev_epoll item.
+ * @return a fdev_epoll or NULL in case of error
+ */
+struct fdev_epoll *jobs_get_fdev_epoll()
+{
+ struct fdev_epoll *result;
+
+ pthread_mutex_lock(&mutex);
+ result = get_fdevepoll();
+ pthread_mutex_unlock(&mutex);
+
+ return result;
+}
+
+/**
* Enter the jobs processing loop.
* @param allowed_count Maximum count of thread for jobs including this one
* @param start_count Count of thread to start now, must be lower.
diff --git a/src/jobs.h b/src/jobs.h
index aa7b4c86..8ad6efc1 100644
--- a/src/jobs.h
+++ b/src/jobs.h
@@ -17,6 +17,7 @@
#pragma once
+struct fdev_epoll;
struct sd_event;
struct jobloop;
@@ -41,6 +42,7 @@ extern int jobs_call(
void *arg);
extern struct sd_event *jobs_get_sd_event();
+extern struct fdev_epoll *jobs_get_fdev_epoll();
extern void jobs_terminate();