diff options
-rw-r--r-- | coverage/bin/Makefile | 4 | ||||
-rw-r--r-- | src/CMakeLists.txt | 2 | ||||
-rw-r--r-- | src/afb-proto-ws.c | 73 | ||||
-rw-r--r-- | src/afb-proto-ws.h | 3 | ||||
-rw-r--r-- | src/afb-stub-ws.c | 6 | ||||
-rw-r--r-- | src/jobs-fake.c | 150 |
6 files changed, 48 insertions, 190 deletions
diff --git a/coverage/bin/Makefile b/coverage/bin/Makefile index b851ed6c..2f2a9ee1 100644 --- a/coverage/bin/Makefile +++ b/coverage/bin/Makefile @@ -25,8 +25,8 @@ cflags = -I$(incdir) \ $(shell pkg-config --cflags --libs openssl libmicrohttpd json-c libsystemd uuid) \ -ldl -lrt -lpthread -afb_lib_src = $(shell ls $(srcdir)/*.c | egrep -v '/afs-|/main-|-fake' ) -afb_clib_src = $(shell ls $(srcdir)/*.c | egrep -v '/afs-|/main-|-fake' ) +afb_lib_src = $(shell ls $(srcdir)/*.c | egrep -v '/afs-|/main-' ) +afb_clib_src = $(shell ls $(srcdir)/*.c | egrep -v '/afs-|/main-' ) afb_daemon_srcs = $(srcdir)/main-afb-daemon.c $(afb_lib_src) afb_daemon_defs = '-DAFB_VERSION="cov"' -DAGL_DEVEL -DWITH_MONITORING_OPTION '-DBINDING_INSTALL_DIR="fake"' diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index c457d235..421866da 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -123,7 +123,7 @@ ENDIF() ########################################### # build and install libafbwsc ########################################### -ADD_LIBRARY(afbwsc SHARED afb-ws.c afb-ws-client.c afb-wsj1.c websock.c afb-proto-ws.c jobs-fake.c fdev.c fdev-systemd.c) +ADD_LIBRARY(afbwsc SHARED afb-ws.c afb-ws-client.c afb-wsj1.c websock.c afb-proto-ws.c fdev.c fdev-systemd.c) SET_TARGET_PROPERTIES(afbwsc PROPERTIES VERSION ${LIBAFBWSC_VERSION} SOVERSION ${LIBAFBWSC_SOVERSION}) diff --git a/src/afb-proto-ws.c b/src/afb-proto-ws.c index 7644a96a..142afa98 100644 --- a/src/afb-proto-ws.c +++ b/src/afb-proto-ws.c @@ -158,6 +158,9 @@ struct afb_proto_ws /* on hangup callback */ void (*on_hangup)(void *closure); + + /* queuing facility for processing messages */ + int (*queuing)(void (*process)(int s, void *c), void *closure); }; /******************* streaming objects **********************************/ @@ -325,6 +328,32 @@ static int writebuf_object(struct writebuf *wb, struct json_object *object) return string != NULL && writebuf_string(wb, string); } +/******************* queuing of messages *****************/ + +/* queue the processing of the received message (except if size=0 cause it's not a valid message) */ +static void queue_message_processing(struct afb_proto_ws *protows, char *data, size_t size, void (*processing)(int,void*)) +{ + struct binary *binary; + + if (size) { + binary = malloc(sizeof *binary); + if (!binary) { + /* TODO process the problem */ + errno = ENOMEM; + } else { + binary->protows = protows; + binary->rb.base = data; + binary->rb.head = data; + binary->rb.end = data + size; + if (!protows->queuing + || protows->queuing(processing, binary) < 0) + processing(0, binary); + return; + } + } + free(data); +} + /******************* ws request part for server *****************/ void afb_proto_ws_call_addref(struct afb_proto_ws_call *call) @@ -624,25 +653,9 @@ static void client_on_binary_job(int sig, void *closure) /* callback when receiving binary data */ static void client_on_binary(void *closure, char *data, size_t size) { - int rc; - struct binary *binary; + struct afb_proto_ws *protows = closure; - if (size) { - binary = malloc(sizeof *binary); - if (!binary) { - errno = ENOMEM; - } else { - binary->protows = closure; - binary->rb.base = data; - binary->rb.head = data; - binary->rb.end = data + size; - rc = jobs_queue(NULL, 0, client_on_binary_job, binary); - if (rc >= 0) - return; - free(binary); - } - } - free(data); + queue_message_processing(protows, data, size, client_on_binary_job); } int afb_proto_ws_client_call( @@ -870,25 +883,9 @@ static void server_on_binary_job(int sig, void *closure) static void server_on_binary(void *closure, char *data, size_t size) { - int rc; - struct binary *binary; + struct afb_proto_ws *protows = closure; - if (size) { - binary = malloc(sizeof *binary); - if (!binary) { - errno = ENOMEM; - } else { - binary->protows = closure; - binary->rb.base = data; - binary->rb.head = data; - binary->rb.end = data + size; - rc = jobs_queue(NULL, 0, server_on_binary_job, binary); - if (rc >= 0) - return; - free(binary); - } - } - free(data); + queue_message_processing(protows, data, size, server_on_binary_job); } /******************* server part: manage events **********************************/ @@ -1047,3 +1044,7 @@ void afb_proto_ws_on_hangup(struct afb_proto_ws *protows, void (*on_hangup)(void protows->on_hangup = on_hangup; } +void afb_proto_ws_set_queuing(struct afb_proto_ws *protows, int (*queuing)(void (*)(int,void*), void*)) +{ + protows->queuing = queuing; +} diff --git a/src/afb-proto-ws.h b/src/afb-proto-ws.h index 342313e7..b5f84e9e 100644 --- a/src/afb-proto-ws.h +++ b/src/afb-proto-ws.h @@ -59,8 +59,9 @@ extern int afb_proto_ws_is_client(struct afb_proto_ws *protows); extern int afb_proto_ws_is_server(struct afb_proto_ws *protows); extern void afb_proto_ws_hangup(struct afb_proto_ws *protows); -extern void afb_proto_ws_on_hangup(struct afb_proto_ws *protows, void (*on_hangup)(void *closure)); +extern void afb_proto_ws_on_hangup(struct afb_proto_ws *protows, void (*on_hangup)(void *closure)); +extern void afb_proto_ws_set_queuing(struct afb_proto_ws *protows, int (*queuing)(void (*)(int,void*), void*)); extern int afb_proto_ws_client_call(struct afb_proto_ws *protows, const char *verb, struct json_object *args, const char *sessionid, void *request, const char *user_creds); diff --git a/src/afb-stub-ws.c b/src/afb-stub-ws.c index 974ea07e..0a58b836 100644 --- a/src/afb-stub-ws.c +++ b/src/afb-stub-ws.c @@ -597,6 +597,11 @@ static void on_hangup(void *closure) afb_stub_ws_unref(stubws); } +static int enqueue_processing(void (*callback)(int signum, void* arg), void *arg) +{ + return jobs_queue(NULL, 0, callback, arg); +} + /*****************************************************/ static struct afb_stub_ws *afb_stub_ws_create(struct fdev *fdev, const char *apiname, struct afb_apiset *apiset, int client) @@ -617,6 +622,7 @@ static struct afb_stub_ws *afb_stub_ws_create(struct fdev *fdev, const char *api stubws->apiset = afb_apiset_addref(apiset); stubws->refcount = 1; afb_proto_ws_on_hangup(stubws->proto, on_hangup); + afb_proto_ws_set_queuing(stubws->proto, enqueue_processing); return stubws; } free(stubws); diff --git a/src/jobs-fake.c b/src/jobs-fake.c deleted file mode 100644 index d3cd19e1..00000000 --- a/src/jobs-fake.c +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Copyright (C) 2016, 2017, 2018 "IoT.bzh" - * Author José Bollo <jose.bollo@iot.bzh> - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#define _GNU_SOURCE - -#include <stdlib.h> -#include <stdint.h> -#include <unistd.h> -#include <signal.h> -#include <time.h> -#include <sys/syscall.h> -#include <pthread.h> -#include <errno.h> -#include <assert.h> - -#include <systemd/sd-event.h> - -#include "jobs.h" -#include "sig-monitor.h" -#include "verbose.h" - -#include "jobs.h" - -struct jobloop; - -struct job -{ - struct job *next; - const void *group; - int timeout; - void (*callback)(int signum, void* arg); - void *closure; -}; - -static struct job *first, *last; -static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; - -static int add_job(const void *group, int timeout, void (*callback)(int signum, void *closure), void *closure) -{ - struct job *j; - - j = malloc(sizeof*j); - if (!j) { - errno = ENOMEM; - return -1; - } - - j->next = 0; - j->group = group; - j->timeout = timeout; - j->callback = callback; - j->closure = closure; - - pthread_mutex_lock(&mutex); - if (first) - last->next = j; - else - first = j; - last = j; - pthread_mutex_unlock(&mutex); - return 0; -} - -static void *thrrun(void *arg) -{ - struct job *j; - - pthread_mutex_lock(&mutex); - j = first; - if (j) - first = j->next; - pthread_mutex_unlock(&mutex); - if (j) { - j->callback(0, j->closure); - free(j); - } - return 0; -} - -int jobs_queue( - const void *group, - int timeout, - void (*callback)(int signum, void* arg), - void *arg) -{ - pthread_t tid; - int rc = add_job(group, timeout, callback, arg); - if (!rc) { - rc = pthread_create(&tid, NULL, thrrun, NULL); - if (rc) - rc = -1; - } - return rc; -} - -#if 0 -int jobs_enter( - const void *group, - int timeout, - void (*callback)(int signum, void *closure, struct jobloop *jobloop), - void *closure) -{ - return 0; -} - -int jobs_leave(struct jobloop *jobloop) -{ - return 0; -} - -int jobs_call( - const void *group, - int timeout, - void (*callback)(int, void*), - void *arg) -{ - return 0; -} - -struct sd_event *jobs_get_sd_event() -{ - struct sd_event *r; - int rc = sd_event_default(&r); - return rc < 0 ? NULL : r; -} - -void jobs_terminate() -{ -} - -int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum)) -{ - start(0); - return 0; -} -#endif |