diff options
-rw-r--r-- | src/CMakeLists.txt | 3 | ||||
-rw-r--r-- | src/afb-proto-ws.c | 142 | ||||
-rw-r--r-- | src/jobs-fake.c | 150 | ||||
-rw-r--r-- | src/jobs.c | 4 |
4 files changed, 249 insertions, 50 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index d531ca2e..338f1cef 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -93,13 +93,14 @@ INSTALL(TARGETS afb-daemon ########################################### # build and install libafbwsc ########################################### -ADD_LIBRARY(afbwsc SHARED afb-ws.c afb-ws-client.c afb-wsj1.c websock.c afb-proto-ws.c) +ADD_LIBRARY(afbwsc SHARED afb-ws.c afb-ws-client.c afb-wsj1.c websock.c afb-proto-ws.c jobs-fake.c) SET_TARGET_PROPERTIES(afbwsc PROPERTIES VERSION ${LIBAFBWSC_VERSION} SOVERSION ${LIBAFBWSC_SOVERSION}) TARGET_LINK_LIBRARIES(afbwsc ${libsystemd_LDFLAGS} ${json-c_LDFLAGS} + -lpthread -Wl,--version-script=${CMAKE_CURRENT_SOURCE_DIR}/export-afbwsc.map -Wl,--as-needed -Wl,--gc-sections diff --git a/src/afb-proto-ws.c b/src/afb-proto-ws.c index 90c3b05a..ce7d75d3 100644 --- a/src/afb-proto-ws.c +++ b/src/afb-proto-ws.c @@ -37,6 +37,7 @@ #include "afb-ws.h" #include "afb-msg-json.h" #include "afb-proto-ws.h" +#include "jobs.h" struct afb_proto_ws; @@ -190,6 +191,27 @@ struct afb_proto_ws void (*on_hangup)(void *closure); }; +/******************* streaming objects **********************************/ + +#define WRITEBUF_COUNT_MAX 32 +struct writebuf +{ + struct iovec iovec[WRITEBUF_COUNT_MAX]; + uint32_t uints[WRITEBUF_COUNT_MAX]; + int count; +}; + +struct readbuf +{ + char *base, *head, *end; +}; + +struct binary +{ + struct afb_proto_ws *protows; + struct readbuf rb; +}; + /******************* common useful tools **********************************/ /** @@ -204,19 +226,6 @@ static inline uint32_t ptr2id(void *ptr) /******************* serialisation part **********************************/ -struct readbuf -{ - char *base, *head, *end; -}; - -#define WRITEBUF_COUNT_MAX 32 -struct writebuf -{ - struct iovec iovec[WRITEBUF_COUNT_MAX]; - uint32_t uints[WRITEBUF_COUNT_MAX]; - int count; -}; - static char *readbuf_get(struct readbuf *rb, uint32_t length) { char *before = rb->head; @@ -735,54 +744,73 @@ static void client_on_description(struct afb_proto_ws *protows, struct readbuf * } /* callback when receiving binary data */ -static void client_on_binary(void *closure, char *data, size_t size) +static void client_on_binary_job(int sig, void *closure) { - struct afb_proto_ws *protows; - struct readbuf rb; - - rb.base = data; - if (size > 0) { - rb.head = data; - rb.end = data + size; - protows = closure; + struct binary *binary = closure; - switch (*rb.head++) { + if (!sig) { + switch (*binary->rb.head++) { case CHAR_FOR_ANSWER_SUCCESS: /* success */ - client_on_reply_success(protows, &rb); + client_on_reply_success(binary->protows, &binary->rb); break; case CHAR_FOR_ANSWER_FAIL: /* fail */ - client_on_reply_fail(protows, &rb); + client_on_reply_fail(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_BROADCAST: /* broadcast */ - client_on_event_broadcast(protows, &rb); + client_on_event_broadcast(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_ADD: /* creates the event */ - client_on_event_create(protows, &rb); + client_on_event_create(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_DEL: /* removes the event */ - client_on_event_remove(protows, &rb); + client_on_event_remove(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_PUSH: /* pushs the event */ - client_on_event_push(protows, &rb); + client_on_event_push(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_SUBSCRIBE: /* subscribe event for a request */ - client_on_event_subscribe(protows, &rb); + client_on_event_subscribe(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_UNSUBSCRIBE: /* unsubscribe event for a request */ - client_on_event_unsubscribe(protows, &rb); + client_on_event_unsubscribe(binary->protows, &binary->rb); break; case CHAR_FOR_SUBCALL_CALL: /* subcall */ - client_on_subcall(protows, &rb); + client_on_subcall(binary->protows, &binary->rb); break; case CHAR_FOR_DESCRIPTION: /* description */ - client_on_description(protows, &rb); + client_on_description(binary->protows, &binary->rb); break; default: /* unexpected message */ /* TODO: close the connection */ break; } } - free(rb.base); + free(binary->rb.base); + free(binary); +} + +/* callback when receiving binary data */ +static void client_on_binary(void *closure, char *data, size_t size) +{ + int rc; + struct binary *binary; + + if (size) { + binary = malloc(sizeof *binary); + if (!binary) { + errno = ENOMEM; + } else { + binary->protows = closure; + binary->rb.base = data; + binary->rb.head = data; + binary->rb.end = data + size; + rc = jobs_queue(NULL, 0, client_on_binary_job, binary); + if (rc >= 0) + return; + free(binary); + } + } + free(data); } int afb_proto_ws_client_call( @@ -1017,33 +1045,51 @@ static void server_on_describe(struct afb_proto_ws *protows, struct readbuf *rb) } /* callback when receiving binary data */ -static void server_on_binary(void *closure, char *data, size_t size) +static void server_on_binary_job(int sig, void *closure) { - struct afb_proto_ws *protows; - struct readbuf rb; - - rb.base = data; - if (size > 0) { - rb.head = data; - rb.end = data + size; - protows = closure; + struct binary *binary = closure; - switch (*rb.head++) { + if (!sig) { + switch (*binary->rb.head++) { case CHAR_FOR_CALL: - server_on_call(protows, &rb); + server_on_call(binary->protows, &binary->rb); break; case CHAR_FOR_SUBCALL_REPLY: - server_on_subcall_reply(protows, &rb); + server_on_subcall_reply(binary->protows, &binary->rb); break; case CHAR_FOR_DESCRIBE: - server_on_describe(protows, &rb); + server_on_describe(binary->protows, &binary->rb); break; default: /* unexpected message */ /* TODO: close the connection */ break; } } - free(rb.base); + free(binary->rb.base); + free(binary); +} + +static void server_on_binary(void *closure, char *data, size_t size) +{ + int rc; + struct binary *binary; + + if (size) { + binary = malloc(sizeof *binary); + if (!binary) { + errno = ENOMEM; + } else { + binary->protows = closure; + binary->rb.base = data; + binary->rb.head = data; + binary->rb.end = data + size; + rc = jobs_queue(NULL, 0, server_on_binary_job, binary); + if (rc >= 0) + return; + free(binary); + } + } + free(data); } /******************* server part: manage events **********************************/ diff --git a/src/jobs-fake.c b/src/jobs-fake.c new file mode 100644 index 00000000..3c1c2732 --- /dev/null +++ b/src/jobs-fake.c @@ -0,0 +1,150 @@ +/* + * Copyright (C) 2016, 2017 "IoT.bzh" + * Author José Bollo <jose.bollo@iot.bzh> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define _GNU_SOURCE + +#include <stdlib.h> +#include <stdint.h> +#include <unistd.h> +#include <signal.h> +#include <time.h> +#include <sys/syscall.h> +#include <pthread.h> +#include <errno.h> +#include <assert.h> + +#include <systemd/sd-event.h> + +#include "jobs.h" +#include "sig-monitor.h" +#include "verbose.h" + +#include "jobs.h" + +struct jobloop; + +struct job +{ + struct job *next; + const void *group; + int timeout; + void (*callback)(int signum, void* arg); + void *closure; +}; + +static struct job *first, *last; +static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + +static int add_job(const void *group, int timeout, void (*callback)(int signum, void *closure), void *closure) +{ + struct job *j; + + j = malloc(sizeof*j); + if (!j) { + errno = ENOMEM; + return -1; + } + + j->next = 0; + j->group = group; + j->timeout = timeout; + j->callback = callback; + j->closure = closure; + + pthread_mutex_lock(&mutex); + if (first) + last->next = j; + else + first = j; + last = j; + pthread_mutex_unlock(&mutex); + return 0; +} + +static void *thrrun(void *arg) +{ + struct job *j; + + pthread_mutex_lock(&mutex); + j = first; + if (j) + first = j->next; + pthread_mutex_unlock(&mutex); + if (j) { + j->callback(0, j->closure); + free(j); + } + return 0; +} + +int jobs_queue( + const void *group, + int timeout, + void (*callback)(int signum, void* arg), + void *arg) +{ + pthread_t tid; + int rc = add_job(group, timeout, callback, arg); + if (!rc) { + rc = pthread_create(&tid, NULL, thrrun, NULL); + if (rc) + rc = -1; + } + return rc; +} + +#if 0 +int jobs_enter( + const void *group, + int timeout, + void (*callback)(int signum, void *closure, struct jobloop *jobloop), + void *closure) +{ + return 0; +} + +int jobs_leave(struct jobloop *jobloop) +{ + return 0; +} + +int jobs_call( + const void *group, + int timeout, + void (*callback)(int, void*), + void *arg) +{ + return 0; +} + +struct sd_event *jobs_get_sd_event() +{ + struct sd_event *r; + int rc = sd_event_default(&r); + return rc < 0 ? NULL : r; +} + +void jobs_terminate() +{ +} + +int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum)) +{ + start(0); + return 0; +} +#endif @@ -363,8 +363,10 @@ static void thread_run(volatile struct thread *me) events = current_events; if (!events) events = events_get(); - else if (events->state == Locked) + else if (events->state == Locked) { events = 0; + AFB_WARNING("Loosing an event loop because reentering"); + } if (events) { /* run the events */ events->state = Locked; |