diff options
author | José Bollo <jose.bollo@iot.bzh> | 2016-05-03 10:03:58 +0200 |
---|---|---|
committer | José Bollo <jose.bollo@iot.bzh> | 2016-05-04 11:55:38 +0200 |
commit | 5dd6480727cc1ecb12483fc4d971d73176505748 (patch) | |
tree | 495925fdba144f609daaad6da07281fd9bd94b69 | |
parent | f262b0f726ac0577f40525038b779185f144873f (diff) |
Switch to libsystemd events
This patch removes part of code that are
not specific in favour of a more shared
library.
Change-Id: I3506e7514181cfbed753559bb65460f95b2141c9
Signed-off-by: José Bollo <jose.bollo@iot.bzh>
-rw-r--r-- | CMakeLists.txt | 3 | ||||
-rw-r--r-- | include/afb-plugin.h | 22 | ||||
-rw-r--r-- | include/afb-pollmgr-itf.h | 61 | ||||
-rw-r--r-- | plugins/CMakeLists.txt | 2 | ||||
-rw-r--r-- | src/CMakeLists.txt | 2 | ||||
-rw-r--r-- | src/afb-api-so.c | 22 | ||||
-rw-r--r-- | src/afb-common.c | 89 | ||||
-rw-r--r-- | src/afb-common.h (renamed from src/utils-upoll.h) | 26 | ||||
-rw-r--r-- | src/afb-hsrv.c | 41 | ||||
-rw-r--r-- | src/afb-websock.c | 7 | ||||
-rw-r--r-- | src/afb-ws.c | 33 | ||||
-rw-r--r-- | src/main.c | 8 | ||||
-rw-r--r-- | src/utils-upoll.c | 328 |
13 files changed, 184 insertions, 460 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 2ca0e892..4a856289 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -53,6 +53,7 @@ IF(CMAKE_BUILD_TYPE MATCHES Debug) ENDIF(CMAKE_BUILD_TYPE MATCHES Debug) INCLUDE(FindPkgConfig) +PKG_CHECK_MODULES(libsystemd REQUIRED libsystemd) PKG_CHECK_MODULES(json-c REQUIRED json-c) PKG_CHECK_MODULES(libmicrohttpd REQUIRED libmicrohttpd>=0.9.48) PKG_CHECK_MODULES(openssl REQUIRED openssl) @@ -90,6 +91,7 @@ FIND_PACKAGE(Threads) SET(include_dirs ${INCLUDE_DIRS} ${CMAKE_SOURCE_DIR}/include + ${libsystemd_INCLUDE_DIRS} ${json-c_INCLUDE_DIRS} ${libmicrohttpd_INCLUDE_DIRS} ${uuid_INCLUDE_DIRS} @@ -102,6 +104,7 @@ SET(include_dirs ) SET(link_libraries + ${libsystemd_LIBRARIES} ${json-c_LIBRARIES} ${libmicrohttpd_LIBRARIES} ${uuid_LIBRARIES} diff --git a/include/afb-plugin.h b/include/afb-plugin.h index dfb6fbad..ce78e840 100644 --- a/include/afb-plugin.h +++ b/include/afb-plugin.h @@ -18,7 +18,6 @@ #pragma once #include "afb-req-itf.h" -#include "afb-pollmgr-itf.h" #include "afb-evmgr-itf.h" /* Plugin Type */ @@ -64,9 +63,14 @@ enum AFB_Mode { AFB_MODE_GLOBAL }; +struct sd_event; +struct sd_bus; + struct afb_daemon_itf { struct afb_evmgr (*get_evmgr)(void *closure); - struct afb_pollmgr (*get_pollmgr)(void *closure); + struct sd_event *(*get_event_loop)(void *closure); + struct sd_bus *(*get_user_bus)(void *closure); + struct sd_bus *(*get_system_bus)(void *closure); }; struct afb_daemon { @@ -88,9 +92,19 @@ static inline struct afb_evmgr afb_daemon_get_evmgr(struct afb_daemon daemon) return daemon.itf->get_evmgr(daemon.closure); } -static inline struct afb_pollmgr afb_daemon_get_pollmgr(struct afb_daemon daemon) +static inline struct sd_event *afb_daemon_get_event_loop(struct afb_daemon daemon) +{ + return daemon.itf->get_event_loop(daemon.closure); +} + +static inline struct sd_bus *afb_daemon_get_user_bus(struct afb_daemon daemon) +{ + return daemon.itf->get_user_bus(daemon.closure); +} + +static inline struct sd_bus *afb_daemon_get_system_bus(struct afb_daemon daemon) { - return daemon.itf->get_pollmgr(daemon.closure); + return daemon.itf->get_system_bus(daemon.closure); } diff --git a/include/afb-pollmgr-itf.h b/include/afb-pollmgr-itf.h deleted file mode 100644 index c3a62c17..00000000 --- a/include/afb-pollmgr-itf.h +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright 2016 IoT.bzh - * Author: José Bollo <jose.bollo@iot.bzh> - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -struct afb_pollmgr_itf -{ - int (*wait)(int timeout, void *pollclosure); - void *(*open)(int fd, void *closure, void *pollclosure); - int (*on_readable)(void *hndl, void (*cb)(void *closure)); - int (*on_writable)(void *hndl, void (*cb)(void *closure)); - void (*on_hangup)(void *hndl, void (*cb)(void *closure)); - void (*close)(void *hndl); -}; - -struct afb_pollmgr -{ - const struct afb_pollmgr_itf *itf; - void *closure; -}; - -static inline int afb_pollmgr_wait(struct afb_pollmgr pollmgr, int timeout) -{ - return pollmgr.itf->wait(timeout, pollmgr.closure); -} - -static inline void *afb_pollmgr_open(struct afb_pollmgr pollmgr, int fd, void *closure) -{ - return pollmgr.itf->open(fd, closure, pollmgr.closure); -} - -static inline int afb_pollmgr_on_readable(struct afb_pollmgr pollmgr, void *hndl, void (*cb)(void *closure)) -{ - return pollmgr.itf->on_readable(hndl, cb); -} - -static inline int afb_pollmgr_on_writable(struct afb_pollmgr pollmgr, void *hndl, void (*cb)(void *closure)) -{ - return pollmgr.itf->on_writable(hndl, cb); -} - -static inline void afb_pollmgr_on_hangup(struct afb_pollmgr pollmgr, void *hndl, void (*cb)(void *closure)) -{ - pollmgr.itf->on_hangup(hndl, cb); -} - - diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index c60e3527..5d8b26ac 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -1,4 +1,4 @@ -ADD_SUBDIRECTORY(afm-main-plugin) +#ADD_SUBDIRECTORY(afm-main-plugin) ADD_SUBDIRECTORY(session) ADD_SUBDIRECTORY(samples) #ADD_SUBDIRECTORY(audio) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 5153c5c1..bc5fd4f0 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -12,9 +12,9 @@ ADD_LIBRARY(src OBJECT afb-ws.c afb-ws-json.c afb-msg-json.c + afb-common.c websock.c verbose.c - utils-upoll.c ) INCLUDE_DIRECTORIES(${include_dirs}) diff --git a/src/afb-api-so.c b/src/afb-api-so.c index 88246b58..89418fb0 100644 --- a/src/afb-api-so.c +++ b/src/afb-api-so.c @@ -33,14 +33,13 @@ #include "afb-plugin.h" #include "afb-req-itf.h" -#include "afb-pollmgr-itf.h" #include "afb-evmgr-itf.h" #include "session.h" +#include "afb-common.h" #include "afb-apis.h" #include "afb-api-so.h" #include "verbose.h" -#include "utils-upoll.h" extern __thread sigjmp_buf *error_handler; @@ -55,15 +54,6 @@ static int api_timeout = 15; static const char plugin_register_function[] = "pluginRegister"; -static const struct afb_pollmgr_itf pollmgr_itf = { - .wait = (void*)upoll_wait, - .open = (void*)upoll_open, - .on_readable = (void*)upoll_on_readable, - .on_writable = (void*)upoll_on_writable, - .on_hangup = (void*)upoll_on_hangup, - .close = (void*)upoll_close -}; - static void afb_api_so_evmgr_push(struct api_so_desc *desc, const char *name, struct json_object *object) { size_t length; @@ -87,16 +77,14 @@ static struct afb_evmgr afb_api_so_get_evmgr(struct api_so_desc *desc) return (struct afb_evmgr){ .itf = &evmgr_itf, .closure = desc }; } -static struct afb_pollmgr afb_api_so_get_pollmgr(struct api_so_desc *desc) -{ - return (struct afb_pollmgr){ .itf = &pollmgr_itf, .closure = NULL }; -} - static const struct afb_daemon_itf daemon_itf = { .get_evmgr = (void*)afb_api_so_get_evmgr, - .get_pollmgr = (void*)afb_api_so_get_pollmgr + .get_event_loop = (void*)afb_common_get_event_loop, + .get_user_bus = (void*)afb_common_get_user_bus, + .get_system_bus = (void*)afb_common_get_system_bus }; + static void trapping_call(struct afb_req req, void(*cb)(struct afb_req)) { volatile int signum, timerset; diff --git a/src/afb-common.c b/src/afb-common.c new file mode 100644 index 00000000..e8fe4b72 --- /dev/null +++ b/src/afb-common.c @@ -0,0 +1,89 @@ +/* + * Copyright (C) 2015 "IoT.bzh" + * Author José Bollo <jose.bollo@iot.bzh> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define _GNU_SOURCE + +#include <errno.h> +#include <systemd/sd-event.h> +#include <systemd/sd-bus.h> + +#include "afb-common.h" + +/* +struct sd_event *afb_common_get_thread_event_loop() +{ + sd_event *result; + int rc = sd_event_default(&result); + if (rc != 0) { + errno = -rc; + result = NULL; + } + return result; +} +*/ + +static void *sdopen(void **p, int (*f)(void **)) +{ + if (*p == NULL) { + int rc = f(p); + if (rc < 0) { + errno = -rc; + *p = NULL; + } + } + return *p; +} + +static struct sd_bus *sdbusopen(struct sd_bus **p, int (*f)(struct sd_bus **)) +{ + if (*p == NULL) { + int rc = f(p); + if (rc < 0) { + errno = -rc; + *p = NULL; + } else { + rc = sd_bus_attach_event(*p, afb_common_get_event_loop(), 0); + if (rc < 0) { + sd_bus_unref(*p); + errno = -rc; + *p = NULL; + } + } + } + return *p; +} + +struct sd_event *afb_common_get_event_loop() +{ + static sd_event *result = NULL; + return sdopen((void*)&result, (void*)sd_event_new); +} + +struct sd_bus *afb_common_get_user_bus() +{ + static struct sd_bus *result = NULL; + return sdbusopen((void*)&result, (void*)sd_bus_open_user); +} + +struct sd_bus *afb_common_get_system_bus() +{ + static struct sd_bus *result = NULL; + return sdbusopen((void*)&result, (void*)sd_bus_open_system); +} + + + diff --git a/src/utils-upoll.h b/src/afb-common.h index 56692d3f..fe17bc93 100644 --- a/src/utils-upoll.h +++ b/src/afb-common.h @@ -1,6 +1,6 @@ -/* - * Copyright 2016 IoT.bzh - * Author: José Bollo <jose.bollo@iot.bzh> +/* + * Copyright (C) 2015 "IoT.bzh" + * Author José Bollo <jose.bollo@iot.bzh> * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,20 +17,10 @@ #pragma once -struct upoll; - -extern int upoll_is_valid(struct upoll *upoll); - -extern struct upoll *upoll_open(int fd, void *closure); - -extern int upoll_on_readable(struct upoll *upoll, void (*process)(void *closure)); -extern int upoll_on_writable(struct upoll *upoll, void (*process)(void *closure)); - -extern void upoll_on_hangup(struct upoll *upoll, void (*process)(void *closure)); - -extern void upoll_close(struct upoll *upoll); - -extern int upoll_wait(int timeout); - +struct sd_event; +struct sd_bus; +extern struct sd_event *afb_common_get_event_loop(); +extern struct sd_bus *afb_common_get_user_bus(); +extern struct sd_bus *afb_common_get_system_bus(); diff --git a/src/afb-hsrv.c b/src/afb-hsrv.c index a8338251..f514c97d 100644 --- a/src/afb-hsrv.c +++ b/src/afb-hsrv.c @@ -17,21 +17,26 @@ #define _GNU_SOURCE +#include <stdint.h> #include <stdio.h> #include <string.h> #include <assert.h> #include <poll.h> #include <fcntl.h> +#include <errno.h> #include <sys/stat.h> #include <microhttpd.h> +#include <systemd/sd-event.h> #include "afb-method.h" #include "afb-hreq.h" #include "afb-hsrv.h" #include "afb-req-itf.h" #include "verbose.h" -#include "utils-upoll.h" + +#include "afb-common.h" + #define JSON_CONTENT "application/json" @@ -58,7 +63,7 @@ struct afb_hsrv { unsigned refcount; struct hsrv_handler *handlers; struct MHD_Daemon *httpd; - struct upoll *upoll; + sd_event_source *evsrc; int in_run; char *cache_to; }; @@ -231,15 +236,21 @@ void run_micro_httpd(struct afb_hsrv *hsrv) if (hsrv->in_run != 0) hsrv->in_run = 2; else { - upoll_on_readable(hsrv->upoll, NULL); + sd_event_source_set_io_events(hsrv->evsrc, 0); do { hsrv->in_run = 1; MHD_run(hsrv->httpd); } while(hsrv->in_run == 2); hsrv->in_run = 0; - upoll_on_readable(hsrv->upoll, (void*)run_micro_httpd); + sd_event_source_set_io_events(hsrv->evsrc, EPOLLIN); } -}; +} + +static int io_event_callback(sd_event_source *src, int fd, uint32_t revents, void *hsrv) +{ + run_micro_httpd(hsrv); + return 0; +} static int new_client_handler(void *cls, const struct sockaddr *addr, socklen_t addrlen) { @@ -360,7 +371,8 @@ int afb_hsrv_set_cache_timeout(struct afb_hsrv *hsrv, int duration) int afb_hsrv_start(struct afb_hsrv *hsrv, uint16_t port, unsigned int connection_timeout) { - struct upoll *upoll; + sd_event_source *evsrc; + int rc; struct MHD_Daemon *httpd; const union MHD_DaemonInfo *info; @@ -385,24 +397,25 @@ int afb_hsrv_start(struct afb_hsrv *hsrv, uint16_t port, unsigned int connection return 0; } - upoll = upoll_open(info->listen_fd, hsrv); - if (upoll == NULL) { + rc = sd_event_add_io(afb_common_get_event_loop(), &evsrc, info->listen_fd, EPOLLIN, io_event_callback, hsrv); + if (rc < 0) { MHD_stop_daemon(httpd); - fprintf(stderr, "Error: connection to upoll of httpd failed"); + errno = -rc; + fprintf(stderr, "Error: connection to events for httpd failed"); return 0; } - upoll_on_readable(upoll, (void*)run_micro_httpd); hsrv->httpd = httpd; - hsrv->upoll = upoll; + hsrv->evsrc = evsrc; return 1; } void afb_hsrv_stop(struct afb_hsrv *hsrv) { - if (hsrv->upoll) - upoll_close(hsrv->upoll); - hsrv->upoll = NULL; + if (hsrv->evsrc != NULL) { + sd_event_source_unref(hsrv->evsrc); + hsrv->evsrc = NULL; + } if (hsrv->httpd != NULL) MHD_stop_daemon(hsrv->httpd); hsrv->httpd = NULL; diff --git a/src/afb-websock.c b/src/afb-websock.c index b4ad57a1..9f5619fb 100644 --- a/src/afb-websock.c +++ b/src/afb-websock.c @@ -204,8 +204,11 @@ int afb_websock_check_upgrade(struct afb_hreq *hreq) ws = NULL; rc = check_websocket_upgrade(hreq->connection, protodefs, afb_hreq_context(hreq), &ws); - if (rc && ws != NULL) - hreq->upgrade = 1; + if (rc == 1) { + hreq->replied = 1; + if (ws != NULL) + hreq->upgrade = 1; + } return rc; } diff --git a/src/afb-ws.c b/src/afb-ws.c index da248c89..d0cfc8a8 100644 --- a/src/afb-ws.c +++ b/src/afb-ws.c @@ -24,10 +24,12 @@ #include <sys/uio.h> #include <string.h> +#include <systemd/sd-event.h> + #include "websock.h" #include "afb-ws.h" -#include "utils-upoll.h" +#include "afb-common.h" /* * declaration of the websock interface for afb-ws @@ -85,7 +87,7 @@ struct afb_ws const struct afb_ws_itf *itf; /* the callback interface */ void *closure; /* closure when calling the callbacks */ struct websock *ws; /* the websock handler */ - struct upoll *up; /* the upoll handler for the socket */ + sd_event_source *evsrc; /* the event source for the socket */ struct buf buffer; /* the last read fragment */ }; @@ -109,8 +111,8 @@ static void aws_disconnect(struct afb_ws *ws, int call_on_hangup) struct websock *wsi = ws->ws; if (wsi != NULL) { ws->ws = NULL; - upoll_close(ws->up); - ws->up = NULL; + sd_event_source_unref(ws->evsrc); + ws->evsrc = NULL; websock_destroy(wsi); free(aws_pick_buffer(ws).buffer); ws->state = waiting; @@ -119,6 +121,15 @@ static void aws_disconnect(struct afb_ws *ws, int call_on_hangup) } } +static int io_event_callback(sd_event_source *src, int fd, uint32_t revents, void *ws) +{ + if ((revents & EPOLLIN) != 0) + aws_on_readable(ws); + if ((revents & EPOLLHUP) != 0) + afb_ws_hangup(ws); + return 0; +} + /* * Creates the afb_ws structure for the file descritor * 'fd' and the callbacks described by the interface 'itf' @@ -128,6 +139,7 @@ static void aws_disconnect(struct afb_ws *ws, int call_on_hangup) */ struct afb_ws *afb_ws_create(int fd, const struct afb_ws_itf *itf, void *closure) { + int rc; struct afb_ws *result; assert(fd >= 0); @@ -150,15 +162,12 @@ struct afb_ws *afb_ws_create(int fd, const struct afb_ws_itf *itf, void *closure if (result->ws == NULL) goto error2; - /* creates the upoll */ - result->up = upoll_open(result->fd, result); - if (result->up == NULL) + /* creates the evsrc */ + rc = sd_event_add_io(afb_common_get_event_loop(), &result->evsrc, result->fd, EPOLLIN, io_event_callback, result); + if (rc < 0) { + errno = -rc; goto error3; - - /* init the upoll */ - upoll_on_readable(result->up, (void*)aws_on_readable); - upoll_on_hangup(result->up, (void*)afb_ws_hangup); - + } return result; error3: @@ -32,6 +32,8 @@ #include <signal.h> #include <syslog.h> +#include <systemd/sd-event.h> + #include "afb-config.h" #include "afb-hswitch.h" #include "afb-apis.h" @@ -40,7 +42,7 @@ #include "afb-hreq.h" #include "session.h" #include "verbose.h" -#include "utils-upoll.h" +#include "afb-common.h" #include "afb-plugin.h" @@ -559,6 +561,7 @@ static struct afb_hsrv *start_http_server(struct afb_config * config) int main(int argc, char *argv[]) { struct afb_hsrv *hsrv; struct afb_config *config; + struct sd_event *eventloop; // open syslog if ever needed openlog("afb-daemon", 0, LOG_DAEMON); @@ -623,8 +626,9 @@ int main(int argc, char *argv[]) { } // infinite loop + eventloop = afb_common_get_event_loop(); for(;;) - upoll_wait(30000); + sd_event_run(eventloop, 30000000); if (verbosity) fprintf (stderr, "hoops returned from infinite loop [report bug]\n"); diff --git a/src/utils-upoll.c b/src/utils-upoll.c deleted file mode 100644 index 03f9a08b..00000000 --- a/src/utils-upoll.c +++ /dev/null @@ -1,328 +0,0 @@ -/* - * Copyright 2016 IoT.bzh - * Author: José Bollo <jose.bollo@iot.bzh> - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <sys/epoll.h> -#include <pthread.h> -#include <stdlib.h> -#include <errno.h> -#include <unistd.h> -#include <assert.h> - -#include "utils-upoll.h" - -struct upollfd; - -/* - * Structure describing one opened client - */ -struct upoll -{ - struct upollfd *fd; /* structure handling the file descriptor */ - void (*read)(void *); /* callback for handling on_readable */ - void (*write)(void *); /* callback for handling on_writable */ - void (*hangup)(void *); /* callback for handling on_hangup */ - void *closure; /* closure for callbacks */ - struct upoll *next; /* next client of the same file descriptor */ -}; - -/* - * Structure describing a watched file descriptor - */ -struct upollfd -{ - int fd; /* watch file descriptor */ - uint32_t events; /* watched events */ - struct upollfd *next; /* next watched file descriptor */ - struct upoll *head; /* first client watching the file descriptor */ -}; - - -/* - * Structure describing a upoll group -struct upollgrp -{ - int pollfd; - struct upollfd *head; - struct upoll *current; - pthread_mutex_t mutex; -}; - - -static struct upollgrp global = { - .pollfd = 0, - .head = NULL, - .current = NULL, - .mutex = PTHREAD_MUTEX_INITIALIZER -}; - */ - -static int pollfd = 0; -static struct upollfd *head = NULL; -static struct upoll *current = NULL; -static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; - -/* - * Compute the events for the set of clients - */ -static int update_flags_locked(struct upollfd *ufd) -{ - int rc; - struct upoll *u; - struct epoll_event e; - uint32_t events; - - /* compute expected events */ - events = 0; - u = ufd->head; - while (u != NULL) { - if (u->read != NULL) - events |= EPOLLIN; - if (u->write != NULL) - events |= EPOLLOUT; - u = u->next; - } - if (ufd->events == events) - rc = 0; - else { - e.events = events; - e.data.ptr = ufd; - rc = epoll_ctl(pollfd, EPOLL_CTL_MOD, ufd->fd, &e); - if (rc == 0) - ufd->events = events; - } - pthread_mutex_unlock(&mutex); - return rc; -} - -/* - * Compute the events for the set of clients - */ -static int update_flags(struct upollfd *ufd) -{ - pthread_mutex_lock(&mutex); - return update_flags_locked(ufd); -} - -/* - * - */ -static int update(struct upollfd *ufd) -{ - struct upollfd **prv; - - pthread_mutex_lock(&mutex); - if (ufd->head != NULL) - return update_flags_locked(ufd); - - /* no more watchers */ - prv = &head; - while(*prv) { - if (*prv == ufd) { - *prv = ufd->next; - break; - } - prv = &(*prv)->next; - } - pthread_mutex_unlock(&mutex); - epoll_ctl(pollfd, EPOLL_CTL_DEL, ufd->fd, NULL); - free(ufd); - return 0; -} - -static struct upollfd *get_fd(int fd) -{ - struct epoll_event e; - struct upollfd *result; - int rc; - - /* opens the epoll stream */ - if (pollfd == 0) { - pollfd = epoll_create1(EPOLL_CLOEXEC); - if (pollfd == 0) { - pollfd = dup(0); - close(0); - } - if (pollfd < 0) { - pollfd = 0; - return NULL; - } - } - - /* search */ - result = head; - while (result != NULL) { - if (result->fd == fd) - return result; - result = result->next; - } - - /* allocates */ - result = calloc(1, sizeof *result); - if (result == NULL) - return NULL; - - /* init */ - result->fd = fd; - pthread_mutex_lock(&mutex); - result->next = head; - head = result; - pthread_mutex_unlock(&mutex); - - /* records */ - e.events = 0; - e.data.ptr = result; - rc = epoll_ctl(pollfd, EPOLL_CTL_ADD, fd, &e); - if (rc == 0) - return result; - - /* revert on error */ - rc = errno; - update(result); - errno = rc; - return NULL; -} - -int upoll_is_valid(struct upoll *upoll) -{ - struct upollfd *itfd = head; - struct upoll *it; - while (itfd != NULL) { - it = itfd->head; - while (it != NULL) { - if (it == upoll) - return 1; - it = it->next; - } - itfd = itfd->next; - } - return 0; -} - -struct upoll *upoll_open(int fd, void *closure) -{ - struct upollfd *ufd; - struct upoll *result; - - /* allocates */ - result = calloc(1, sizeof *result); - if (result == NULL) - return NULL; - - /* get for fd */ - ufd = get_fd(fd); - if (ufd == NULL) { - free(result); - return NULL; - } - - /* init */ - result->fd = ufd; - result->closure = closure; - pthread_mutex_lock(&mutex); - result->next = ufd->head; - ufd->head = result; - pthread_mutex_unlock(&mutex); - return result; -} - -int upoll_on_readable(struct upoll *upoll, void (*process)(void *)) -{ - assert(pollfd != 0); - assert(upoll_is_valid(upoll)); - - upoll->read = process; - return update_flags(upoll->fd); -} - -int upoll_on_writable(struct upoll *upoll, void (*process)(void *)) -{ - assert(pollfd != 0); - assert(upoll_is_valid(upoll)); - - upoll->write = process; - return update_flags(upoll->fd); -} - -void upoll_on_hangup(struct upoll *upoll, void (*process)(void *)) -{ - assert(pollfd != 0); - assert(upoll_is_valid(upoll)); - - upoll->hangup = process; -} - -void upoll_close(struct upoll *upoll) -{ - struct upoll **it; - struct upollfd *ufd; - - assert(pollfd != 0); - assert(upoll_is_valid(upoll)); - - ufd = upoll->fd; - pthread_mutex_lock(&mutex); - if (current == upoll) - current = NULL; - it = &ufd->head; - while (*it != upoll) - it = &(*it)->next; - *it = upoll->next; - pthread_mutex_unlock(&mutex); - free(upoll); - update(ufd); -} - -int upoll_wait(int timeout) -{ - int rc; - struct epoll_event e; - struct upollfd *ufd; - - if (pollfd == 0) { - errno = ECANCELED; - return -1; - } - - do { - rc = epoll_wait(pollfd, &e, 1, timeout); - } while (rc < 0 && errno == EINTR); - if (rc == 1) { - ufd = e.data.ptr; - current = ufd->head; - e.events &= EPOLLIN | EPOLLOUT | EPOLLHUP; - while (current != NULL && e.events != 0) { - if ((e.events & EPOLLIN) && current->read) { - current->read(current->closure); - e.events &= (uint32_t)~EPOLLIN; - continue; - } - if ((e.events & EPOLLOUT) && current->write) { - current->write(current->closure); - e.events &= (uint32_t)~EPOLLOUT; - continue; - } - if ((e.events & EPOLLHUP) && current->hangup) { - current->hangup(current->closure); - if (current == NULL) - break; - } - current = current->next; - } - } - return rc < 0 ? rc : 0; -} - |