aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt45
-rw-r--r--bindings/CMakeLists.txt3
-rw-r--r--bindings/samples/ave.c2
-rw-r--r--include/afb/afb-daemon-itf.h2
-rw-r--r--include/afb/afb-daemon-v1.h3
-rw-r--r--include/afb/afb-daemon-v2.h3
-rw-r--r--include/afb/afb-dynapi-itf.h1
-rw-r--r--include/afb/afb-dynapi.h3
-rw-r--r--src/CMakeLists.txt12
-rw-r--r--src/afb-api-dyn.c4
-rw-r--r--src/afb-api-dyn.h1
-rw-r--r--src/afb-context.c2
-rw-r--r--src/afb-evt.h1
-rw-r--r--src/afb-export.c12
-rw-r--r--src/afb-proto-ws.c254
-rw-r--r--src/afb-session.c536
-rw-r--r--src/afb-session.h12
-rw-r--r--src/afb-stub-ws.c58
-rw-r--r--src/afb-trace.c122
-rw-r--r--src/jobs-fake.c150
-rw-r--r--src/jobs.c124
-rw-r--r--src/main.c2
-rw-r--r--src/verbose.c5
-rw-r--r--test/AFB.js22
-rw-r--r--test/monitoring/AFB.js22
-rw-r--r--test/monitoring/monitor-pastel.css2
-rw-r--r--test/monitoring/monitor.html2
-rw-r--r--test/monitoring/monitor.js2
28 files changed, 912 insertions, 495 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 163a74b6..568eb42b 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -70,19 +70,14 @@ CHECK_LIBRARY_EXISTS(magic magic_load "" HAVE_LIBMAGIC_SO)
IF(HAVE_MAGIC_H)
IF(HAVE_LIBMAGIC_SO)
SET(HAVE_LIBMAGIC "1")
+ SET(LIBMAGIC_LDFLAGS -lmagic)
ENDIF(HAVE_LIBMAGIC_SO)
ENDIF(HAVE_MAGIC_H)
-IF(NOT HAVE_LIBMAGIC)
- MESSAGE(FATAL_ERROR "\"magic.h\" or \"libmagic.so\" missing.
- Please install the \"file-devel\" or \"libmagic-dev\" package !")
-ENDIF(NOT HAVE_LIBMAGIC)
-ADD_DEFINITIONS(-DUSE_MAGIC_MIME_TYPE)
-
-PKG_CHECK_MODULES(libsystemd REQUIRED libsystemd>=222)
-PKG_CHECK_MODULES(libmicrohttpd REQUIRED libmicrohttpd>=0.9.55)
-PKG_CHECK_MODULES(openssl REQUIRED openssl)
-PKG_CHECK_MODULES(uuid REQUIRED uuid)
+PKG_CHECK_MODULES(libsystemd libsystemd>=222)
+PKG_CHECK_MODULES(libmicrohttpd libmicrohttpd>=0.9.55)
+PKG_CHECK_MODULES(openssl openssl)
+PKG_CHECK_MODULES(uuid uuid)
PKG_CHECK_MODULES(cynara cynara-client)
IF(AGL_DEVEL)
@@ -93,6 +88,32 @@ IF(cynara_FOUND)
ADD_DEFINITIONS(-DBACKEND_PERMISSION_IS_CYNARA)
ENDIF(cynara_FOUND)
+IF(HAVE_LIBMAGIC AND libsystemd_FOUND AND libmicrohttpd_FOUND AND openssl_FOUND AND uuid_FOUND)
+ SET(WITH_BINDER TRUE)
+ ADD_DEFINITIONS(-DUSE_MAGIC_MIME_TYPE)
+ELSE()
+ IF(NOT HAVE_LIBMAGIC)
+ MESSAGE(WARNING "\"magic.h\" or \"libmagic.so\" missing.
+ Please install the \"file-devel\" or \"libmagic-dev\" package !")
+ ENDIF(NOT HAVE_LIBMAGIC)
+ IF(NOT libsystemd_FOUND)
+ MESSAGE(WARNING "Dependency to 'libsystemd' is missing")
+ ENDIF()
+ IF(NOT libmicrohttpd_FOUND)
+ MESSAGE(WARNING "Dependency to 'libmicrohttpd' is missing")
+ ENDIF()
+ IF(NOT openssl_FOUND)
+ MESSAGE(WARNING "Dependency to 'openssl' is missing")
+ ENDIF()
+ IF(NOT uuid_FOUND)
+ MESSAGE(WARNING "Dependency to 'uuid' is missing")
+ ENDIF()
+ IF(NOT ALLOW_NO_BINDER)
+ MESSAGE(FATAL_ERROR "Can't compile the binder, either define ALLOW_NO_BINDER or install dependencies")
+ ENDIF()
+ SET(WITH_BINDER FALSE)
+ENDIF()
+
ADD_DEFINITIONS(-DAFB_VERSION="${PROJECT_VERSION}")
INCLUDE_DIRECTORIES(
@@ -114,7 +135,7 @@ SET(link_libraries
${uuid_LDFLAGS}
${openssl_LDFLAGS}
${cynara_LDFLAGS}
- -lmagic
+ ${LIBMAGIC_LDFLAGS}
-ldl
-lrt
)
@@ -123,7 +144,7 @@ SET(binding_install_dir ${CMAKE_INSTALL_FULL_LIBDIR}/afb)
###########################################################################
# activates the monitoring by default
-if(INCLUDE_MONITORING)
+if(INCLUDE_MONITORING AND WITH_BINDER)
add_definitions(-DWITH_MONITORING_OPTION)
INSTALL(DIRECTORY
${CMAKE_CURRENT_SOURCE_DIR}/test/monitoring
diff --git a/bindings/CMakeLists.txt b/bindings/CMakeLists.txt
index 1a1e9016..509c6cbf 100644
--- a/bindings/CMakeLists.txt
+++ b/bindings/CMakeLists.txt
@@ -16,6 +16,9 @@
# limitations under the License.
###########################################################################
+IF(WITH_BINDER)
ADD_SUBDIRECTORY(intrinsics)
ADD_SUBDIRECTORY(samples)
ADD_SUBDIRECTORY(tutorial)
+ENDIF(WITH_BINDER)
+
diff --git a/bindings/samples/ave.c b/bindings/samples/ave.c
index a4b4144d..569245eb 100644
--- a/bindings/samples/ave.c
+++ b/bindings/samples/ave.c
@@ -483,7 +483,7 @@ int afbBindingVdyn(afb_dynapi *dynapi)
int i, rc;
for (i = 0; apis[i] ; i++) {
- rc = afb_dynapi_new_api(dynapi, apis[i], NULL, build_api, (void*)apis[i]);
+ rc = afb_dynapi_new_api(dynapi, apis[i], NULL, 0, build_api, (void*)apis[i]);
if (rc < 0)
AFB_DYNAPI_ERROR(dynapi, "can't create API %s", apis[i]);
}
diff --git a/include/afb/afb-daemon-itf.h b/include/afb/afb-daemon-itf.h
index b78f9af9..492032ee 100644
--- a/include/afb/afb-daemon-itf.h
+++ b/include/afb/afb-daemon-itf.h
@@ -44,7 +44,7 @@ struct afb_daemon_itf
struct afb_req (*unstore_req)(void*closure, struct afb_stored_req *sreq);
int (*require_api)(void*closure, const char *name, int initialized);
int (*rename_api)(void*closure, const char *name);
- int (*new_api)(void *closure, const char *api, const char *info, int (*preinit)(void*, struct afb_dynapi *), void *preinit_closure);
+ int (*new_api)(void *closure, const char *api, const char *info, int noconcurrency, int (*preinit)(void*, struct afb_dynapi *), void *preinit_closure);
};
/*
diff --git a/include/afb/afb-daemon-v1.h b/include/afb/afb-daemon-v1.h
index d1a0cc2a..d199a486 100644
--- a/include/afb/afb-daemon-v1.h
+++ b/include/afb/afb-daemon-v1.h
@@ -195,8 +195,9 @@ static inline int afb_daemon_new_api_v1(
struct afb_daemon daemon,
const char *api,
const char *info,
+ int noconcurrency,
int (*preinit)(void*, struct afb_dynapi *),
void *closure)
{
- return daemon.itf->new_api(daemon.closure, api, info, preinit, closure);
+ return daemon.itf->new_api(daemon.closure, api, info, noconcurrency, preinit, closure);
}
diff --git a/include/afb/afb-daemon-v2.h b/include/afb/afb-daemon-v2.h
index 1ea40e96..6eb48c60 100644
--- a/include/afb/afb-daemon-v2.h
+++ b/include/afb/afb-daemon-v2.h
@@ -171,9 +171,10 @@ static inline int afb_daemon_rename_api_v2(const char *name)
static inline int afb_daemon_new_api_v2(
const char *api,
const char *info,
+ int noconcurrency,
int (*preinit)(void*, struct afb_dynapi *),
void *closure)
{
- return afb_get_daemon_v2().itf->new_api(afb_get_daemon_v2().closure, api, info, preinit, closure);
+ return afb_get_daemon_v2().itf->new_api(afb_get_daemon_v2().closure, api, info, noconcurrency, preinit, closure);
}
diff --git a/include/afb/afb-dynapi-itf.h b/include/afb/afb-dynapi-itf.h
index 682558e0..fc90dbde 100644
--- a/include/afb/afb-dynapi-itf.h
+++ b/include/afb/afb-dynapi-itf.h
@@ -131,6 +131,7 @@ struct afb_dynapi_itf
void *dynapi,
const char *api,
const char *info,
+ int noconcurrency,
int (*preinit)(void*, struct afb_dynapi *),
void *closure);
diff --git a/include/afb/afb-dynapi.h b/include/afb/afb-dynapi.h
index dfdcdb24..e2458952 100644
--- a/include/afb/afb-dynapi.h
+++ b/include/afb/afb-dynapi.h
@@ -242,10 +242,11 @@ static inline int afb_dynapi_new_api(
struct afb_dynapi *dynapi,
const char *api,
const char *info,
+ int noconcurrency,
int (*preinit)(void*, struct afb_dynapi *),
void *closure)
{
- return dynapi->itf->api_new_api(dynapi, api, info, preinit, closure);
+ return dynapi->itf->api_new_api(dynapi, api, info, noconcurrency, preinit, closure);
}
static inline int afb_dynapi_set_verbs_v2(
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index a7d1ff95..b8accc77 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -25,6 +25,10 @@ endif(ALLOW_NO_BINDER)
INCLUDE(FindPkgConfig)
ADD_SUBDIRECTORY(genskel)
+
+IF(WITH_BINDER)
+###########################################
+
ADD_SUBDIRECTORY(tests)
ADD_DEFINITIONS(-DBINDING_INSTALL_DIR="${binding_install_dir}")
@@ -91,13 +95,14 @@ INSTALL(TARGETS afb-daemon
###########################################
# build and install libafbwsc
###########################################
-ADD_LIBRARY(afbwsc SHARED afb-ws.c afb-ws-client.c afb-wsj1.c websock.c afb-proto-ws.c)
+ADD_LIBRARY(afbwsc SHARED afb-ws.c afb-ws-client.c afb-wsj1.c websock.c afb-proto-ws.c jobs-fake.c)
SET_TARGET_PROPERTIES(afbwsc PROPERTIES
VERSION ${LIBAFBWSC_VERSION}
SOVERSION ${LIBAFBWSC_SOVERSION})
TARGET_LINK_LIBRARIES(afbwsc
${libsystemd_LDFLAGS}
${json-c_LDFLAGS}
+ -lpthread
-Wl,--version-script=${CMAKE_CURRENT_SOURCE_DIR}/export-afbwsc.map
-Wl,--as-needed
-Wl,--gc-sections
@@ -117,4 +122,7 @@ TARGET_LINK_LIBRARIES(afb-client-demo
INSTALL(TARGETS afb-client-demo
RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR})
-
+###########################################
+ELSE(WITH_BINDER)
+ MESSAGE(WARNING "NOT compiling the binder! but tools are compiled")
+ENDIF(WITH_BINDER)
diff --git a/src/afb-api-dyn.c b/src/afb-api-dyn.c
index 0667f519..2ef1b1a5 100644
--- a/src/afb-api-dyn.c
+++ b/src/afb-api-dyn.c
@@ -235,7 +235,7 @@ static struct afb_api_itf dyn_api_itf = {
.describe = describe_cb
};
-int afb_api_dyn_add(struct afb_apiset *apiset, const char *name, const char *info, int (*preinit)(void*, struct afb_dynapi*), void *closure)
+int afb_api_dyn_add(struct afb_apiset *apiset, const char *name, const char *info, int noconcurrency, int (*preinit)(void*, struct afb_dynapi*), void *closure)
{
int rc;
struct afb_api_dyn *dynapi;
@@ -266,7 +266,7 @@ int afb_api_dyn_add(struct afb_apiset *apiset, const char *name, const char *inf
/* records the binding */
afb_api.closure = dynapi;
afb_api.itf = &dyn_api_itf;
- afb_api.group = NULL;
+ afb_api.group = noconcurrency ? dynapi : NULL;
if (afb_apiset_add(apiset, afb_export_apiname(dynapi->export), afb_api) < 0) {
ERROR("dynamic api %s can't be registered to set %s, ABORTING it!",
afb_export_apiname(dynapi->export),
diff --git a/src/afb-api-dyn.h b/src/afb-api-dyn.h
index 35464c6e..21626eed 100644
--- a/src/afb-api-dyn.h
+++ b/src/afb-api-dyn.h
@@ -40,6 +40,7 @@ extern int afb_api_dyn_add(
struct afb_apiset *apiset,
const char *name,
const char *info,
+ int noconcurrency,
int (*preinit)(void*, struct afb_dynapi*),
void *closure);
diff --git a/src/afb-context.c b/src/afb-context.c
index 759ee90e..c2649a48 100644
--- a/src/afb-context.c
+++ b/src/afb-context.c
@@ -63,7 +63,7 @@ int afb_context_connect(struct afb_context *context, const char *uuid, const cha
int created;
struct afb_session *session;
- session = afb_session_get (uuid, &created);
+ session = afb_session_get (uuid, AFB_SESSION_TIMEOUT_DEFAULT, &created);
if (session == NULL)
return -1;
init_context(context, session, token);
diff --git a/src/afb-evt.h b/src/afb-evt.h
index 2d888fa1..901bfbb2 100644
--- a/src/afb-evt.h
+++ b/src/afb-evt.h
@@ -18,6 +18,7 @@
#pragma once
struct afb_event;
+struct afb_eventid;
struct afb_evtid;
struct afb_session;
struct json_object;
diff --git a/src/afb-export.c b/src/afb-export.c
index 19aab0ce..304395ae 100644
--- a/src/afb-export.c
+++ b/src/afb-export.c
@@ -179,7 +179,7 @@ static struct afb_eventid *eventid_make_cb(void *closure, const char *name)
static struct afb_event event_make_cb(void *closure, const char *name)
{
struct afb_eventid *eventid = eventid_make_cb(closure, name);
- return (struct afb_event){ .itf = eventid ? eventid->itf : NULL, .closure = eventid };
+ return afb_evt_event_from_evtid(afb_evt_eventid_to_evtid(eventid));
}
static int event_broadcast_cb(void *closure, const char *name, struct json_object *object)
@@ -255,11 +255,12 @@ static int api_new_api_cb(
void *closure,
const char *api,
const char *info,
+ int noconcurrency,
int (*preinit)(void*, struct afb_dynapi *),
void *preinit_closure)
{
struct afb_export *export = closure;
- return afb_api_dyn_add(export->apiset, api, info, preinit, preinit_closure);
+ return afb_api_dyn_add(export->apiset, api, info, noconcurrency, preinit, preinit_closure);
}
/**********************************************
@@ -376,11 +377,12 @@ static int hooked_api_new_api_cb(
void *closure,
const char *api,
const char *info,
+ int noconcurrency,
int (*preinit)(void*, struct afb_dynapi *),
void *preinit_closure)
{
/* TODO */
- return api_new_api_cb(closure, api, info, preinit, preinit_closure);
+ return api_new_api_cb(closure, api, info, noconcurrency, preinit, preinit_closure);
}
/**********************************************
* vectors
@@ -1070,7 +1072,7 @@ static struct afb_export *create(struct afb_apiset *apiset, const char *apiname,
/* session shared with other exports */
if (common_session == NULL) {
- common_session = afb_session_create (NULL, 0);
+ common_session = afb_session_create (0);
if (common_session == NULL)
return NULL;
}
@@ -1179,7 +1181,7 @@ struct afb_binding_interface_v1 *afb_export_get_interface_v1(struct afb_export *
int afb_export_unshare_session(struct afb_export *export)
{
if (export->session == common_session) {
- export->session = afb_session_create (NULL, 0);
+ export->session = afb_session_create (0);
if (export->session)
afb_session_unref(common_session);
else {
diff --git a/src/afb-proto-ws.c b/src/afb-proto-ws.c
index 3c8922cd..ce7d75d3 100644
--- a/src/afb-proto-ws.c
+++ b/src/afb-proto-ws.c
@@ -37,6 +37,7 @@
#include "afb-ws.h"
#include "afb-msg-json.h"
#include "afb-proto-ws.h"
+#include "jobs.h"
struct afb_proto_ws;
@@ -190,6 +191,27 @@ struct afb_proto_ws
void (*on_hangup)(void *closure);
};
+/******************* streaming objects **********************************/
+
+#define WRITEBUF_COUNT_MAX 32
+struct writebuf
+{
+ struct iovec iovec[WRITEBUF_COUNT_MAX];
+ uint32_t uints[WRITEBUF_COUNT_MAX];
+ int count;
+};
+
+struct readbuf
+{
+ char *base, *head, *end;
+};
+
+struct binary
+{
+ struct afb_proto_ws *protows;
+ struct readbuf rb;
+};
+
/******************* common useful tools **********************************/
/**
@@ -204,19 +226,6 @@ static inline uint32_t ptr2id(void *ptr)
/******************* serialisation part **********************************/
-struct readbuf
-{
- char *base, *head, *end;
-};
-
-#define WRITEBUF_COUNT_MAX 32
-struct writebuf
-{
- struct iovec iovec[WRITEBUF_COUNT_MAX];
- uint32_t uints[WRITEBUF_COUNT_MAX];
- int count;
-};
-
static char *readbuf_get(struct readbuf *rb, uint32_t length)
{
char *before = rb->head;
@@ -343,12 +352,15 @@ int afb_proto_ws_call_success(struct afb_proto_ws_call *call, struct json_object
{
int rc = -1;
struct writebuf wb = { .count = 0 };
+ struct afb_proto_ws *protows = call->protows;
if (writebuf_char(&wb, CHAR_FOR_ANSWER_SUCCESS)
&& writebuf_uint32(&wb, call->callid)
&& writebuf_string(&wb, info ?: "")
&& writebuf_object(&wb, obj)) {
- rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count);
+ pthread_mutex_lock(&protows->mutex);
+ rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+ pthread_mutex_unlock(&protows->mutex);
if (rc >= 0) {
rc = 0;
goto success;
@@ -362,12 +374,15 @@ int afb_proto_ws_call_fail(struct afb_proto_ws_call *call, const char *status, c
{
int rc = -1;
struct writebuf wb = { .count = 0 };
+ struct afb_proto_ws *protows = call->protows;
if (writebuf_char(&wb, CHAR_FOR_ANSWER_FAIL)
&& writebuf_uint32(&wb, call->callid)
&& writebuf_string(&wb, status)
&& writebuf_string(&wb, info ? : "")) {
- rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count);
+ pthread_mutex_lock(&protows->mutex);
+ rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+ pthread_mutex_unlock(&protows->mutex);
if (rc >= 0) {
rc = 0;
goto success;
@@ -391,7 +406,7 @@ int afb_proto_ws_call_subcall(struct afb_proto_ws_call *call, const char *api, c
sc->callback = callback;
sc->closure = cb_closure;
- pthread_mutex_unlock(&protows->mutex);
+ pthread_mutex_lock(&protows->mutex);
sc->subcallid = ptr2id(sc);
do {
sc->subcallid++;
@@ -409,7 +424,9 @@ int afb_proto_ws_call_subcall(struct afb_proto_ws_call *call, const char *api, c
&& writebuf_string(&wb, api)
&& writebuf_string(&wb, verb)
&& writebuf_object(&wb, args)) {
+ pthread_mutex_lock(&protows->mutex);
rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+ pthread_mutex_unlock(&protows->mutex);
if (rc >= 0) {
rc = 0;
goto success;
@@ -424,12 +441,15 @@ int afb_proto_ws_call_subscribe(struct afb_proto_ws_call *call, const char *even
{
int rc = -1;
struct writebuf wb = { .count = 0 };
+ struct afb_proto_ws *protows = call->protows;
if (writebuf_char(&wb, CHAR_FOR_EVT_SUBSCRIBE)
&& writebuf_uint32(&wb, call->callid)
&& writebuf_uint32(&wb, (uint32_t)event_id)
&& writebuf_string(&wb, event_name)) {
- rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count);
+ pthread_mutex_lock(&protows->mutex);
+ rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+ pthread_mutex_unlock(&protows->mutex);
if (rc >= 0) {
rc = 0;
goto success;
@@ -443,12 +463,15 @@ int afb_proto_ws_call_unsubscribe(struct afb_proto_ws_call *call, const char *ev
{
int rc = -1;
struct writebuf wb = { .count = 0 };
+ struct afb_proto_ws *protows = call->protows;
if (writebuf_char(&wb, CHAR_FOR_EVT_UNSUBSCRIBE)
&& writebuf_uint32(&wb, call->callid)
&& writebuf_uint32(&wb, (uint32_t)event_id)
&& writebuf_string(&wb, event_name)) {
- rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count);
+ pthread_mutex_lock(&protows->mutex);
+ rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+ pthread_mutex_unlock(&protows->mutex);
if (rc >= 0) {
rc = 0;
goto success;
@@ -461,7 +484,7 @@ success:
/******************* client part **********************************/
/* search a memorized call */
-static struct client_call *client_call_search(struct afb_proto_ws *protows, uint32_t callid)
+static struct client_call *client_call_search_locked(struct afb_proto_ws *protows, uint32_t callid)
{
struct client_call *call;
@@ -472,11 +495,23 @@ static struct client_call *client_call_search(struct afb_proto_ws *protows, uint
return call;
}
+static struct client_call *client_call_search_unlocked(struct afb_proto_ws *protows, uint32_t callid)
+{
+ struct client_call *result;
+
+ pthread_mutex_lock(&protows->mutex);
+ result = client_call_search_locked(protows, callid);
+ pthread_mutex_unlock(&protows->mutex);
+ return result;
+}
+
/* free and release the memorizing call */
static void client_call_destroy(struct client_call *call)
{
struct client_call **prv;
+ struct afb_proto_ws *protows = call->protows;
+ pthread_mutex_lock(&protows->mutex);
prv = &call->protows->calls;
while (*prv != NULL) {
if (*prv == call) {
@@ -485,6 +520,7 @@ static void client_call_destroy(struct client_call *call)
}
prv = &(*prv)->next;
}
+ pthread_mutex_unlock(&protows->mutex);
free(call);
}
@@ -505,7 +541,7 @@ static int client_msg_call_get(struct afb_proto_ws *protows, struct readbuf *rb,
}
/* get the call */
- *call = client_call_search(protows, callid);
+ *call = client_call_search_unlocked(protows, callid);
if (*call == NULL) {
return 0;
}
@@ -600,6 +636,7 @@ static void client_on_reply_fail(struct afb_proto_ws *protows, struct readbuf *r
if (!client_msg_call_get(protows, rb, &call))
return;
+
if (readbuf_string(rb, &status, NULL) && readbuf_string(rb, &info, NULL)) {
protows->client_itf->on_reply_fail(protows->closure, call->request, status, info);
@@ -614,12 +651,19 @@ static int client_send_subcall_reply(struct afb_proto_ws *protows, uint32_t subc
{
struct writebuf wb = { .count = 0 };
char ie = status < 0;
+ int rc;
- return -!(writebuf_char(&wb, CHAR_FOR_SUBCALL_REPLY)
+ if (writebuf_char(&wb, CHAR_FOR_SUBCALL_REPLY)
&& writebuf_uint32(&wb, subcallid)
&& writebuf_char(&wb, ie)
- && writebuf_object(&wb, object)
- && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0);
+ && writebuf_object(&wb, object)) {
+ pthread_mutex_lock(&protows->mutex);
+ rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+ pthread_mutex_unlock(&protows->mutex);
+ if (rc >= 0)
+ return 0;
+ }
+ return -1;
}
/* callback for subcall reply */
@@ -682,11 +726,15 @@ static void client_on_description(struct afb_proto_ws *protows, struct readbuf *
struct json_object *object;
if (readbuf_uint32(rb, &descid)) {
+ pthread_mutex_lock(&protows->mutex);
prv = &protows->describes;
while ((desc = *prv) && desc->descid != descid)
prv = &desc->next;
- if (desc) {
+ if (!desc)
+ pthread_mutex_unlock(&protows->mutex);
+ else {
*prv = desc->next;
+ pthread_mutex_unlock(&protows->mutex);
if (!readbuf_object(rb, &object))
object = NULL;
desc->callback(desc->closure, object);
@@ -696,56 +744,73 @@ static void client_on_description(struct afb_proto_ws *protows, struct readbuf *
}
/* callback when receiving binary data */
-static void client_on_binary(void *closure, char *data, size_t size)
+static void client_on_binary_job(int sig, void *closure)
{
- struct afb_proto_ws *protows;
- struct readbuf rb;
-
- rb.base = data;
- if (size > 0) {
- rb.head = data;
- rb.end = data + size;
- protows = closure;
+ struct binary *binary = closure;
- pthread_mutex_lock(&protows->mutex);
- switch (*rb.head++) {
+ if (!sig) {
+ switch (*binary->rb.head++) {
case CHAR_FOR_ANSWER_SUCCESS: /* success */
- client_on_reply_success(protows, &rb);
+ client_on_reply_success(binary->protows, &binary->rb);
break;
case CHAR_FOR_ANSWER_FAIL: /* fail */
- client_on_reply_fail(protows, &rb);
+ client_on_reply_fail(binary->protows, &binary->rb);
break;
case CHAR_FOR_EVT_BROADCAST: /* broadcast */
- client_on_event_broadcast(protows, &rb);
+ client_on_event_broadcast(binary->protows, &binary->rb);
break;
case CHAR_FOR_EVT_ADD: /* creates the event */
- client_on_event_create(protows, &rb);
+ client_on_event_create(binary->protows, &binary->rb);
break;
case CHAR_FOR_EVT_DEL: /* removes the event */
- client_on_event_remove(protows, &rb);
+ client_on_event_remove(binary->protows, &binary->rb);
break;
case CHAR_FOR_EVT_PUSH: /* pushs the event */
- client_on_event_push(protows, &rb);
+ client_on_event_push(binary->protows, &binary->rb);
break;
case CHAR_FOR_EVT_SUBSCRIBE: /* subscribe event for a request */
- client_on_event_subscribe(protows, &rb);
+ client_on_event_subscribe(binary->protows, &binary->rb);
break;
case CHAR_FOR_EVT_UNSUBSCRIBE: /* unsubscribe event for a request */
- client_on_event_unsubscribe(protows, &rb);
+ client_on_event_unsubscribe(binary->protows, &binary->rb);
break;
case CHAR_FOR_SUBCALL_CALL: /* subcall */
- client_on_subcall(protows, &rb);
+ client_on_subcall(binary->protows, &binary->rb);
break;
case CHAR_FOR_DESCRIPTION: /* description */
- client_on_description(protows, &rb);
+ client_on_description(binary->protows, &binary->rb);
break;
default: /* unexpected message */
/* TODO: close the connection */
break;
}
- pthread_mutex_unlock(&protows->mutex);
}
- free(rb.base);
+ free(binary->rb.base);
+ free(binary);
+}
+
+/* callback when receiving binary data */
+static void client_on_binary(void *closure, char *data, size_t size)
+{
+ int rc;
+ struct binary *binary;
+
+ if (size) {
+ binary = malloc(sizeof *binary);
+ if (!binary) {
+ errno = ENOMEM;
+ } else {
+ binary->protows = closure;
+ binary->rb.base = data;
+ binary->rb.head = data;
+ binary->rb.end = data + size;
+ rc = jobs_queue(NULL, 0, client_on_binary_job, binary);
+ if (rc >= 0)
+ return;
+ free(binary);
+ }
+ }
+ free(data);
}
int afb_proto_ws_client_call(
@@ -771,11 +836,12 @@ int afb_proto_ws_client_call(
/* init call data */
pthread_mutex_lock(&protows->mutex);
call->callid = ptr2id(call);
- while(client_call_search(protows, call->callid) != NULL)
+ while(client_call_search_locked(protows, call->callid) != NULL)
call->callid++;
call->protows = protows;
call->next = protows->calls;
protows->calls = call;
+ pthread_mutex_unlock(&protows->mutex);
/* creates the call message */
if (!writebuf_char(&wb, CHAR_FOR_CALL)
@@ -788,7 +854,9 @@ int afb_proto_ws_client_call(
}
/* send */
+ pthread_mutex_lock(&protows->mutex);
rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+ pthread_mutex_unlock(&protows->mutex);
if (rc >= 0) {
rc = 0;
goto end;
@@ -797,7 +865,6 @@ int afb_proto_ws_client_call(
clean:
client_call_destroy(call);
end:
- pthread_mutex_unlock(&protows->mutex);
return rc;
}
@@ -830,15 +897,15 @@ int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)(
desc->protows = protows;
desc->next = protows->describes;
protows->describes = desc;
- pthread_mutex_unlock(&protows->mutex);
/* send */
if (writebuf_char(&wb, CHAR_FOR_DESCRIBE)
&& writebuf_uint32(&wb, desc->descid)
- && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0)
+ && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0) {
+ pthread_mutex_unlock(&protows->mutex);
return 0;
+ }
- pthread_mutex_lock(&protows->mutex);
d = protows->describes;
if (d == desc)
protows->describes = desc->next;
@@ -848,8 +915,8 @@ int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)(
if (d)
d->next = desc->next;
}
- free(desc);
pthread_mutex_unlock(&protows->mutex);
+ free(desc);
error:
/* TODO? callback(closure, NULL); */
return -1;
@@ -931,18 +998,25 @@ static void server_on_subcall_reply(struct afb_proto_ws *protows, struct readbuf
static int server_send_description(struct afb_proto_ws *protows, uint32_t descid, struct json_object *descobj)
{
+ int rc;
struct writebuf wb = { .count = 0 };
- return -!(writebuf_char(&wb, CHAR_FOR_DESCRIPTION)
- && writebuf_uint32(&wb, descid)
- && writebuf_object(&wb, descobj)
- && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0);
+ if (writebuf_char(&wb, CHAR_FOR_DESCRIPTION)
+ && writebuf_uint32(&wb, descid)
+ && writebuf_object(&wb, descobj)) {
+ pthread_mutex_lock(&protows->mutex);
+ rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+ pthread_mutex_unlock(&protows->mutex);
+ if (rc >= 0)
+ return 0;
+ }
+ return -1;
}
int afb_proto_ws_describe_put(struct afb_proto_ws_describe *describe, struct json_object *description)
{
int rc = server_send_description(describe->protows, describe->descid, description);
- afb_proto_ws_addref(describe->protows);
+ afb_proto_ws_unref(describe->protows);
free(describe);
return rc;
}
@@ -971,33 +1045,51 @@ static void server_on_describe(struct afb_proto_ws *protows, struct readbuf *rb)
}
/* callback when receiving binary data */
-static void server_on_binary(void *closure, char *data, size_t size)
+static void server_on_binary_job(int sig, void *closure)
{
- struct afb_proto_ws *protows;
- struct readbuf rb;
-
- rb.base = data;
- if (size > 0) {
- rb.head = data;
- rb.end = data + size;
- protows = closure;
+ struct binary *binary = closure;
- switch (*rb.head++) {
+ if (!sig) {
+ switch (*binary->rb.head++) {
case CHAR_FOR_CALL:
- server_on_call(protows, &rb);
+ server_on_call(binary->protows, &binary->rb);
break;
case CHAR_FOR_SUBCALL_REPLY:
- server_on_subcall_reply(protows, &rb);
+ server_on_subcall_reply(binary->protows, &binary->rb);
break;
case CHAR_FOR_DESCRIBE:
- server_on_describe(protows, &rb);
+ server_on_describe(binary->protows, &binary->rb);
break;
default: /* unexpected message */
/* TODO: close the connection */
break;
}
}
- free(rb.base);
+ free(binary->rb.base);
+ free(binary);
+}
+
+static void server_on_binary(void *closure, char *data, size_t size)
+{
+ int rc;
+ struct binary *binary;
+
+ if (size) {
+ binary = malloc(sizeof *binary);
+ if (!binary) {
+ errno = ENOMEM;
+ } else {
+ binary->protows = closure;
+ binary->rb.base = data;
+ binary->rb.head = data;
+ binary->rb.end = data + size;
+ rc = jobs_queue(NULL, 0, server_on_binary_job, binary);
+ if (rc >= 0)
+ return;
+ free(binary);
+ }
+ }
+ free(data);
}
/******************* server part: manage events **********************************/
@@ -1005,12 +1097,19 @@ static void server_on_binary(void *closure, char *data, size_t size)
static int server_event_send(struct afb_proto_ws *protows, char order, const char *event_name, int event_id, struct json_object *data)
{
struct writebuf wb = { .count = 0 };
+ int rc;
- return -!(writebuf_char(&wb, order)
- && (order == CHAR_FOR_EVT_BROADCAST || writebuf_uint32(&wb, event_id))
- && writebuf_string(&wb, event_name)
- && (order == CHAR_FOR_EVT_ADD || order == CHAR_FOR_EVT_DEL || writebuf_object(&wb, data))
- && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0);
+ if (writebuf_char(&wb, order)
+ && (order == CHAR_FOR_EVT_BROADCAST || writebuf_uint32(&wb, event_id))
+ && writebuf_string(&wb, event_name)
+ && (order == CHAR_FOR_EVT_ADD || order == CHAR_FOR_EVT_DEL || writebuf_object(&wb, data))) {
+ pthread_mutex_lock(&protows->mutex);
+ rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+ pthread_mutex_unlock(&protows->mutex);
+ if (rc >= 0)
+ return 0;
+ }
+ return -1;
}
int afb_proto_ws_server_event_create(struct afb_proto_ws *protows, const char *event_name, int event_id)
@@ -1059,6 +1158,7 @@ static void on_hangup(void *closure)
}
if (protows->fd >= 0) {
+ close(protows->fd);
protows->fd = -1;
if (protows->on_hangup)
protows->on_hangup(protows->closure);
diff --git a/src/afb-session.c b/src/afb-session.c
index 61bce09a..6b6ad634 100644
--- a/src/afb-session.c
+++ b/src/afb-session.c
@@ -31,10 +31,15 @@
#include "afb-session.h"
#include "verbose.h"
-#define COOKEYCOUNT 8
-#define COOKEYMASK (COOKEYCOUNT - 1)
+#define SIZEUUID 37
+#define HEADCOUNT 16
+#define COOKEYCOUNT 8
+#define COOKEYMASK (COOKEYCOUNT - 1)
-#define NOW (time(NULL))
+#define _MAXEXP_ ((time_t)(~(time_t)0))
+#define _MAXEXP2_ ((time_t)((((unsigned long long)_MAXEXP_) >> 1)))
+#define MAX_EXPIRATION (_MAXEXP_ >= 0 ? _MAXEXP_ : _MAXEXP2_)
+#define NOW (time(NULL))
struct cookie
{
@@ -46,40 +51,30 @@ struct cookie
struct afb_session
{
+ struct afb_session *next; /* link to the next */
unsigned refcount;
int timeout;
- time_t expiration; // expiration time of the token
- time_t access;
+ time_t expiration; // expiration time of the token
pthread_mutex_t mutex;
- char uuid[37]; // long term authentication of remote client
- char token[37]; // short term authentication of remote client
struct cookie *cookies[COOKEYCOUNT];
+ char autoclose;
+ char idx;
+ char uuid[SIZEUUID]; // long term authentication of remote client
+ char token[SIZEUUID]; // short term authentication of remote client
};
// Session UUID are store in a simple array [for 10 sessions this should be enough]
static struct {
- pthread_mutex_t mutex; // declare a mutex to protect hash table
- struct afb_session **store; // sessions store
- int count; // current number of sessions
+ pthread_mutex_t mutex; // declare a mutex to protect hash table
+ struct afb_session *heads[HEADCOUNT]; // sessions
+ int count; // current number of sessions
int max;
int timeout;
- char initok[37];
+ char initok[SIZEUUID];
} sessions;
-/**
- * Get the index of the 'key' in the cookies array.
- * @param key the key to scan
- * @return the index of the list for key within cookies
- */
-static int cookeyidx(const void *key)
-{
- intptr_t x = (intptr_t)key;
- unsigned r = (unsigned)((x >> 5) ^ (x >> 15));
- return r & COOKEYMASK;
-}
-
/* generate a uuid */
-static void new_uuid(char uuid[37])
+static void new_uuid(char uuid[SIZEUUID])
{
uuid_t newuuid;
uuid_generate(newuuid);
@@ -97,40 +92,51 @@ static inline void unlock(struct afb_session *session)
}
// Free context [XXXX Should be protected again memory abort XXXX]
-static void free_data (struct afb_session *session)
+static void close_session(struct afb_session *session)
{
int idx;
- struct cookie *cookie, *next;
+ struct cookie *cookie;
- // free cookies
+ /* free cookies */
for (idx = 0 ; idx < COOKEYCOUNT ; idx++) {
- cookie = session->cookies[idx];
- session->cookies[idx] = NULL;
- while (cookie != NULL) {
- next = cookie->next;
+ while ((cookie = session->cookies[idx])) {
+ session->cookies[idx] = cookie->next;
if (cookie->freecb != NULL)
cookie->freecb(cookie->value);
free(cookie);
- cookie = next;
}
}
}
+/* tiny hash function inspired from pearson */
+static int pearson4(const char *text)
+{
+ static uint8_t T[16] = {
+ 4, 1, 6, 0, 9, 14, 11, 5,
+ 2, 3, 12, 15, 10, 7, 8, 13
+ };
+ uint8_t r, c;
+
+ for (r = 0; (c = (uint8_t)*text) ; text++) {
+ r = T[r ^ (15 & c)];
+ r = T[r ^ (c >> 4)];
+ }
+ return r; // % HEADCOUNT;
+}
+
// Create a new store in RAM, not that is too small it will be automatically extended
void afb_session_init (int max_session_count, int timeout, const char *initok)
{
- // let's create as store as hashtable does not have any
- sessions.store = calloc (1 + (unsigned)max_session_count, sizeof *sessions.store);
pthread_mutex_init(&sessions.mutex, NULL);
sessions.max = max_session_count;
sessions.timeout = timeout;
if (initok == NULL)
/* without token, a secret is made to forbid creation of sessions */
new_uuid(sessions.initok);
- else if (strlen(initok) < sizeof(sessions.store[0]->token))
+ else if (strlen(initok) < sizeof sessions.initok)
strcpy(sessions.initok, initok);
else {
- ERROR("initial token '%s' too long (max length 36)", initok);
+ ERROR("initial token '%s' too long (max length %d)", initok, ((int)(sizeof sessions.initok)) - 1);
exit(1);
}
}
@@ -140,196 +146,205 @@ const char *afb_session_initial_token()
return sessions.initok;
}
-static struct afb_session *search (const char* uuid)
+static struct afb_session *search (const char* uuid, int idx)
{
- int idx;
struct afb_session *session;
- assert (uuid != NULL);
-
- pthread_mutex_lock(&sessions.mutex);
-
- for (idx=0; idx < sessions.max; idx++) {
- session = sessions.store[idx];
- if (session && (0 == strcmp (uuid, session->uuid)))
- goto found;
- }
- session = NULL;
+ session = sessions.heads[idx];
+ while (session && strcmp(uuid, session->uuid))
+ session = session->next;
-found:
- pthread_mutex_unlock(&sessions.mutex);
return session;
}
-static int destroy (struct afb_session *session)
+static void destroy (struct afb_session *session)
{
- int idx;
- int status;
+ struct afb_session **prv;
assert (session != NULL);
+ close_session(session);
pthread_mutex_lock(&sessions.mutex);
-
- for (idx=0; idx < sessions.max; idx++) {
- if (sessions.store[idx] == session) {
- sessions.store[idx] = NULL;
+ prv = &sessions.heads[(int)session->idx];
+ while (*prv)
+ if (*prv != session)
+ prv = &((*prv)->next);
+ else {
+ *prv = session->next;
sessions.count--;
- status = 1;
- goto deleted;
+ pthread_mutex_destroy(&session->mutex);
+ free(session);
+ break;
}
- }
- status = 0;
-deleted:
pthread_mutex_unlock(&sessions.mutex);
- return status;
}
-static int add (struct afb_session *session)
+// Loop on every entry and remove old context sessions.hash
+static time_t cleanup ()
{
+ struct afb_session *session, *next;
int idx;
- int status;
-
- assert (session != NULL);
-
- pthread_mutex_lock(&sessions.mutex);
+ time_t now;
- for (idx=0; idx < sessions.max; idx++) {
- if (NULL == sessions.store[idx]) {
- sessions.store[idx] = session;
- sessions.count++;
- status = 1;
- goto added;
+ // Loop on Sessions Table and remove anything that is older than timeout
+ now = NOW;
+ for (idx = 0 ; idx < HEADCOUNT; idx++) {
+ session = sessions.heads[idx];
+ while (session) {
+ next = session->next;
+ if (session->expiration < now)
+ afb_session_close(session);
+ session = next;
}
}
- status = 0;
-added:
- pthread_mutex_unlock(&sessions.mutex);
- return status;
+ return now;
}
-// Check if context timeout or not
-static int is_expired (struct afb_session *ctx, time_t now)
+static void update_timeout(struct afb_session *session, time_t now, int timeout)
{
- assert (ctx != NULL);
- return ctx->expiration < now;
+ time_t expiration;
+
+ /* compute expiration */
+ if (timeout == AFB_SESSION_TIMEOUT_INFINITE)
+ expiration = MAX_EXPIRATION;
+ else {
+ if (timeout == AFB_SESSION_TIMEOUT_DEFAULT)
+ expiration = now + sessions.timeout;
+ else
+ expiration = now + timeout;
+ if (expiration < 0)
+ expiration = MAX_EXPIRATION;
+ }
+
+ /* record the values */
+ session->timeout = timeout;
+ session->expiration = expiration;
}
-// Check if context is active or not
-static int is_active (struct afb_session *ctx, time_t now)
+static void update_expiration(struct afb_session *session, time_t now)
{
- assert (ctx != NULL);
- return ctx->uuid[0] != 0 && ctx->expiration >= now;
+ update_timeout(session, now, session->timeout);
}
-// Loop on every entry and remove old context sessions.hash
-static void cleanup (time_t now)
+static struct afb_session *add_session (const char *uuid, int timeout, time_t now, int idx)
{
- struct afb_session *ctx;
- long idx;
+ struct afb_session *session;
- // Loop on Sessions Table and remove anything that is older than timeout
- for (idx=0; idx < sessions.max; idx++) {
- ctx = sessions.store[idx];
- if (ctx != NULL && is_expired(ctx, now)) {
- afb_session_close (ctx);
- }
+ /* check arguments */
+ if (!AFB_SESSION_TIMEOUT_IS_VALID(timeout)
+ || (uuid && strlen(uuid) >= sizeof session->uuid)) {
+ errno = EINVAL;
+ return NULL;
}
-}
-static struct afb_session *make_session (const char *uuid, int timeout, time_t now)
-{
- struct afb_session *session;
+ /* check session count */
+ if (sessions.count >= sessions.max) {
+ errno = EBUSY;
+ return NULL;
+ }
/* allocates a new one */
session = calloc(1, sizeof *session);
if (session == NULL) {
errno = ENOMEM;
- goto error;
- }
- pthread_mutex_init(&session->mutex, NULL);
-
- /* generate the uuid */
- if (uuid == NULL) {
- new_uuid(session->uuid);
- } else {
- if (strlen(uuid) >= sizeof session->uuid) {
- errno = EINVAL;
- goto error2;
- }
- strcpy(session->uuid, uuid);
+ return NULL;
}
- /* init the token */
+ /* initialize */
+ pthread_mutex_init(&session->mutex, NULL);
+ session->refcount = 1;
+ strcpy(session->uuid, uuid);
strcpy(session->token, sessions.initok);
- session->timeout = timeout;
- if (timeout != 0)
- session->expiration = now + timeout;
- else {
- session->expiration = (time_t)(~(time_t)0);
- if (session->expiration < 0)
- session->expiration = (time_t)(((unsigned long long)session->expiration) >> 1);
- }
- if (!add (session)) {
- errno = ENOMEM;
- goto error2;
- }
+ update_timeout(session, now, timeout);
+
+ /* link */
+ session->idx = (char)idx;
+ session->next = sessions.heads[idx];
+ sessions.heads[idx] = session;
+ sessions.count++;
- session->access = now;
- session->refcount = 1;
return session;
+}
+
+/* create a new session for the given timeout */
+static struct afb_session *new_session (int timeout, time_t now)
+{
+ int idx;
+ char uuid[SIZEUUID];
-error2:
- free(session);
-error:
- return NULL;
+ do {
+ new_uuid(uuid);
+ idx = pearson4(uuid);
+ } while(search(uuid, idx));
+ return add_session(uuid, timeout, now, idx);
}
-struct afb_session *afb_session_create (const char *uuid, int timeout)
+/* Creates a new session with 'timeout' */
+struct afb_session *afb_session_create (int timeout)
{
time_t now;
+ struct afb_session *session;
/* cleaning */
- now = NOW;
- cleanup (now);
+ pthread_mutex_lock(&sessions.mutex);
+ now = cleanup();
+ session = new_session(timeout, now);
+ pthread_mutex_unlock(&sessions.mutex);
- /* search for an existing one not too old */
- if (uuid != NULL && search(uuid) != NULL) {
- errno = EEXIST;
- return NULL;
- }
+ return session;
+}
+
+/* Searchs the session of 'uuid' */
+struct afb_session *afb_session_search (const char *uuid)
+{
+ struct afb_session *session;
+
+ /* cleaning */
+ pthread_mutex_lock(&sessions.mutex);
+ cleanup();
+ session = search(uuid, pearson4(uuid));
+ if (session)
+ __atomic_add_fetch(&session->refcount, 1, __ATOMIC_RELAXED);
+ pthread_mutex_unlock(&sessions.mutex);
+ return session;
- return make_session(uuid, timeout, now);
}
-// This function will return exiting session or newly created session
-struct afb_session *afb_session_get (const char *uuid, int *created)
+/* This function will return exiting session or newly created session */
+struct afb_session *afb_session_get (const char *uuid, int timeout, int *created)
{
+ int idx;
struct afb_session *session;
time_t now;
/* cleaning */
- now = NOW;
- cleanup (now);
+ pthread_mutex_lock(&sessions.mutex);
+ now = cleanup();
/* search for an existing one not too old */
- if (uuid != NULL) {
- session = search(uuid);
- if (!created)
- return session;
- if (session != NULL) {
- *created = 0;
- session->access = now;
- session->refcount++;
+ if (!uuid)
+ session = new_session(timeout, now);
+ else {
+ idx = pearson4(uuid);
+ session = search(uuid, idx);
+ if (session) {
+ __atomic_add_fetch(&session->refcount, 1, __ATOMIC_RELAXED);
+ pthread_mutex_unlock(&sessions.mutex);
+ if (created)
+ *created = 0;
return session;
}
+ session = add_session (uuid, timeout, now, idx);
}
+ pthread_mutex_unlock(&sessions.mutex);
if (created)
- *created = 1;
+ *created = !!session;
- return make_session(uuid, sessions.timeout, now);
+ return session;
}
+/* increase the use count on the session */
struct afb_session *afb_session_addref(struct afb_session *session)
{
if (session != NULL)
@@ -337,32 +352,57 @@ struct afb_session *afb_session_addref(struct afb_session *session)
return session;
}
+/* decrease the use count of the session */
void afb_session_unref(struct afb_session *session)
{
if (session != NULL) {
assert(session->refcount != 0);
if (!__atomic_sub_fetch(&session->refcount, 1, __ATOMIC_RELAXED)) {
- if (session->uuid[0] == 0) {
+ pthread_mutex_lock(&session->mutex);
+ if (session->autoclose || session->uuid[0] == 0)
destroy (session);
- pthread_mutex_destroy(&session->mutex);
- free(session);
- }
+ else
+ pthread_mutex_unlock(&session->mutex);
}
}
}
-// Free Client Session Context
+// close Client Session Context
void afb_session_close (struct afb_session *session)
{
assert(session != NULL);
+ pthread_mutex_lock(&session->mutex);
if (session->uuid[0] != 0) {
session->uuid[0] = 0;
- free_data (session);
- if (session->refcount == 0) {
+ if (session->refcount)
+ close_session(session);
+ else {
destroy (session);
- free(session);
+ return;
}
}
+ pthread_mutex_unlock(&session->mutex);
+}
+
+/* set the autoclose flag */
+void afb_session_set_autoclose(struct afb_session *session, int autoclose)
+{
+ assert(session != NULL);
+ session->autoclose = (char)!!autoclose;
+}
+
+// is the session active?
+int afb_session_is_active (struct afb_session *session)
+{
+ assert(session != NULL);
+ return !!session->uuid[0];
+}
+
+// is the session closed?
+int afb_session_is_closed (struct afb_session *session)
+{
+ assert(session != NULL);
+ return !session->uuid[0];
}
// Sample Generic Ping Debug API
@@ -371,8 +411,10 @@ int afb_session_check_token (struct afb_session *session, const char *token)
assert(session != NULL);
assert(token != NULL);
- // compare current token with previous one
- if (!is_active (session, NOW))
+ if (!session->uuid[0])
+ return 0;
+
+ if (session->expiration < NOW)
return 0;
if (session->token[0] && strcmp (token, session->token) != 0)
@@ -390,111 +432,135 @@ void afb_session_new_token (struct afb_session *session)
new_uuid(session->token);
// keep track of time for session timeout and further clean up
- if (session->timeout != 0)
- session->expiration = NOW + session->timeout;
+ update_expiration(session, NOW);
}
+/* Returns the uuid of 'session' */
const char *afb_session_uuid (struct afb_session *session)
{
assert(session != NULL);
return session->uuid;
}
+/* Returns the token of 'session' */
const char *afb_session_token (struct afb_session *session)
{
assert(session != NULL);
return session->token;
}
-static struct cookie *cookie_search(struct afb_session *session, const void *key, int *idx)
-{
- struct cookie *cookie;
-
- cookie = session->cookies[*idx = cookeyidx(key)];
- while(cookie != NULL && cookie->key != key)
- cookie = cookie->next;
- return cookie;
-}
-
-static struct cookie *cookie_add(struct afb_session *session, int idx, const void *key, void *value, void (*freecb)(void*))
+/**
+ * Get the index of the 'key' in the cookies array.
+ * @param key the key to scan
+ * @return the index of the list for key within cookies
+ */
+static int cookeyidx(const void *key)
{
- struct cookie *cookie;
-
- cookie = malloc(sizeof *cookie);
- if (!cookie)
- errno = ENOMEM;
- else {
- cookie->key = key;
- cookie->value = value;
- cookie->freecb = freecb;
- cookie->next = session->cookies[idx];
- session->cookies[idx] = cookie;
- }
- return cookie;
+ intptr_t x = (intptr_t)key;
+ unsigned r = (unsigned)((x >> 5) ^ (x >> 15));
+ return r & COOKEYMASK;
}
+/**
+ * Set, get, replace, remove a cookie of 'key' for the 'session'
+ *
+ * The behaviour of this function depends on its parameters:
+ *
+ * @param session the session
+ * @param key the key of the cookie
+ * @param makecb the creation function or NULL
+ * @param freecb the release function or NULL
+ * @param closure an argument for makecb or the value if makecb==NULL
+ * @param replace a boolean enforcing replecement of the previous value
+ *
+ * @return the value of the cookie
+ *
+ * The 'key' is a pointer and compared as pointers.
+ *
+ * For getting the current value of the cookie:
+ *
+ * afb_session_cookie(session, key, NULL, NULL, NULL, 0)
+ *
+ * For storing the value of the cookie
+ *
+ * afb_session_cookie(session, key, NULL, NULL, value, 1)
+ */
void *afb_session_cookie(struct afb_session *session, const void *key, void *(*makecb)(void *closure), void (*freecb)(void *item), void *closure, int replace)
{
int idx;
void *value;
- struct cookie *cookie;
+ struct cookie *cookie, **prv;
+ /* get key hashed index */
+ idx = cookeyidx(key);
+
+ /* lock session and search for the cookie of 'key' */
lock(session);
- cookie = cookie_search(session, key, &idx);
- if (cookie) {
- if (!replace)
- value = cookie->value;
- else {
+ prv = &session->cookies[idx];
+ for (;;) {
+ cookie = *prv;
+ if (!cookie) {
+ /* 'key' not found, create value using 'closure' and 'makecb' */
value = makecb ? makecb(closure) : closure;
- if (cookie->value != value && cookie->freecb)
- cookie->freecb(cookie->value);
- cookie->value = value;
- cookie->freecb = freecb;
- }
- } else {
- value = makecb ? makecb(closure) : closure;
- if (replace || makecb || freecb) {
- cookie = cookie_add(session, idx, key, value, freecb);
- if (!cookie) {
- if (makecb && freecb)
- freecb(value);
- value = NULL;
+ /* store the the only if it has some meaning */
+ if (replace || makecb || freecb) {
+ cookie = malloc(sizeof *cookie);
+ if (!cookie) {
+ errno = ENOMEM;
+ /* calling freecb if there is no makecb may have issue */
+ if (makecb && freecb)
+ freecb(value);
+ value = NULL;
+ } else {
+ cookie->key = key;
+ cookie->value = value;
+ cookie->freecb = freecb;
+ cookie->next = NULL;
+ *prv = cookie;
+ }
+ }
+ break;
+ } else if (cookie->key == key) {
+ /* cookie of key found */
+ if (!replace)
+ /* not replacing, get the value */
+ value = cookie->value;
+ else {
+ /* create value using 'closure' and 'makecb' */
+ value = makecb ? makecb(closure) : closure;
+
+ /* free previous value is needed */
+ if (cookie->value != value && cookie->freecb)
+ cookie->freecb(cookie->value);
+
+ /* store the value and its releaser */
+ cookie->value = value;
+ cookie->freecb = freecb;
+
+ /* but if both are NULL drop the cookie */
+ if (!value && !freecb) {
+ *prv = cookie->next;
+ free(cookie);
+ }
}
+ break;
+ } else {
+ prv = &(cookie->next);
}
}
+
+ /* unlock the session and return the value */
unlock(session);
return value;
}
void *afb_session_get_cookie(struct afb_session *session, const void *key)
{
- int idx;
- void *value;
- struct cookie *cookie;
-
- lock(session);
- cookie = cookie_search(session, key, &idx);
- value = cookie ? cookie->value : NULL;
- unlock(session);
- return value;
+ return afb_session_cookie(session, key, NULL, NULL, NULL, 0);
}
int afb_session_set_cookie(struct afb_session *session, const void *key, void *value, void (*freecb)(void*))
{
- int idx;
- struct cookie *cookie;
-
- lock(session);
- cookie = cookie_search(session, key, &idx);
- if (!cookie)
- cookie = cookie_add(session, idx, key, value, freecb);
- else {
- if (cookie->value != value && cookie->freecb)
- cookie->freecb(cookie->value);
- cookie->value = value;
- cookie->freecb = freecb;
- }
- unlock(session);
- return -!cookie;
+ return -(value != afb_session_cookie(session, key, NULL, freecb, value, 1));
}
diff --git a/src/afb-session.h b/src/afb-session.h
index b5dc3944..d79ec414 100644
--- a/src/afb-session.h
+++ b/src/afb-session.h
@@ -19,17 +19,25 @@
struct afb_session;
+#define AFB_SESSION_TIMEOUT_INFINITE -1
+#define AFB_SESSION_TIMEOUT_DEFAULT -2
+#define AFB_SESSION_TIMEOUT_IS_VALID(x) ((x) >= AFB_SESSION_TIMEOUT_DEFAULT)
+
extern void afb_session_init(int max_session_count, int timeout, const char *initok);
extern const char *afb_session_initial_token();
-extern struct afb_session *afb_session_create (const char *uuid, int timeout);
-extern struct afb_session *afb_session_get (const char *uuid, int *created);
+extern struct afb_session *afb_session_create (int timeout);
+extern struct afb_session *afb_session_search (const char *uuid);
+extern struct afb_session *afb_session_get (const char *uuid, int timeout, int *created);
extern const char *afb_session_uuid (struct afb_session *session);
extern struct afb_session *afb_session_addref(struct afb_session *session);
extern void afb_session_unref(struct afb_session *session);
+extern void afb_session_set_autoclose(struct afb_session *session, int autoclose);
extern void afb_session_close(struct afb_session *session);
+extern int afb_session_is_active (struct afb_session *session);
+extern int afb_session_is_closed (struct afb_session *session);
extern int afb_session_check_token(struct afb_session *session, const char *token);
extern void afb_session_new_token(struct afb_session *session);
diff --git a/src/afb-stub-ws.c b/src/afb-stub-ws.c
index 693a0d0c..740a8575 100644
--- a/src/afb-stub-ws.c
+++ b/src/afb-stub-ws.c
@@ -125,6 +125,15 @@ struct server_describe
struct afb_proto_ws_describe *describe;
};
+/*
+ * structure for recording sessions
+ */
+struct server_session
+{
+ struct server_session *next;
+ struct afb_session *session;
+};
+
/******************* stub description for client or servers ******************/
struct afb_stub_ws
@@ -147,6 +156,9 @@ struct afb_stub_ws
/* credentials (server side) */
struct afb_cred *cred;
+ /* sessions (server side) */
+ struct server_session *sessions;
+
/* apiset */
struct afb_apiset *apiset;
@@ -460,6 +472,46 @@ static void on_subcall(void *closure, struct afb_proto_ws_subcall *subcall, void
/*****************************************************/
+static void record_session(struct afb_stub_ws *stubws, struct afb_session *session)
+{
+ struct server_session *s, **prv;
+
+ /* search */
+ prv = &stubws->sessions;
+ while ((s = *prv)) {
+ if (s->session == session)
+ return;
+ if (afb_session_is_active(s->session))
+ prv = &s->next;
+ else {
+ *prv = s->next;
+ afb_session_addref(s->session);
+ free(s);
+ }
+ }
+
+ /* create */
+ s = malloc(sizeof *s);
+ if (s) {
+ s->session = afb_session_addref(session);
+ s->next = stubws->sessions;
+ stubws->sessions = s;
+ }
+}
+
+static void release_sessions(struct afb_stub_ws *stubws)
+{
+ struct server_session *s;
+
+ while((s = stubws->sessions)) {
+ stubws->sessions = s->next;
+ afb_session_unref(s->session);
+ free(s);
+ }
+}
+
+/*****************************************************/
+
static void on_call(void *closure, struct afb_proto_ws_call *call, const char *verb, struct json_object *args, const char *sessionid)
{
struct afb_stub_ws *stubws = closure;
@@ -480,6 +532,9 @@ static void on_call(void *closure, struct afb_proto_ws_call *call, const char *v
if (afb_context_connect(&wreq->xreq.context, sessionid, NULL) < 0)
goto unconnected;
wreq->xreq.context.validated = 1;
+ record_session(stubws, wreq->xreq.context.session);
+ if (wreq->xreq.context.created)
+ afb_session_set_autoclose(wreq->xreq.context.session, 1);
/* makes the call */
wreq->xreq.cred = afb_cred_addref(stubws->cred);
@@ -598,6 +653,8 @@ static void on_hangup(void *closure)
if (stubws->on_hangup)
stubws->on_hangup(stubws);
+
+ release_sessions(stubws);
}
/*****************************************************/
@@ -651,6 +708,7 @@ void afb_stub_ws_unref(struct afb_stub_ws *stubws)
if (!__atomic_sub_fetch(&stubws->refcount, 1, __ATOMIC_RELAXED)) {
drop_all_events(stubws);
afb_evt_listener_unref(stubws->listener);
+ release_sessions(stubws);
afb_proto_ws_unref(stubws->proto);
afb_cred_unref(stubws->cred);
afb_apiset_unref(stubws->apiset);
diff --git a/src/afb-trace.c b/src/afb-trace.c
index fb077956..021ab5af 100644
--- a/src/afb-trace.c
+++ b/src/afb-trace.c
@@ -76,10 +76,9 @@ struct event {
};
/* struct for sessions */
-struct session {
- struct session *next; /* link to the next session */
- struct afb_session *session; /* the session */
- struct afb_trace *trace; /* the tracer */
+struct cookie {
+ struct afb_session *session; /* the session */
+ struct afb_trace *trace; /* the tracer */
};
/* struct for recording hooks */
@@ -88,7 +87,7 @@ struct hook {
void *handler; /* the handler of the hook */
struct event *event; /* the associated event */
struct tag *tag; /* the associated tag */
- struct session *session; /* the associated session */
+ struct afb_session *session; /* the associated session */
};
/* types of hooks */
@@ -111,7 +110,6 @@ struct afb_trace
struct afb_session *bound; /* bound to session */
struct event *events; /* list of events */
struct tag *tags; /* list of tags */
- struct session *sessions; /* list of tags */
struct hook *hooks[Trace_Type_Count]; /* hooks */
};
@@ -975,7 +973,7 @@ abstracting[Trace_Type_Count] =
/*******************************************************************************/
/* drop hooks of 'trace' matching 'tag' and 'event' and 'session' */
-static void trace_unhook(struct afb_trace *trace, struct tag *tag, struct event *event, struct session *session)
+static void trace_unhook(struct afb_trace *trace, struct tag *tag, struct event *event, struct afb_session *session)
{
int i;
struct hook *hook, **prev;
@@ -1004,24 +1002,7 @@ static void trace_cleanup(struct afb_trace *trace)
struct hook *hook;
struct tag *tag, **ptag;
struct event *event, **pevent;
- struct session *session, **psession;
- /* clean sessions */
- psession = &trace->sessions;
- while ((session = *psession)) {
- /* search for session */
- for (hook = NULL, i = 0 ; !hook && i < Trace_Type_Count ; i++)
- for (hook = trace->hooks[i] ; hook && hook->session != session ; hook = hook->next);
- /* keep or free whether used or not */
- if (hook)
- psession = &session->next;
- else {
- *psession = session->next;
- if (__atomic_exchange_n(&session->trace, NULL, __ATOMIC_RELAXED))
- afb_session_set_cookie(session->session, session, NULL, NULL);
- free(session);
- }
- }
/* clean tags */
ptag = &trace->tags;
while ((tag = *ptag)) {
@@ -1053,19 +1034,6 @@ static void trace_cleanup(struct afb_trace *trace)
}
}
-/* callback at end of traced session */
-static void free_session_cookie(void *cookie)
-{
- struct session *session = cookie;
- struct afb_trace *trace = __atomic_exchange_n(&session->trace, NULL, __ATOMIC_RELAXED);
- if (trace) {
- pthread_mutex_lock(&trace->mutex);
- trace_unhook(trace, NULL, NULL, session);
- trace_cleanup(trace);
- pthread_mutex_unlock(&trace->mutex);
- }
-}
-
/*
* Get the tag of 'name' within 'trace'.
* If 'alloc' isn't zero, create the tag and add it.
@@ -1123,41 +1091,48 @@ static struct event *trace_get_event(struct afb_trace *trace, const char *name,
}
/*
- * Get the session of 'value' within 'trace'.
- * If 'alloc' isn't zero, create the session and add it.
+ * called on session closing
*/
-static struct session *trace_get_session(struct afb_trace *trace, struct afb_session *value, int alloc)
-{
- struct session *session;
-
- /* search the session */
- session = trace->sessions;
- while (session && session->session != value)
- session = session->next;
-
- if (!session && alloc) {
- session = malloc(sizeof * session);
- if (session) {
- session->session = value;
- session->trace = NULL;
- session->next = trace->sessions;
- trace->sessions = session;
- }
- }
- return session;
+static void session_closed(void *item)
+{
+ struct cookie *cookie = item;
+
+ pthread_mutex_lock(&cookie->trace->mutex);
+ trace_unhook(cookie->trace, NULL, NULL, cookie->session);
+ pthread_mutex_unlock(&cookie->trace->mutex);
+ free(cookie);
+}
+
+/*
+ * records the cookie of session for tracking close
+ */
+static void *session_open(void *closure)
+{
+ struct cookie *param = closure, *cookie;
+ cookie = malloc(sizeof *cookie);
+ if (cookie)
+ *cookie = *param;
+ return cookie;
}
/*
* Get the session of 'uuid' within 'trace'.
* If 'alloc' isn't zero, create the session and add it.
*/
-static struct session *trace_get_session_by_uuid(struct afb_trace *trace, const char *uuid, int alloc)
+static struct afb_session *trace_get_session_by_uuid(struct afb_trace *trace, const char *uuid, int alloc)
{
- struct afb_session *session;
- int created;
+ struct cookie cookie;
- session = afb_session_get(uuid, alloc ? &created : NULL);
- return session ? trace_get_session(trace, session, alloc) : NULL;
+ if (!alloc)
+ cookie.session = afb_session_search(uuid);
+ else {
+ cookie.session = afb_session_get(uuid, AFB_SESSION_TIMEOUT_DEFAULT, NULL);
+ if (cookie.session) {
+ cookie.trace = trace;
+ afb_session_cookie(cookie.session, cookie.trace, session_open, session_closed, &cookie, 0);
+ }
+ }
+ return cookie.session;
}
static struct hook *trace_make_detached_hook(struct afb_trace *trace, const char *event, const char *tag)
@@ -1178,13 +1153,8 @@ static struct hook *trace_make_detached_hook(struct afb_trace *trace, const char
static void trace_attach_hook(struct afb_trace *trace, struct hook *hook, enum trace_type type)
{
- struct session *session = hook->session;
hook->next = trace->hooks[type];
trace->hooks[type] = hook;
- if (session && !session->trace) {
- session->trace = trace;
- afb_session_set_cookie(session->session, session, session, free_session_cookie);
- }
}
/*******************************************************************************/
@@ -1214,7 +1184,7 @@ struct desc
static void addhook(struct desc *desc, enum trace_type type)
{
struct hook *hook;
- struct session *session;
+ struct afb_session *session;
struct afb_session *bind;
struct afb_trace *trace = desc->context->trace;
@@ -1241,17 +1211,19 @@ static void addhook(struct desc *desc, enum trace_type type)
/* create the hook handler */
switch (type) {
case Trace_Type_Xreq:
- if (desc->session) {
+ if (!desc->session)
+ session = afb_session_addref(bind);
+ else {
session = trace_get_session_by_uuid(trace, desc->session, 1);
if (!session) {
ctxt_error(&desc->context->errors, "allocation of session failed");
free(hook);
return;
}
- bind = session->session;
}
- hook->handler = afb_hook_create_xreq(desc->api, desc->verb, bind,
+ hook->handler = afb_hook_create_xreq(desc->api, desc->verb, session,
desc->flags[type], &hook_xreq_itf, hook);
+ afb_session_unref(session);
break;
case Trace_Type_Ditf:
hook->handler = afb_hook_create_ditf(desc->api, desc->flags[type], &hook_ditf_itf, hook);
@@ -1443,7 +1415,7 @@ static void drop_session(void *closure, struct json_object *object)
{
int rc;
struct context *context = closure;
- struct session *session;
+ struct afb_session *session;
const char *uuid;
rc = wrap_json_unpack(object, "s", &uuid);
@@ -1453,8 +1425,10 @@ static void drop_session(void *closure, struct json_object *object)
session = trace_get_session_by_uuid(context->trace, uuid, 0);
if (!session)
ctxt_error(&context->errors, "session %s not found", uuid);
- else
+ else {
trace_unhook(context->trace, NULL, NULL, session);
+ afb_session_unref(session);
+ }
}
}
diff --git a/src/jobs-fake.c b/src/jobs-fake.c
new file mode 100644
index 00000000..3c1c2732
--- /dev/null
+++ b/src/jobs-fake.c
@@ -0,0 +1,150 @@
+/*
+ * Copyright (C) 2016, 2017 "IoT.bzh"
+ * Author José Bollo <jose.bollo@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define _GNU_SOURCE
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <unistd.h>
+#include <signal.h>
+#include <time.h>
+#include <sys/syscall.h>
+#include <pthread.h>
+#include <errno.h>
+#include <assert.h>
+
+#include <systemd/sd-event.h>
+
+#include "jobs.h"
+#include "sig-monitor.h"
+#include "verbose.h"
+
+#include "jobs.h"
+
+struct jobloop;
+
+struct job
+{
+ struct job *next;
+ const void *group;
+ int timeout;
+ void (*callback)(int signum, void* arg);
+ void *closure;
+};
+
+static struct job *first, *last;
+static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+
+static int add_job(const void *group, int timeout, void (*callback)(int signum, void *closure), void *closure)
+{
+ struct job *j;
+
+ j = malloc(sizeof*j);
+ if (!j) {
+ errno = ENOMEM;
+ return -1;
+ }
+
+ j->next = 0;
+ j->group = group;
+ j->timeout = timeout;
+ j->callback = callback;
+ j->closure = closure;
+
+ pthread_mutex_lock(&mutex);
+ if (first)
+ last->next = j;
+ else
+ first = j;
+ last = j;
+ pthread_mutex_unlock(&mutex);
+ return 0;
+}
+
+static void *thrrun(void *arg)
+{
+ struct job *j;
+
+ pthread_mutex_lock(&mutex);
+ j = first;
+ if (j)
+ first = j->next;
+ pthread_mutex_unlock(&mutex);
+ if (j) {
+ j->callback(0, j->closure);
+ free(j);
+ }
+ return 0;
+}
+
+int jobs_queue(
+ const void *group,
+ int timeout,
+ void (*callback)(int signum, void* arg),
+ void *arg)
+{
+ pthread_t tid;
+ int rc = add_job(group, timeout, callback, arg);
+ if (!rc) {
+ rc = pthread_create(&tid, NULL, thrrun, NULL);
+ if (rc)
+ rc = -1;
+ }
+ return rc;
+}
+
+#if 0
+int jobs_enter(
+ const void *group,
+ int timeout,
+ void (*callback)(int signum, void *closure, struct jobloop *jobloop),
+ void *closure)
+{
+ return 0;
+}
+
+int jobs_leave(struct jobloop *jobloop)
+{
+ return 0;
+}
+
+int jobs_call(
+ const void *group,
+ int timeout,
+ void (*callback)(int, void*),
+ void *arg)
+{
+ return 0;
+}
+
+struct sd_event *jobs_get_sd_event()
+{
+ struct sd_event *r;
+ int rc = sd_event_default(&r);
+ return rc < 0 ? NULL : r;
+}
+
+void jobs_terminate()
+{
+}
+
+int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum))
+{
+ start(0);
+ return 0;
+}
+#endif
diff --git a/src/jobs.c b/src/jobs.c
index 93e864f2..508d1b8e 100644
--- a/src/jobs.c
+++ b/src/jobs.c
@@ -64,8 +64,11 @@ struct events
struct events *next;
struct sd_event *event;
uint64_t timeout;
- unsigned used: 1;
- unsigned runs: 1;
+ enum {
+ Available,
+ Modifiable,
+ Locked
+ } state;
};
/** Description of threads */
@@ -74,10 +77,8 @@ struct thread
struct thread *next; /**< next thread of the list */
struct thread *upper; /**< upper same thread */
struct job *job; /**< currently processed job */
- struct events *events; /**< currently processed job */
pthread_t tid; /**< the thread id */
unsigned stop: 1; /**< stop requested */
- unsigned lowered: 1; /**< has a lower same thread */
unsigned waits: 1; /**< is waiting? */
};
@@ -109,7 +110,8 @@ static int nevents = 0; /** count of events */
/* list of threads */
static struct thread *threads;
-static _Thread_local struct thread *current;
+static _Thread_local struct thread *current_thread;
+static _Thread_local struct events *current_events;
/* queue of pending jobs */
static struct job *first_job;
@@ -204,7 +206,7 @@ static inline struct job *job_get()
static inline struct events *events_get()
{
struct events *events = first_events;
- while (events && events->used)
+ while (events && events->state != Available)
events = events->next;
return events;
}
@@ -272,9 +274,34 @@ static void job_cancel(int signum, void *arg)
*/
static void events_call(int signum, void *arg)
{
+ int rc;
+ struct sd_event *se;
struct events *events = arg;
- if (!signum)
- sd_event_run(events->event, events->timeout);
+
+ if (!signum) {
+ se = events->event;
+ rc = sd_event_prepare(se);
+ if (rc < 0) {
+ errno = -rc;
+ ERROR("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(events->event));
+ } else {
+ if (rc == 0) {
+ rc = sd_event_wait(se, events->timeout);
+ if (rc < 0) {
+ errno = -rc;
+ ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(events->event));
+ }
+ }
+
+ if (rc > 0) {
+ rc = sd_event_dispatch(se);
+ if (rc < 0) {
+ errno = -rc;
+ ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(events->event));
+ }
+ }
+ }
+ }
}
/**
@@ -286,7 +313,7 @@ static void events_call(int signum, void *arg)
*/
static void thread_run(volatile struct thread *me)
{
- struct thread **prv, *thr;
+ struct thread **prv;
struct job *job;
struct events *events;
uint64_t evto;
@@ -294,22 +321,18 @@ static void thread_run(volatile struct thread *me)
/* initialize description of itself and link it in the list */
me->tid = pthread_self();
me->stop = 0;
- me->lowered = 0;
me->waits = 0;
- me->upper = current;
- if (current) {
- current->lowered = 1;
+ me->upper = current_thread;
+ if (current_thread) {
evto = EVENT_TIMEOUT_CHILD;
- me->events = current->events;
} else {
started++;
sig_monitor_init_timeouts();
evto = EVENT_TIMEOUT_TOP;
- me->events = NULL;
}
me->next = threads;
threads = (struct thread*)me;
- current = (struct thread*)me;
+ current_thread = (struct thread*)me;
/* loop until stopped */
while (!me->stop) {
@@ -326,37 +349,34 @@ static void thread_run(volatile struct thread *me)
sig_monitor(job->timeout, job->callback, job->arg);
pthread_mutex_lock(&mutex);
- /* release the run job */
- job_release(job);
-
/* release event if any */
- events = me->events;
- if (events) {
- events->used = 0;
- me->events = NULL;
+ events = current_events;
+ if (events && events->state == Modifiable) {
+ current_events = NULL;
+ events->state = Available;
}
+
+ /* release the run job */
+ job_release(job);
} else {
/* no job, check events */
- events = me->events;
- if (!events || events->runs)
+ events = current_events;
+ if (!events)
events = events_get();
+ else if (events->state == Locked) {
+ events = 0;
+ WARNING("Loosing an event loop because reentering");
+ }
if (events) {
/* run the events */
- events->used = 1;
- events->runs = 1;
+ events->state = Locked;
events->timeout = evto;
- me->events = events;
+ current_events = events;
pthread_mutex_unlock(&mutex);
sig_monitor(0, events_call, events);
pthread_mutex_lock(&mutex);
- events->used = 0;
- events->runs = 0;
- me->events = NULL;
- thr = me->upper;
- while (thr && thr->events == events) {
- thr->events = NULL;
- thr = thr->upper;
- }
+ current_events = NULL;
+ events->state = Available;
} else {
/* no job and not events */
waiting++;
@@ -373,10 +393,8 @@ static void thread_run(volatile struct thread *me)
while (*prv != me)
prv = &(*prv)->next;
*prv = me->next;
- current = me->upper;
- if (current) {
- current->lowered = 0;
- } else {
+ current_thread = me->upper;
+ if (!current_thread) {
sig_monitor_clean_timeouts();
started--;
}
@@ -631,19 +649,13 @@ int jobs_call(
struct sd_event *jobs_get_sd_event()
{
struct events *events;
- struct thread *me;
int rc;
pthread_mutex_lock(&mutex);
/* search events on stack */
- me = current;
- while (me && !me->events)
- me = me->upper;
- if (me)
- /* return the stacked events */
- events = me->events;
- else {
+ events = current_events;
+ if (!events) {
/* search an available events */
events = events_get();
if (!events) {
@@ -655,8 +667,7 @@ struct sd_event *jobs_get_sd_event()
events = malloc(sizeof *events);
if (events && (rc = sd_event_new(&events->event)) >= 0) {
if (nevents < started || start_one_thread() >= 0) {
- events->used = 0;
- events->runs = 0;
+ events->state = Available;
events->next = first_events;
first_events = events;
} else {
@@ -679,13 +690,10 @@ struct sd_event *jobs_get_sd_event()
}
}
if (events) {
- me = current;
- if (me) {
- events->used = 1;
- me->events = events;
- } else {
+ events->state = Modifiable;
+ if (!current_thread)
WARNING("event returned for unknown thread!");
- }
+ current_events = events;
}
}
pthread_mutex_unlock(&mutex);
@@ -715,7 +723,7 @@ int jobs_start(int allowed_count, int start_count, int waiter_count, void (*star
pthread_mutex_lock(&mutex);
/* check whether already running */
- if (current || allowed) {
+ if (current_thread || allowed) {
ERROR("thread already started");
errno = EINVAL;
goto error;
@@ -822,7 +830,7 @@ void jobs_terminate()
head = job->next;
/* search if job is stacked for current */
- t = current;
+ t = current_thread;
while (t && t->job != job)
t = t->upper;
if (t) {
diff --git a/src/main.c b/src/main.c
index b5b00234..150b7810 100644
--- a/src/main.c
+++ b/src/main.c
@@ -525,7 +525,7 @@ static void run_startup_calls()
list = config->calls;
if (list) {
sreq = calloc(1, sizeof *sreq);
- sreq->session = afb_session_create("startup", 3600);
+ sreq->session = afb_session_create(3600);
sreq->current = list;
startup_call_current(sreq);
}
diff --git a/src/verbose.c b/src/verbose.c
index f330c6f5..e96627ed 100644
--- a/src/verbose.c
+++ b/src/verbose.c
@@ -85,6 +85,7 @@ void verbose_set_name(const char *name, int authority)
#include <errno.h>
#include <string.h>
#include <sys/uio.h>
+#include <pthread.h>
static const char *appname;
@@ -105,6 +106,8 @@ static int tty;
static const char chars[] = { '\n', '?', ':', ' ', '[', ',', ']' };
+static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+
static void _vverbose_(int loglevel, const char *file, int line, const char *function, const char *fmt, va_list args)
{
char buffer[4000];
@@ -169,7 +172,9 @@ static void _vverbose_(int loglevel, const char *file, int line, const char *fun
iov[n].iov_base = (void*)&chars[0];
iov[n++].iov_len = 1;
+ pthread_mutex_lock(&mutex);
writev(STDERR_FILENO, iov, n);
+ pthread_mutex_unlock(&mutex);
errno = saverr;
}
diff --git a/test/AFB.js b/test/AFB.js
index 59e68abc..ed2ffc2d 100644
--- a/test/AFB.js
+++ b/test/AFB.js
@@ -21,7 +21,7 @@ if (typeof base != "object")
var initial = {
base: base.base || "api",
- token: base.token || initialtoken || "hello",
+ token: base.token || initialtoken || "HELLO",
host: base.host || window.location.host,
url: base.url || undefined
};
@@ -105,8 +105,7 @@ var AFB_websocket;
function onclose(event) {
for (var id in this.pendings) {
- var ferr = this.pendings[id].onerror;
- ferr && ferr(null, this);
+ try { this.pendings[id][1](); } catch (x) {/*TODO?*/}
}
this.pendings = {};
this.onclose && this.onclose();
@@ -131,8 +130,7 @@ var AFB_websocket;
if (id in pendings) {
var p = pendings[id];
delete pendings[id];
- var f = p[offset];
- f(ans);
+ try { p[offset](ans); } catch (x) {/*TODO?*/}
}
}
@@ -166,12 +164,18 @@ var AFB_websocket;
this.onabort = function(){};
}
- function call(method, request) {
+ function call(method, request, callid) {
return new Promise((function(resolve, reject){
var id, arr;
- do {
- id = String(this.counter = 4095 & (this.counter + 1));
- } while (id in this.pendings);
+ if (callid) {
+ id = String(callid);
+ if (id in this.pendings)
+ throw new Error("pending callid("+id+") exists");
+ } else {
+ do {
+ id = String(this.counter = 4095 & (this.counter + 1));
+ } while (id in this.pendings);
+ }
this.pendings[id] = [ resolve, reject ];
arr = [CALL, id, method, request ];
if (AFB_context.token) arr.push(AFB_context.token);
diff --git a/test/monitoring/AFB.js b/test/monitoring/AFB.js
index 59e68abc..ed2ffc2d 100644
--- a/test/monitoring/AFB.js
+++ b/test/monitoring/AFB.js
@@ -21,7 +21,7 @@ if (typeof base != "object")
var initial = {
base: base.base || "api",
- token: base.token || initialtoken || "hello",
+ token: base.token || initialtoken || "HELLO",
host: base.host || window.location.host,
url: base.url || undefined
};
@@ -105,8 +105,7 @@ var AFB_websocket;
function onclose(event) {
for (var id in this.pendings) {
- var ferr = this.pendings[id].onerror;
- ferr && ferr(null, this);
+ try { this.pendings[id][1](); } catch (x) {/*TODO?*/}
}
this.pendings = {};
this.onclose && this.onclose();
@@ -131,8 +130,7 @@ var AFB_websocket;
if (id in pendings) {
var p = pendings[id];
delete pendings[id];
- var f = p[offset];
- f(ans);
+ try { p[offset](ans); } catch (x) {/*TODO?*/}
}
}
@@ -166,12 +164,18 @@ var AFB_websocket;
this.onabort = function(){};
}
- function call(method, request) {
+ function call(method, request, callid) {
return new Promise((function(resolve, reject){
var id, arr;
- do {
- id = String(this.counter = 4095 & (this.counter + 1));
- } while (id in this.pendings);
+ if (callid) {
+ id = String(callid);
+ if (id in this.pendings)
+ throw new Error("pending callid("+id+") exists");
+ } else {
+ do {
+ id = String(this.counter = 4095 & (this.counter + 1));
+ } while (id in this.pendings);
+ }
this.pendings[id] = [ resolve, reject ];
arr = [CALL, id, method, request ];
if (AFB_context.token) arr.push(AFB_context.token);
diff --git a/test/monitoring/monitor-pastel.css b/test/monitoring/monitor-pastel.css
index d5cf3f6f..839f574f 100644
--- a/test/monitoring/monitor-pastel.css
+++ b/test/monitoring/monitor-pastel.css
@@ -266,7 +266,7 @@ body {
/*******************************************************************/
/* json format */
-.json.string { color: lightskyblue; }
+.json.string { color: teal; }
.json.number { color: darkorange; }
.json.boolean { color: deepskyblue; }
.json.null { color: magenta; }
diff --git a/test/monitoring/monitor.html b/test/monitoring/monitor.html
index 2c07c1ba..5a418879 100644
--- a/test/monitoring/monitor.html
+++ b/test/monitoring/monitor.html
@@ -57,7 +57,7 @@
<div id="params" class="clearfix">
<div>host: <input type="text" id="param-host" size="50" value="localhost"></input></div>
<div>port: <input type="text" id="param-port" size="10" value="1234"></input></div>
- <div>token: <input type="text" id="param-token" size="33" value="hello"></input></div>
+ <div>token: <input type="text" id="param-token" size="33" value="HELLO"></input></div>
</div>
<div class="-flex-fill -box-out">
<div id="trace-events" class="-box-in">
diff --git a/test/monitoring/monitor.js b/test/monitoring/monitor.js
index 5b9dc0ea..3c64ab33 100644
--- a/test/monitoring/monitor.js
+++ b/test/monitoring/monitor.js
@@ -131,7 +131,7 @@ function init() {
at("param-host").value = document.location.hostname;
at("param-port").value = document.location.port;
var args = new URLSearchParams(document.location.search.substring(1));
- at("param-token").value = args.get("x-afb-token") || args.get("token") || "hello";
+ at("param-token").value = args.get("x-afb-token") || args.get("token") || "HELLO";
document.onbeforeunload = on_disconnect;