summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJosé Bollo <jose.bollo@iot.bzh>2018-07-05 19:12:50 +0200
committerJose Bollo <jose.bollo@iot.bzh>2018-07-06 12:40:33 +0200
commit95ad0012182b5f32cc1dd9843304b58d263a7de0 (patch)
treef9dd9571286980c882b10d6a2882c114e690b946
parent12ec841c28f8f795b49466cc377e64db3146430d (diff)
Simplify build by introducing queuing function
It is not valuable at the end to continue to have this fake job implementation. So removing it is good. Change-Id: I930ade3e3a511f0ebfb91292e5725ac3be884d44 Signed-off-by: José Bollo <jose.bollo@iot.bzh>
-rw-r--r--coverage/bin/Makefile4
-rw-r--r--src/CMakeLists.txt2
-rw-r--r--src/afb-proto-ws.c73
-rw-r--r--src/afb-proto-ws.h3
-rw-r--r--src/afb-stub-ws.c6
-rw-r--r--src/jobs-fake.c150
6 files changed, 48 insertions, 190 deletions
diff --git a/coverage/bin/Makefile b/coverage/bin/Makefile
index b851ed6c..2f2a9ee1 100644
--- a/coverage/bin/Makefile
+++ b/coverage/bin/Makefile
@@ -25,8 +25,8 @@ cflags = -I$(incdir) \
$(shell pkg-config --cflags --libs openssl libmicrohttpd json-c libsystemd uuid) \
-ldl -lrt -lpthread
-afb_lib_src = $(shell ls $(srcdir)/*.c | egrep -v '/afs-|/main-|-fake' )
-afb_clib_src = $(shell ls $(srcdir)/*.c | egrep -v '/afs-|/main-|-fake' )
+afb_lib_src = $(shell ls $(srcdir)/*.c | egrep -v '/afs-|/main-' )
+afb_clib_src = $(shell ls $(srcdir)/*.c | egrep -v '/afs-|/main-' )
afb_daemon_srcs = $(srcdir)/main-afb-daemon.c $(afb_lib_src)
afb_daemon_defs = '-DAFB_VERSION="cov"' -DAGL_DEVEL -DWITH_MONITORING_OPTION '-DBINDING_INSTALL_DIR="fake"'
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index c457d235..421866da 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -123,7 +123,7 @@ ENDIF()
###########################################
# build and install libafbwsc
###########################################
-ADD_LIBRARY(afbwsc SHARED afb-ws.c afb-ws-client.c afb-wsj1.c websock.c afb-proto-ws.c jobs-fake.c fdev.c fdev-systemd.c)
+ADD_LIBRARY(afbwsc SHARED afb-ws.c afb-ws-client.c afb-wsj1.c websock.c afb-proto-ws.c fdev.c fdev-systemd.c)
SET_TARGET_PROPERTIES(afbwsc PROPERTIES
VERSION ${LIBAFBWSC_VERSION}
SOVERSION ${LIBAFBWSC_SOVERSION})
diff --git a/src/afb-proto-ws.c b/src/afb-proto-ws.c
index 7644a96a..142afa98 100644
--- a/src/afb-proto-ws.c
+++ b/src/afb-proto-ws.c
@@ -158,6 +158,9 @@ struct afb_proto_ws
/* on hangup callback */
void (*on_hangup)(void *closure);
+
+ /* queuing facility for processing messages */
+ int (*queuing)(void (*process)(int s, void *c), void *closure);
};
/******************* streaming objects **********************************/
@@ -325,6 +328,32 @@ static int writebuf_object(struct writebuf *wb, struct json_object *object)
return string != NULL && writebuf_string(wb, string);
}
+/******************* queuing of messages *****************/
+
+/* queue the processing of the received message (except if size=0 cause it's not a valid message) */
+static void queue_message_processing(struct afb_proto_ws *protows, char *data, size_t size, void (*processing)(int,void*))
+{
+ struct binary *binary;
+
+ if (size) {
+ binary = malloc(sizeof *binary);
+ if (!binary) {
+ /* TODO process the problem */
+ errno = ENOMEM;
+ } else {
+ binary->protows = protows;
+ binary->rb.base = data;
+ binary->rb.head = data;
+ binary->rb.end = data + size;
+ if (!protows->queuing
+ || protows->queuing(processing, binary) < 0)
+ processing(0, binary);
+ return;
+ }
+ }
+ free(data);
+}
+
/******************* ws request part for server *****************/
void afb_proto_ws_call_addref(struct afb_proto_ws_call *call)
@@ -624,25 +653,9 @@ static void client_on_binary_job(int sig, void *closure)
/* callback when receiving binary data */
static void client_on_binary(void *closure, char *data, size_t size)
{
- int rc;
- struct binary *binary;
+ struct afb_proto_ws *protows = closure;
- if (size) {
- binary = malloc(sizeof *binary);
- if (!binary) {
- errno = ENOMEM;
- } else {
- binary->protows = closure;
- binary->rb.base = data;
- binary->rb.head = data;
- binary->rb.end = data + size;
- rc = jobs_queue(NULL, 0, client_on_binary_job, binary);
- if (rc >= 0)
- return;
- free(binary);
- }
- }
- free(data);
+ queue_message_processing(protows, data, size, client_on_binary_job);
}
int afb_proto_ws_client_call(
@@ -870,25 +883,9 @@ static void server_on_binary_job(int sig, void *closure)
static void server_on_binary(void *closure, char *data, size_t size)
{
- int rc;
- struct binary *binary;
+ struct afb_proto_ws *protows = closure;
- if (size) {
- binary = malloc(sizeof *binary);
- if (!binary) {
- errno = ENOMEM;
- } else {
- binary->protows = closure;
- binary->rb.base = data;
- binary->rb.head = data;
- binary->rb.end = data + size;
- rc = jobs_queue(NULL, 0, server_on_binary_job, binary);
- if (rc >= 0)
- return;
- free(binary);
- }
- }
- free(data);
+ queue_message_processing(protows, data, size, server_on_binary_job);
}
/******************* server part: manage events **********************************/
@@ -1047,3 +1044,7 @@ void afb_proto_ws_on_hangup(struct afb_proto_ws *protows, void (*on_hangup)(void
protows->on_hangup = on_hangup;
}
+void afb_proto_ws_set_queuing(struct afb_proto_ws *protows, int (*queuing)(void (*)(int,void*), void*))
+{
+ protows->queuing = queuing;
+}
diff --git a/src/afb-proto-ws.h b/src/afb-proto-ws.h
index 342313e7..b5f84e9e 100644
--- a/src/afb-proto-ws.h
+++ b/src/afb-proto-ws.h
@@ -59,8 +59,9 @@ extern int afb_proto_ws_is_client(struct afb_proto_ws *protows);
extern int afb_proto_ws_is_server(struct afb_proto_ws *protows);
extern void afb_proto_ws_hangup(struct afb_proto_ws *protows);
-extern void afb_proto_ws_on_hangup(struct afb_proto_ws *protows, void (*on_hangup)(void *closure));
+extern void afb_proto_ws_on_hangup(struct afb_proto_ws *protows, void (*on_hangup)(void *closure));
+extern void afb_proto_ws_set_queuing(struct afb_proto_ws *protows, int (*queuing)(void (*)(int,void*), void*));
extern int afb_proto_ws_client_call(struct afb_proto_ws *protows, const char *verb, struct json_object *args, const char *sessionid, void *request, const char *user_creds);
diff --git a/src/afb-stub-ws.c b/src/afb-stub-ws.c
index 974ea07e..0a58b836 100644
--- a/src/afb-stub-ws.c
+++ b/src/afb-stub-ws.c
@@ -597,6 +597,11 @@ static void on_hangup(void *closure)
afb_stub_ws_unref(stubws);
}
+static int enqueue_processing(void (*callback)(int signum, void* arg), void *arg)
+{
+ return jobs_queue(NULL, 0, callback, arg);
+}
+
/*****************************************************/
static struct afb_stub_ws *afb_stub_ws_create(struct fdev *fdev, const char *apiname, struct afb_apiset *apiset, int client)
@@ -617,6 +622,7 @@ static struct afb_stub_ws *afb_stub_ws_create(struct fdev *fdev, const char *api
stubws->apiset = afb_apiset_addref(apiset);
stubws->refcount = 1;
afb_proto_ws_on_hangup(stubws->proto, on_hangup);
+ afb_proto_ws_set_queuing(stubws->proto, enqueue_processing);
return stubws;
}
free(stubws);
diff --git a/src/jobs-fake.c b/src/jobs-fake.c
deleted file mode 100644
index d3cd19e1..00000000
--- a/src/jobs-fake.c
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Copyright (C) 2016, 2017, 2018 "IoT.bzh"
- * Author José Bollo <jose.bollo@iot.bzh>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#define _GNU_SOURCE
-
-#include <stdlib.h>
-#include <stdint.h>
-#include <unistd.h>
-#include <signal.h>
-#include <time.h>
-#include <sys/syscall.h>
-#include <pthread.h>
-#include <errno.h>
-#include <assert.h>
-
-#include <systemd/sd-event.h>
-
-#include "jobs.h"
-#include "sig-monitor.h"
-#include "verbose.h"
-
-#include "jobs.h"
-
-struct jobloop;
-
-struct job
-{
- struct job *next;
- const void *group;
- int timeout;
- void (*callback)(int signum, void* arg);
- void *closure;
-};
-
-static struct job *first, *last;
-static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
-
-static int add_job(const void *group, int timeout, void (*callback)(int signum, void *closure), void *closure)
-{
- struct job *j;
-
- j = malloc(sizeof*j);
- if (!j) {
- errno = ENOMEM;
- return -1;
- }
-
- j->next = 0;
- j->group = group;
- j->timeout = timeout;
- j->callback = callback;
- j->closure = closure;
-
- pthread_mutex_lock(&mutex);
- if (first)
- last->next = j;
- else
- first = j;
- last = j;
- pthread_mutex_unlock(&mutex);
- return 0;
-}
-
-static void *thrrun(void *arg)
-{
- struct job *j;
-
- pthread_mutex_lock(&mutex);
- j = first;
- if (j)
- first = j->next;
- pthread_mutex_unlock(&mutex);
- if (j) {
- j->callback(0, j->closure);
- free(j);
- }
- return 0;
-}
-
-int jobs_queue(
- const void *group,
- int timeout,
- void (*callback)(int signum, void* arg),
- void *arg)
-{
- pthread_t tid;
- int rc = add_job(group, timeout, callback, arg);
- if (!rc) {
- rc = pthread_create(&tid, NULL, thrrun, NULL);
- if (rc)
- rc = -1;
- }
- return rc;
-}
-
-#if 0
-int jobs_enter(
- const void *group,
- int timeout,
- void (*callback)(int signum, void *closure, struct jobloop *jobloop),
- void *closure)
-{
- return 0;
-}
-
-int jobs_leave(struct jobloop *jobloop)
-{
- return 0;
-}
-
-int jobs_call(
- const void *group,
- int timeout,
- void (*callback)(int, void*),
- void *arg)
-{
- return 0;
-}
-
-struct sd_event *jobs_get_sd_event()
-{
- struct sd_event *r;
- int rc = sd_event_default(&r);
- return rc < 0 ? NULL : r;
-}
-
-void jobs_terminate()
-{
-}
-
-int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum))
-{
- start(0);
- return 0;
-}
-#endif