summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--coverage/bin/Makefile4
-rw-r--r--src/CMakeLists.txt2
-rw-r--r--src/afb-proto-ws.c73
-rw-r--r--src/afb-proto-ws.h3
-rw-r--r--src/afb-stub-ws.c6
-rw-r--r--src/jobs-fake.c150
6 files changed, 48 insertions, 190 deletions
diff --git a/coverage/bin/Makefile b/coverage/bin/Makefile
index b851ed6c..2f2a9ee1 100644
--- a/coverage/bin/Makefile
+++ b/coverage/bin/Makefile
@@ -25,8 +25,8 @@ cflags = -I$(incdir) \
$(shell pkg-config --cflags --libs openssl libmicrohttpd json-c libsystemd uuid) \
-ldl -lrt -lpthread
-afb_lib_src = $(shell ls $(srcdir)/*.c | egrep -v '/afs-|/main-|-fake' )
-afb_clib_src = $(shell ls $(srcdir)/*.c | egrep -v '/afs-|/main-|-fake' )
+afb_lib_src = $(shell ls $(srcdir)/*.c | egrep -v '/afs-|/main-' )
+afb_clib_src = $(shell ls $(srcdir)/*.c | egrep -v '/afs-|/main-' )
afb_daemon_srcs = $(srcdir)/main-afb-daemon.c $(afb_lib_src)
afb_daemon_defs = '-DAFB_VERSION="cov"' -DAGL_DEVEL -DWITH_MONITORING_OPTION '-DBINDING_INSTALL_DIR="fake"'
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index c457d235..421866da 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -123,7 +123,7 @@ ENDIF()
###########################################
# build and install libafbwsc
###########################################
-ADD_LIBRARY(afbwsc SHARED afb-ws.c afb-ws-client.c afb-wsj1.c websock.c afb-proto-ws.c jobs-fake.c fdev.c fdev-systemd.c)
+ADD_LIBRARY(afbwsc SHARED afb-ws.c afb-ws-client.c afb-wsj1.c websock.c afb-proto-ws.c fdev.c fdev-systemd.c)
SET_TARGET_PROPERTIES(afbwsc PROPERTIES
VERSION ${LIBAFBWSC_VERSION}
SOVERSION ${LIBAFBWSC_SOVERSION})
diff --git a/src/afb-proto-ws.c b/src/afb-proto-ws.c
index 7644a96a..142afa98 100644
--- a/src/afb-proto-ws.c
+++ b/src/afb-proto-ws.c
@@ -158,6 +158,9 @@ struct afb_proto_ws
/* on hangup callback */
void (*on_hangup)(void *closure);
+
+ /* queuing facility for processing messages */
+ int (*queuing)(void (*process)(int s, void *c), void *closure);
};
/******************* streaming objects **********************************/
@@ -325,6 +328,32 @@ static int writebuf_object(struct writebuf *wb, struct json_object *object)
return string != NULL && writebuf_string(wb, string);
}
+/******************* queuing of messages *****************/
+
+/* queue the processing of the received message (except if size=0 cause it's not a valid message) */
+static void queue_message_processing(struct afb_proto_ws *protows, char *data, size_t size, void (*processing)(int,void*))
+{
+ struct binary *binary;
+
+ if (size) {
+ binary = malloc(sizeof *binary);
+ if (!binary) {
+ /* TODO process the problem */
+ errno = ENOMEM;
+ } else {
+ binary->protows = protows;
+ binary->rb.base = data;
+ binary->rb.head = data;
+ binary->rb.end = data + size;
+ if (!protows->queuing
+ || protows->queuing(processing, binary) < 0)
+ processing(0, binary);
+ return;
+ }
+ }
+ free(data);
+}
+
/******************* ws request part for server *****************/
void afb_proto_ws_call_addref(struct afb_proto_ws_call *call)
@@ -624,25 +653,9 @@ static void client_on_binary_job(int sig, void *closure)
/* callback when receiving binary data */
static void client_on_binary(void *closure, char *data, size_t size)
{
- int rc;
- struct binary *binary;
+ struct afb_proto_ws *protows = closure;
- if (size) {
- binary = malloc(sizeof *binary);
- if (!binary) {
- errno = ENOMEM;
- } else {
- binary->protows = closure;
- binary->rb.base = data;
- binary->rb.head = data;
- binary->rb.end = data + size;
- rc = jobs_queue(NULL, 0, client_on_binary_job, binary);
- if (rc >= 0)
- return;
- free(binary);
- }
- }
- free(data);
+ queue_message_processing(protows, data, size, client_on_binary_job);
}
int afb_proto_ws_client_call(
@@ -870,25 +883,9 @@ static void server_on_binary_job(int sig, void *closure)
static void server_on_binary(void *closure, char *data, size_t size)
{
- int rc;
- struct binary *binary;
+ struct afb_proto_ws *protows = closure;
- if (size) {
- binary = malloc(sizeof *binary);
- if (!binary) {
- errno = ENOMEM;
- } else {
- binary->protows = closure;
- binary->rb.base = data;
- binary->rb.head = data;
- binary->rb.end = data + size;
- rc = jobs_queue(NULL, 0, server_on_binary_job, binary);
- if (rc >= 0)
- return;
- free(binary);
- }
- }
- free(data);
+ queue_message_processing(protows, data, size, server_on_binary_job);
}
/******************* server part: manage events **********************************/
@@ -1047,3 +1044,7 @@ void afb_proto_ws_on_hangup(struct afb_proto_ws *protows, void (*on_hangup)(void
protows->on_hangup = on_hangup;
}
+void afb_proto_ws_set_queuing(struct afb_proto_ws *protows, int (*queuing)(void (*)(int,void*), void*))
+{
+ protows->queuing = queuing;
+}
diff --git a/src/afb-proto-ws.h b/src/afb-proto-ws.h
index 342313e7..b5f84e9e 100644
--- a/src/afb-proto-ws.h
+++ b/src/afb-proto-ws.h
@@ -59,8 +59,9 @@ extern int afb_proto_ws_is_client(struct afb_proto_ws *protows);
extern int afb_proto_ws_is_server(struct afb_proto_ws *protows);
extern void afb_proto_ws_hangup(struct afb_proto_ws *protows);
-extern void afb_proto_ws_on_hangup(struct afb_proto_ws *protows, void (*on_hangup)(void *closure));
+extern void afb_proto_ws_on_hangup(struct afb_proto_ws *protows, void (*on_hangup)(void *closure));
+extern void afb_proto_ws_set_queuing(struct afb_proto_ws *protows, int (*queuing)(void (*)(int,void*), void*));
extern int afb_proto_ws_client_call(struct afb_proto_ws *protows, const char *verb, struct json_object *args, const char *sessionid, void *request, const char *user_creds);
diff --git a/src/afb-stub-ws.c b/src/afb-stub-ws.c
index 974ea07e..0a58b836 100644
--- a/src/afb-stub-ws.c
+++ b/src/afb-stub-ws.c
@@ -597,6 +597,11 @@ static void on_hangup(void *closure)
afb_stub_ws_unref(stubws);
}
+static int enqueue_processing(void (*callback)(int signum, void* arg), void *arg)
+{
+ return jobs_queue(NULL, 0, callback, arg);
+}
+
/*****************************************************/
static struct afb_stub_ws *afb_stub_ws_create(struct fdev *fdev, const char *apiname, struct afb_apiset *apiset, int client)
@@ -617,6 +622,7 @@ static struct afb_stub_ws *afb_stub_ws_create(struct fdev *fdev, const char *api
stubws->apiset = afb_apiset_addref(apiset);
stubws->refcount = 1;
afb_proto_ws_on_hangup(stubws->proto, on_hangup);
+ afb_proto_ws_set_queuing(stubws->proto, enqueue_processing);
return stubws;
}
free(stubws);
diff --git a/src/jobs-fake.c b/src/jobs-fake.c
deleted file mode 100644
index d3cd19e1..00000000
--- a/src/jobs-fake.c
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Copyright (C) 2016, 2017, 2018 "IoT.bzh"
- * Author José Bollo <jose.bollo@iot.bzh>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#define _GNU_SOURCE
-
-#include <stdlib.h>
-#include <stdint.h>
-#include <unistd.h>
-#include <signal.h>
-#include <time.h>
-#include <sys/syscall.h>
-#include <pthread.h>
-#include <errno.h>
-#include <assert.h>
-
-#include <systemd/sd-event.h>
-
-#include "jobs.h"
-#include "sig-monitor.h"
-#include "verbose.h"
-
-#include "jobs.h"
-
-struct jobloop;
-
-struct job
-{
- struct job *next;
- const void *group;
- int timeout;
- void (*callback)(int signum, void* arg);
- void *closure;
-};
-
-static struct job *first, *last;
-static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
-
-static int add_job(const void *group, int timeout, void (*callback)(int signum, void *closure), void *closure)
-{
- struct job *j;
-
- j = malloc(sizeof*j);
- if (!j) {
- errno = ENOMEM;
- return -1;
- }
-
- j->next = 0;
- j->group = group;
- j->timeout = timeout;
- j->callback = callback;
- j->closure = closure;
-
- pthread_mutex_lock(&mutex);
- if (first)
- last->next = j;
- else
- first = j;
- last = j;
- pthread_mutex_unlock(&mutex);
- return 0;
-}
-
-static void *thrrun(void *arg)
-{
- struct job *j;
-
- pthread_mutex_lock(&mutex);
- j = first;
- if (j)
- first = j->next;
- pthread_mutex_unlock(&mutex);
- if (j) {
- j->callback(0, j->closure);
- free(j);
- }
- return 0;
-}
-
-int jobs_queue(
- const void *group,
- int timeout,
- void (*callback)(int signum, void* arg),
- void *arg)
-{
- pthread_t tid;
- int rc = add_job(group, timeout, callback, arg);
- if (!rc) {
- rc = pthread_create(&tid, NULL, thrrun, NULL);
- if (rc)
- rc = -1;
- }
- return rc;
-}
-
-#if 0
-int jobs_enter(
- const void *group,
- int timeout,
- void (*callback)(int signum, void *closure, struct jobloop *jobloop),
- void *closure)
-{
- return 0;
-}
-
-int jobs_leave(struct jobloop *jobloop)
-{
- return 0;
-}
-
-int jobs_call(
- const void *group,
- int timeout,
- void (*callback)(int, void*),
- void *arg)
-{
- return 0;
-}
-
-struct sd_event *jobs_get_sd_event()
-{
- struct sd_event *r;
- int rc = sd_event_default(&r);
- return rc < 0 ? NULL : r;
-}
-
-void jobs_terminate()
-{
-}
-
-int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum))
-{
- start(0);
- return 0;
-}
-#endif