summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJosé Bollo <jose.bollo@iot.bzh>2016-05-03 10:03:58 +0200
committerJosé Bollo <jose.bollo@iot.bzh>2016-05-04 11:55:38 +0200
commit5dd6480727cc1ecb12483fc4d971d73176505748 (patch)
tree495925fdba144f609daaad6da07281fd9bd94b69
parentf262b0f726ac0577f40525038b779185f144873f (diff)
Switch to libsystemd events
This patch removes part of code that are not specific in favour of a more shared library. Change-Id: I3506e7514181cfbed753559bb65460f95b2141c9 Signed-off-by: José Bollo <jose.bollo@iot.bzh>
-rw-r--r--CMakeLists.txt3
-rw-r--r--include/afb-plugin.h22
-rw-r--r--include/afb-pollmgr-itf.h61
-rw-r--r--plugins/CMakeLists.txt2
-rw-r--r--src/CMakeLists.txt2
-rw-r--r--src/afb-api-so.c22
-rw-r--r--src/afb-common.c89
-rw-r--r--src/afb-common.h (renamed from src/utils-upoll.h)26
-rw-r--r--src/afb-hsrv.c41
-rw-r--r--src/afb-websock.c7
-rw-r--r--src/afb-ws.c33
-rw-r--r--src/main.c8
-rw-r--r--src/utils-upoll.c328
13 files changed, 184 insertions, 460 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 2ca0e892..4a856289 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -53,6 +53,7 @@ IF(CMAKE_BUILD_TYPE MATCHES Debug)
ENDIF(CMAKE_BUILD_TYPE MATCHES Debug)
INCLUDE(FindPkgConfig)
+PKG_CHECK_MODULES(libsystemd REQUIRED libsystemd)
PKG_CHECK_MODULES(json-c REQUIRED json-c)
PKG_CHECK_MODULES(libmicrohttpd REQUIRED libmicrohttpd>=0.9.48)
PKG_CHECK_MODULES(openssl REQUIRED openssl)
@@ -90,6 +91,7 @@ FIND_PACKAGE(Threads)
SET(include_dirs
${INCLUDE_DIRS}
${CMAKE_SOURCE_DIR}/include
+ ${libsystemd_INCLUDE_DIRS}
${json-c_INCLUDE_DIRS}
${libmicrohttpd_INCLUDE_DIRS}
${uuid_INCLUDE_DIRS}
@@ -102,6 +104,7 @@ SET(include_dirs
)
SET(link_libraries
+ ${libsystemd_LIBRARIES}
${json-c_LIBRARIES}
${libmicrohttpd_LIBRARIES}
${uuid_LIBRARIES}
diff --git a/include/afb-plugin.h b/include/afb-plugin.h
index dfb6fbad..ce78e840 100644
--- a/include/afb-plugin.h
+++ b/include/afb-plugin.h
@@ -18,7 +18,6 @@
#pragma once
#include "afb-req-itf.h"
-#include "afb-pollmgr-itf.h"
#include "afb-evmgr-itf.h"
/* Plugin Type */
@@ -64,9 +63,14 @@ enum AFB_Mode {
AFB_MODE_GLOBAL
};
+struct sd_event;
+struct sd_bus;
+
struct afb_daemon_itf {
struct afb_evmgr (*get_evmgr)(void *closure);
- struct afb_pollmgr (*get_pollmgr)(void *closure);
+ struct sd_event *(*get_event_loop)(void *closure);
+ struct sd_bus *(*get_user_bus)(void *closure);
+ struct sd_bus *(*get_system_bus)(void *closure);
};
struct afb_daemon {
@@ -88,9 +92,19 @@ static inline struct afb_evmgr afb_daemon_get_evmgr(struct afb_daemon daemon)
return daemon.itf->get_evmgr(daemon.closure);
}
-static inline struct afb_pollmgr afb_daemon_get_pollmgr(struct afb_daemon daemon)
+static inline struct sd_event *afb_daemon_get_event_loop(struct afb_daemon daemon)
+{
+ return daemon.itf->get_event_loop(daemon.closure);
+}
+
+static inline struct sd_bus *afb_daemon_get_user_bus(struct afb_daemon daemon)
+{
+ return daemon.itf->get_user_bus(daemon.closure);
+}
+
+static inline struct sd_bus *afb_daemon_get_system_bus(struct afb_daemon daemon)
{
- return daemon.itf->get_pollmgr(daemon.closure);
+ return daemon.itf->get_system_bus(daemon.closure);
}
diff --git a/include/afb-pollmgr-itf.h b/include/afb-pollmgr-itf.h
deleted file mode 100644
index c3a62c17..00000000
--- a/include/afb-pollmgr-itf.h
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright 2016 IoT.bzh
- * Author: José Bollo <jose.bollo@iot.bzh>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-struct afb_pollmgr_itf
-{
- int (*wait)(int timeout, void *pollclosure);
- void *(*open)(int fd, void *closure, void *pollclosure);
- int (*on_readable)(void *hndl, void (*cb)(void *closure));
- int (*on_writable)(void *hndl, void (*cb)(void *closure));
- void (*on_hangup)(void *hndl, void (*cb)(void *closure));
- void (*close)(void *hndl);
-};
-
-struct afb_pollmgr
-{
- const struct afb_pollmgr_itf *itf;
- void *closure;
-};
-
-static inline int afb_pollmgr_wait(struct afb_pollmgr pollmgr, int timeout)
-{
- return pollmgr.itf->wait(timeout, pollmgr.closure);
-}
-
-static inline void *afb_pollmgr_open(struct afb_pollmgr pollmgr, int fd, void *closure)
-{
- return pollmgr.itf->open(fd, closure, pollmgr.closure);
-}
-
-static inline int afb_pollmgr_on_readable(struct afb_pollmgr pollmgr, void *hndl, void (*cb)(void *closure))
-{
- return pollmgr.itf->on_readable(hndl, cb);
-}
-
-static inline int afb_pollmgr_on_writable(struct afb_pollmgr pollmgr, void *hndl, void (*cb)(void *closure))
-{
- return pollmgr.itf->on_writable(hndl, cb);
-}
-
-static inline void afb_pollmgr_on_hangup(struct afb_pollmgr pollmgr, void *hndl, void (*cb)(void *closure))
-{
- pollmgr.itf->on_hangup(hndl, cb);
-}
-
-
diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt
index c60e3527..5d8b26ac 100644
--- a/plugins/CMakeLists.txt
+++ b/plugins/CMakeLists.txt
@@ -1,4 +1,4 @@
-ADD_SUBDIRECTORY(afm-main-plugin)
+#ADD_SUBDIRECTORY(afm-main-plugin)
ADD_SUBDIRECTORY(session)
ADD_SUBDIRECTORY(samples)
#ADD_SUBDIRECTORY(audio)
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 5153c5c1..bc5fd4f0 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -12,9 +12,9 @@ ADD_LIBRARY(src OBJECT
afb-ws.c
afb-ws-json.c
afb-msg-json.c
+ afb-common.c
websock.c
verbose.c
- utils-upoll.c
)
INCLUDE_DIRECTORIES(${include_dirs})
diff --git a/src/afb-api-so.c b/src/afb-api-so.c
index 88246b58..89418fb0 100644
--- a/src/afb-api-so.c
+++ b/src/afb-api-so.c
@@ -33,14 +33,13 @@
#include "afb-plugin.h"
#include "afb-req-itf.h"
-#include "afb-pollmgr-itf.h"
#include "afb-evmgr-itf.h"
#include "session.h"
+#include "afb-common.h"
#include "afb-apis.h"
#include "afb-api-so.h"
#include "verbose.h"
-#include "utils-upoll.h"
extern __thread sigjmp_buf *error_handler;
@@ -55,15 +54,6 @@ static int api_timeout = 15;
static const char plugin_register_function[] = "pluginRegister";
-static const struct afb_pollmgr_itf pollmgr_itf = {
- .wait = (void*)upoll_wait,
- .open = (void*)upoll_open,
- .on_readable = (void*)upoll_on_readable,
- .on_writable = (void*)upoll_on_writable,
- .on_hangup = (void*)upoll_on_hangup,
- .close = (void*)upoll_close
-};
-
static void afb_api_so_evmgr_push(struct api_so_desc *desc, const char *name, struct json_object *object)
{
size_t length;
@@ -87,16 +77,14 @@ static struct afb_evmgr afb_api_so_get_evmgr(struct api_so_desc *desc)
return (struct afb_evmgr){ .itf = &evmgr_itf, .closure = desc };
}
-static struct afb_pollmgr afb_api_so_get_pollmgr(struct api_so_desc *desc)
-{
- return (struct afb_pollmgr){ .itf = &pollmgr_itf, .closure = NULL };
-}
-
static const struct afb_daemon_itf daemon_itf = {
.get_evmgr = (void*)afb_api_so_get_evmgr,
- .get_pollmgr = (void*)afb_api_so_get_pollmgr
+ .get_event_loop = (void*)afb_common_get_event_loop,
+ .get_user_bus = (void*)afb_common_get_user_bus,
+ .get_system_bus = (void*)afb_common_get_system_bus
};
+
static void trapping_call(struct afb_req req, void(*cb)(struct afb_req))
{
volatile int signum, timerset;
diff --git a/src/afb-common.c b/src/afb-common.c
new file mode 100644
index 00000000..e8fe4b72
--- /dev/null
+++ b/src/afb-common.c
@@ -0,0 +1,89 @@
+/*
+ * Copyright (C) 2015 "IoT.bzh"
+ * Author José Bollo <jose.bollo@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define _GNU_SOURCE
+
+#include <errno.h>
+#include <systemd/sd-event.h>
+#include <systemd/sd-bus.h>
+
+#include "afb-common.h"
+
+/*
+struct sd_event *afb_common_get_thread_event_loop()
+{
+ sd_event *result;
+ int rc = sd_event_default(&result);
+ if (rc != 0) {
+ errno = -rc;
+ result = NULL;
+ }
+ return result;
+}
+*/
+
+static void *sdopen(void **p, int (*f)(void **))
+{
+ if (*p == NULL) {
+ int rc = f(p);
+ if (rc < 0) {
+ errno = -rc;
+ *p = NULL;
+ }
+ }
+ return *p;
+}
+
+static struct sd_bus *sdbusopen(struct sd_bus **p, int (*f)(struct sd_bus **))
+{
+ if (*p == NULL) {
+ int rc = f(p);
+ if (rc < 0) {
+ errno = -rc;
+ *p = NULL;
+ } else {
+ rc = sd_bus_attach_event(*p, afb_common_get_event_loop(), 0);
+ if (rc < 0) {
+ sd_bus_unref(*p);
+ errno = -rc;
+ *p = NULL;
+ }
+ }
+ }
+ return *p;
+}
+
+struct sd_event *afb_common_get_event_loop()
+{
+ static sd_event *result = NULL;
+ return sdopen((void*)&result, (void*)sd_event_new);
+}
+
+struct sd_bus *afb_common_get_user_bus()
+{
+ static struct sd_bus *result = NULL;
+ return sdbusopen((void*)&result, (void*)sd_bus_open_user);
+}
+
+struct sd_bus *afb_common_get_system_bus()
+{
+ static struct sd_bus *result = NULL;
+ return sdbusopen((void*)&result, (void*)sd_bus_open_system);
+}
+
+
+
diff --git a/src/utils-upoll.h b/src/afb-common.h
index 56692d3f..fe17bc93 100644
--- a/src/utils-upoll.h
+++ b/src/afb-common.h
@@ -1,6 +1,6 @@
-/*
- * Copyright 2016 IoT.bzh
- * Author: José Bollo <jose.bollo@iot.bzh>
+/*
+ * Copyright (C) 2015 "IoT.bzh"
+ * Author José Bollo <jose.bollo@iot.bzh>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,20 +17,10 @@
#pragma once
-struct upoll;
-
-extern int upoll_is_valid(struct upoll *upoll);
-
-extern struct upoll *upoll_open(int fd, void *closure);
-
-extern int upoll_on_readable(struct upoll *upoll, void (*process)(void *closure));
-extern int upoll_on_writable(struct upoll *upoll, void (*process)(void *closure));
-
-extern void upoll_on_hangup(struct upoll *upoll, void (*process)(void *closure));
-
-extern void upoll_close(struct upoll *upoll);
-
-extern int upoll_wait(int timeout);
-
+struct sd_event;
+struct sd_bus;
+extern struct sd_event *afb_common_get_event_loop();
+extern struct sd_bus *afb_common_get_user_bus();
+extern struct sd_bus *afb_common_get_system_bus();
diff --git a/src/afb-hsrv.c b/src/afb-hsrv.c
index a8338251..f514c97d 100644
--- a/src/afb-hsrv.c
+++ b/src/afb-hsrv.c
@@ -17,21 +17,26 @@
#define _GNU_SOURCE
+#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <poll.h>
#include <fcntl.h>
+#include <errno.h>
#include <sys/stat.h>
#include <microhttpd.h>
+#include <systemd/sd-event.h>
#include "afb-method.h"
#include "afb-hreq.h"
#include "afb-hsrv.h"
#include "afb-req-itf.h"
#include "verbose.h"
-#include "utils-upoll.h"
+
+#include "afb-common.h"
+
#define JSON_CONTENT "application/json"
@@ -58,7 +63,7 @@ struct afb_hsrv {
unsigned refcount;
struct hsrv_handler *handlers;
struct MHD_Daemon *httpd;
- struct upoll *upoll;
+ sd_event_source *evsrc;
int in_run;
char *cache_to;
};
@@ -231,15 +236,21 @@ void run_micro_httpd(struct afb_hsrv *hsrv)
if (hsrv->in_run != 0)
hsrv->in_run = 2;
else {
- upoll_on_readable(hsrv->upoll, NULL);
+ sd_event_source_set_io_events(hsrv->evsrc, 0);
do {
hsrv->in_run = 1;
MHD_run(hsrv->httpd);
} while(hsrv->in_run == 2);
hsrv->in_run = 0;
- upoll_on_readable(hsrv->upoll, (void*)run_micro_httpd);
+ sd_event_source_set_io_events(hsrv->evsrc, EPOLLIN);
}
-};
+}
+
+static int io_event_callback(sd_event_source *src, int fd, uint32_t revents, void *hsrv)
+{
+ run_micro_httpd(hsrv);
+ return 0;
+}
static int new_client_handler(void *cls, const struct sockaddr *addr, socklen_t addrlen)
{
@@ -360,7 +371,8 @@ int afb_hsrv_set_cache_timeout(struct afb_hsrv *hsrv, int duration)
int afb_hsrv_start(struct afb_hsrv *hsrv, uint16_t port, unsigned int connection_timeout)
{
- struct upoll *upoll;
+ sd_event_source *evsrc;
+ int rc;
struct MHD_Daemon *httpd;
const union MHD_DaemonInfo *info;
@@ -385,24 +397,25 @@ int afb_hsrv_start(struct afb_hsrv *hsrv, uint16_t port, unsigned int connection
return 0;
}
- upoll = upoll_open(info->listen_fd, hsrv);
- if (upoll == NULL) {
+ rc = sd_event_add_io(afb_common_get_event_loop(), &evsrc, info->listen_fd, EPOLLIN, io_event_callback, hsrv);
+ if (rc < 0) {
MHD_stop_daemon(httpd);
- fprintf(stderr, "Error: connection to upoll of httpd failed");
+ errno = -rc;
+ fprintf(stderr, "Error: connection to events for httpd failed");
return 0;
}
- upoll_on_readable(upoll, (void*)run_micro_httpd);
hsrv->httpd = httpd;
- hsrv->upoll = upoll;
+ hsrv->evsrc = evsrc;
return 1;
}
void afb_hsrv_stop(struct afb_hsrv *hsrv)
{
- if (hsrv->upoll)
- upoll_close(hsrv->upoll);
- hsrv->upoll = NULL;
+ if (hsrv->evsrc != NULL) {
+ sd_event_source_unref(hsrv->evsrc);
+ hsrv->evsrc = NULL;
+ }
if (hsrv->httpd != NULL)
MHD_stop_daemon(hsrv->httpd);
hsrv->httpd = NULL;
diff --git a/src/afb-websock.c b/src/afb-websock.c
index b4ad57a1..9f5619fb 100644
--- a/src/afb-websock.c
+++ b/src/afb-websock.c
@@ -204,8 +204,11 @@ int afb_websock_check_upgrade(struct afb_hreq *hreq)
ws = NULL;
rc = check_websocket_upgrade(hreq->connection, protodefs, afb_hreq_context(hreq), &ws);
- if (rc && ws != NULL)
- hreq->upgrade = 1;
+ if (rc == 1) {
+ hreq->replied = 1;
+ if (ws != NULL)
+ hreq->upgrade = 1;
+ }
return rc;
}
diff --git a/src/afb-ws.c b/src/afb-ws.c
index da248c89..d0cfc8a8 100644
--- a/src/afb-ws.c
+++ b/src/afb-ws.c
@@ -24,10 +24,12 @@
#include <sys/uio.h>
#include <string.h>
+#include <systemd/sd-event.h>
+
#include "websock.h"
#include "afb-ws.h"
-#include "utils-upoll.h"
+#include "afb-common.h"
/*
* declaration of the websock interface for afb-ws
@@ -85,7 +87,7 @@ struct afb_ws
const struct afb_ws_itf *itf; /* the callback interface */
void *closure; /* closure when calling the callbacks */
struct websock *ws; /* the websock handler */
- struct upoll *up; /* the upoll handler for the socket */
+ sd_event_source *evsrc; /* the event source for the socket */
struct buf buffer; /* the last read fragment */
};
@@ -109,8 +111,8 @@ static void aws_disconnect(struct afb_ws *ws, int call_on_hangup)
struct websock *wsi = ws->ws;
if (wsi != NULL) {
ws->ws = NULL;
- upoll_close(ws->up);
- ws->up = NULL;
+ sd_event_source_unref(ws->evsrc);
+ ws->evsrc = NULL;
websock_destroy(wsi);
free(aws_pick_buffer(ws).buffer);
ws->state = waiting;
@@ -119,6 +121,15 @@ static void aws_disconnect(struct afb_ws *ws, int call_on_hangup)
}
}
+static int io_event_callback(sd_event_source *src, int fd, uint32_t revents, void *ws)
+{
+ if ((revents & EPOLLIN) != 0)
+ aws_on_readable(ws);
+ if ((revents & EPOLLHUP) != 0)
+ afb_ws_hangup(ws);
+ return 0;
+}
+
/*
* Creates the afb_ws structure for the file descritor
* 'fd' and the callbacks described by the interface 'itf'
@@ -128,6 +139,7 @@ static void aws_disconnect(struct afb_ws *ws, int call_on_hangup)
*/
struct afb_ws *afb_ws_create(int fd, const struct afb_ws_itf *itf, void *closure)
{
+ int rc;
struct afb_ws *result;
assert(fd >= 0);
@@ -150,15 +162,12 @@ struct afb_ws *afb_ws_create(int fd, const struct afb_ws_itf *itf, void *closure
if (result->ws == NULL)
goto error2;
- /* creates the upoll */
- result->up = upoll_open(result->fd, result);
- if (result->up == NULL)
+ /* creates the evsrc */
+ rc = sd_event_add_io(afb_common_get_event_loop(), &result->evsrc, result->fd, EPOLLIN, io_event_callback, result);
+ if (rc < 0) {
+ errno = -rc;
goto error3;
-
- /* init the upoll */
- upoll_on_readable(result->up, (void*)aws_on_readable);
- upoll_on_hangup(result->up, (void*)afb_ws_hangup);
-
+ }
return result;
error3:
diff --git a/src/main.c b/src/main.c
index 8ec684fe..fcedd2a6 100644
--- a/src/main.c
+++ b/src/main.c
@@ -32,6 +32,8 @@
#include <signal.h>
#include <syslog.h>
+#include <systemd/sd-event.h>
+
#include "afb-config.h"
#include "afb-hswitch.h"
#include "afb-apis.h"
@@ -40,7 +42,7 @@
#include "afb-hreq.h"
#include "session.h"
#include "verbose.h"
-#include "utils-upoll.h"
+#include "afb-common.h"
#include "afb-plugin.h"
@@ -559,6 +561,7 @@ static struct afb_hsrv *start_http_server(struct afb_config * config)
int main(int argc, char *argv[]) {
struct afb_hsrv *hsrv;
struct afb_config *config;
+ struct sd_event *eventloop;
// open syslog if ever needed
openlog("afb-daemon", 0, LOG_DAEMON);
@@ -623,8 +626,9 @@ int main(int argc, char *argv[]) {
}
// infinite loop
+ eventloop = afb_common_get_event_loop();
for(;;)
- upoll_wait(30000);
+ sd_event_run(eventloop, 30000000);
if (verbosity)
fprintf (stderr, "hoops returned from infinite loop [report bug]\n");
diff --git a/src/utils-upoll.c b/src/utils-upoll.c
deleted file mode 100644
index 03f9a08b..00000000
--- a/src/utils-upoll.c
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Copyright 2016 IoT.bzh
- * Author: José Bollo <jose.bollo@iot.bzh>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <sys/epoll.h>
-#include <pthread.h>
-#include <stdlib.h>
-#include <errno.h>
-#include <unistd.h>
-#include <assert.h>
-
-#include "utils-upoll.h"
-
-struct upollfd;
-
-/*
- * Structure describing one opened client
- */
-struct upoll
-{
- struct upollfd *fd; /* structure handling the file descriptor */
- void (*read)(void *); /* callback for handling on_readable */
- void (*write)(void *); /* callback for handling on_writable */
- void (*hangup)(void *); /* callback for handling on_hangup */
- void *closure; /* closure for callbacks */
- struct upoll *next; /* next client of the same file descriptor */
-};
-
-/*
- * Structure describing a watched file descriptor
- */
-struct upollfd
-{
- int fd; /* watch file descriptor */
- uint32_t events; /* watched events */
- struct upollfd *next; /* next watched file descriptor */
- struct upoll *head; /* first client watching the file descriptor */
-};
-
-
-/*
- * Structure describing a upoll group
-struct upollgrp
-{
- int pollfd;
- struct upollfd *head;
- struct upoll *current;
- pthread_mutex_t mutex;
-};
-
-
-static struct upollgrp global = {
- .pollfd = 0,
- .head = NULL,
- .current = NULL,
- .mutex = PTHREAD_MUTEX_INITIALIZER
-};
- */
-
-static int pollfd = 0;
-static struct upollfd *head = NULL;
-static struct upoll *current = NULL;
-static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
-
-/*
- * Compute the events for the set of clients
- */
-static int update_flags_locked(struct upollfd *ufd)
-{
- int rc;
- struct upoll *u;
- struct epoll_event e;
- uint32_t events;
-
- /* compute expected events */
- events = 0;
- u = ufd->head;
- while (u != NULL) {
- if (u->read != NULL)
- events |= EPOLLIN;
- if (u->write != NULL)
- events |= EPOLLOUT;
- u = u->next;
- }
- if (ufd->events == events)
- rc = 0;
- else {
- e.events = events;
- e.data.ptr = ufd;
- rc = epoll_ctl(pollfd, EPOLL_CTL_MOD, ufd->fd, &e);
- if (rc == 0)
- ufd->events = events;
- }
- pthread_mutex_unlock(&mutex);
- return rc;
-}
-
-/*
- * Compute the events for the set of clients
- */
-static int update_flags(struct upollfd *ufd)
-{
- pthread_mutex_lock(&mutex);
- return update_flags_locked(ufd);
-}
-
-/*
- *
- */
-static int update(struct upollfd *ufd)
-{
- struct upollfd **prv;
-
- pthread_mutex_lock(&mutex);
- if (ufd->head != NULL)
- return update_flags_locked(ufd);
-
- /* no more watchers */
- prv = &head;
- while(*prv) {
- if (*prv == ufd) {
- *prv = ufd->next;
- break;
- }
- prv = &(*prv)->next;
- }
- pthread_mutex_unlock(&mutex);
- epoll_ctl(pollfd, EPOLL_CTL_DEL, ufd->fd, NULL);
- free(ufd);
- return 0;
-}
-
-static struct upollfd *get_fd(int fd)
-{
- struct epoll_event e;
- struct upollfd *result;
- int rc;
-
- /* opens the epoll stream */
- if (pollfd == 0) {
- pollfd = epoll_create1(EPOLL_CLOEXEC);
- if (pollfd == 0) {
- pollfd = dup(0);
- close(0);
- }
- if (pollfd < 0) {
- pollfd = 0;
- return NULL;
- }
- }
-
- /* search */
- result = head;
- while (result != NULL) {
- if (result->fd == fd)
- return result;
- result = result->next;
- }
-
- /* allocates */
- result = calloc(1, sizeof *result);
- if (result == NULL)
- return NULL;
-
- /* init */
- result->fd = fd;
- pthread_mutex_lock(&mutex);
- result->next = head;
- head = result;
- pthread_mutex_unlock(&mutex);
-
- /* records */
- e.events = 0;
- e.data.ptr = result;
- rc = epoll_ctl(pollfd, EPOLL_CTL_ADD, fd, &e);
- if (rc == 0)
- return result;
-
- /* revert on error */
- rc = errno;
- update(result);
- errno = rc;
- return NULL;
-}
-
-int upoll_is_valid(struct upoll *upoll)
-{
- struct upollfd *itfd = head;
- struct upoll *it;
- while (itfd != NULL) {
- it = itfd->head;
- while (it != NULL) {
- if (it == upoll)
- return 1;
- it = it->next;
- }
- itfd = itfd->next;
- }
- return 0;
-}
-
-struct upoll *upoll_open(int fd, void *closure)
-{
- struct upollfd *ufd;
- struct upoll *result;
-
- /* allocates */
- result = calloc(1, sizeof *result);
- if (result == NULL)
- return NULL;
-
- /* get for fd */
- ufd = get_fd(fd);
- if (ufd == NULL) {
- free(result);
- return NULL;
- }
-
- /* init */
- result->fd = ufd;
- result->closure = closure;
- pthread_mutex_lock(&mutex);
- result->next = ufd->head;
- ufd->head = result;
- pthread_mutex_unlock(&mutex);
- return result;
-}
-
-int upoll_on_readable(struct upoll *upoll, void (*process)(void *))
-{
- assert(pollfd != 0);
- assert(upoll_is_valid(upoll));
-
- upoll->read = process;
- return update_flags(upoll->fd);
-}
-
-int upoll_on_writable(struct upoll *upoll, void (*process)(void *))
-{
- assert(pollfd != 0);
- assert(upoll_is_valid(upoll));
-
- upoll->write = process;
- return update_flags(upoll->fd);
-}
-
-void upoll_on_hangup(struct upoll *upoll, void (*process)(void *))
-{
- assert(pollfd != 0);
- assert(upoll_is_valid(upoll));
-
- upoll->hangup = process;
-}
-
-void upoll_close(struct upoll *upoll)
-{
- struct upoll **it;
- struct upollfd *ufd;
-
- assert(pollfd != 0);
- assert(upoll_is_valid(upoll));
-
- ufd = upoll->fd;
- pthread_mutex_lock(&mutex);
- if (current == upoll)
- current = NULL;
- it = &ufd->head;
- while (*it != upoll)
- it = &(*it)->next;
- *it = upoll->next;
- pthread_mutex_unlock(&mutex);
- free(upoll);
- update(ufd);
-}
-
-int upoll_wait(int timeout)
-{
- int rc;
- struct epoll_event e;
- struct upollfd *ufd;
-
- if (pollfd == 0) {
- errno = ECANCELED;
- return -1;
- }
-
- do {
- rc = epoll_wait(pollfd, &e, 1, timeout);
- } while (rc < 0 && errno == EINTR);
- if (rc == 1) {
- ufd = e.data.ptr;
- current = ufd->head;
- e.events &= EPOLLIN | EPOLLOUT | EPOLLHUP;
- while (current != NULL && e.events != 0) {
- if ((e.events & EPOLLIN) && current->read) {
- current->read(current->closure);
- e.events &= (uint32_t)~EPOLLIN;
- continue;
- }
- if ((e.events & EPOLLOUT) && current->write) {
- current->write(current->closure);
- e.events &= (uint32_t)~EPOLLOUT;
- continue;
- }
- if ((e.events & EPOLLHUP) && current->hangup) {
- current->hangup(current->closure);
- if (current == NULL)
- break;
- }
- current = current->next;
- }
- }
- return rc < 0 ? rc : 0;
-}
-