diff options
-rw-r--r-- | CMakeLists.txt | 45 | ||||
-rw-r--r-- | bindings/CMakeLists.txt | 3 | ||||
-rw-r--r-- | bindings/samples/ave.c | 2 | ||||
-rw-r--r-- | include/afb/afb-daemon-itf.h | 2 | ||||
-rw-r--r-- | include/afb/afb-daemon-v1.h | 3 | ||||
-rw-r--r-- | include/afb/afb-daemon-v2.h | 3 | ||||
-rw-r--r-- | include/afb/afb-dynapi-itf.h | 1 | ||||
-rw-r--r-- | include/afb/afb-dynapi.h | 3 | ||||
-rw-r--r-- | src/CMakeLists.txt | 12 | ||||
-rw-r--r-- | src/afb-api-dyn.c | 4 | ||||
-rw-r--r-- | src/afb-api-dyn.h | 1 | ||||
-rw-r--r-- | src/afb-context.c | 2 | ||||
-rw-r--r-- | src/afb-evt.h | 1 | ||||
-rw-r--r-- | src/afb-export.c | 12 | ||||
-rw-r--r-- | src/afb-proto-ws.c | 254 | ||||
-rw-r--r-- | src/afb-session.c | 536 | ||||
-rw-r--r-- | src/afb-session.h | 12 | ||||
-rw-r--r-- | src/afb-stub-ws.c | 58 | ||||
-rw-r--r-- | src/afb-trace.c | 122 | ||||
-rw-r--r-- | src/jobs-fake.c | 150 | ||||
-rw-r--r-- | src/jobs.c | 124 | ||||
-rw-r--r-- | src/main.c | 2 | ||||
-rw-r--r-- | src/verbose.c | 5 | ||||
-rw-r--r-- | test/AFB.js | 22 | ||||
-rw-r--r-- | test/monitoring/AFB.js | 22 | ||||
-rw-r--r-- | test/monitoring/monitor-pastel.css | 2 | ||||
-rw-r--r-- | test/monitoring/monitor.html | 2 | ||||
-rw-r--r-- | test/monitoring/monitor.js | 2 |
28 files changed, 912 insertions, 495 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 163a74b6..568eb42b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -70,19 +70,14 @@ CHECK_LIBRARY_EXISTS(magic magic_load "" HAVE_LIBMAGIC_SO) IF(HAVE_MAGIC_H) IF(HAVE_LIBMAGIC_SO) SET(HAVE_LIBMAGIC "1") + SET(LIBMAGIC_LDFLAGS -lmagic) ENDIF(HAVE_LIBMAGIC_SO) ENDIF(HAVE_MAGIC_H) -IF(NOT HAVE_LIBMAGIC) - MESSAGE(FATAL_ERROR "\"magic.h\" or \"libmagic.so\" missing. - Please install the \"file-devel\" or \"libmagic-dev\" package !") -ENDIF(NOT HAVE_LIBMAGIC) -ADD_DEFINITIONS(-DUSE_MAGIC_MIME_TYPE) - -PKG_CHECK_MODULES(libsystemd REQUIRED libsystemd>=222) -PKG_CHECK_MODULES(libmicrohttpd REQUIRED libmicrohttpd>=0.9.55) -PKG_CHECK_MODULES(openssl REQUIRED openssl) -PKG_CHECK_MODULES(uuid REQUIRED uuid) +PKG_CHECK_MODULES(libsystemd libsystemd>=222) +PKG_CHECK_MODULES(libmicrohttpd libmicrohttpd>=0.9.55) +PKG_CHECK_MODULES(openssl openssl) +PKG_CHECK_MODULES(uuid uuid) PKG_CHECK_MODULES(cynara cynara-client) IF(AGL_DEVEL) @@ -93,6 +88,32 @@ IF(cynara_FOUND) ADD_DEFINITIONS(-DBACKEND_PERMISSION_IS_CYNARA) ENDIF(cynara_FOUND) +IF(HAVE_LIBMAGIC AND libsystemd_FOUND AND libmicrohttpd_FOUND AND openssl_FOUND AND uuid_FOUND) + SET(WITH_BINDER TRUE) + ADD_DEFINITIONS(-DUSE_MAGIC_MIME_TYPE) +ELSE() + IF(NOT HAVE_LIBMAGIC) + MESSAGE(WARNING "\"magic.h\" or \"libmagic.so\" missing. + Please install the \"file-devel\" or \"libmagic-dev\" package !") + ENDIF(NOT HAVE_LIBMAGIC) + IF(NOT libsystemd_FOUND) + MESSAGE(WARNING "Dependency to 'libsystemd' is missing") + ENDIF() + IF(NOT libmicrohttpd_FOUND) + MESSAGE(WARNING "Dependency to 'libmicrohttpd' is missing") + ENDIF() + IF(NOT openssl_FOUND) + MESSAGE(WARNING "Dependency to 'openssl' is missing") + ENDIF() + IF(NOT uuid_FOUND) + MESSAGE(WARNING "Dependency to 'uuid' is missing") + ENDIF() + IF(NOT ALLOW_NO_BINDER) + MESSAGE(FATAL_ERROR "Can't compile the binder, either define ALLOW_NO_BINDER or install dependencies") + ENDIF() + SET(WITH_BINDER FALSE) +ENDIF() + ADD_DEFINITIONS(-DAFB_VERSION="${PROJECT_VERSION}") INCLUDE_DIRECTORIES( @@ -114,7 +135,7 @@ SET(link_libraries ${uuid_LDFLAGS} ${openssl_LDFLAGS} ${cynara_LDFLAGS} - -lmagic + ${LIBMAGIC_LDFLAGS} -ldl -lrt ) @@ -123,7 +144,7 @@ SET(binding_install_dir ${CMAKE_INSTALL_FULL_LIBDIR}/afb) ########################################################################### # activates the monitoring by default -if(INCLUDE_MONITORING) +if(INCLUDE_MONITORING AND WITH_BINDER) add_definitions(-DWITH_MONITORING_OPTION) INSTALL(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/test/monitoring diff --git a/bindings/CMakeLists.txt b/bindings/CMakeLists.txt index 1a1e9016..509c6cbf 100644 --- a/bindings/CMakeLists.txt +++ b/bindings/CMakeLists.txt @@ -16,6 +16,9 @@ # limitations under the License. ########################################################################### +IF(WITH_BINDER) ADD_SUBDIRECTORY(intrinsics) ADD_SUBDIRECTORY(samples) ADD_SUBDIRECTORY(tutorial) +ENDIF(WITH_BINDER) + diff --git a/bindings/samples/ave.c b/bindings/samples/ave.c index a4b4144d..569245eb 100644 --- a/bindings/samples/ave.c +++ b/bindings/samples/ave.c @@ -483,7 +483,7 @@ int afbBindingVdyn(afb_dynapi *dynapi) int i, rc; for (i = 0; apis[i] ; i++) { - rc = afb_dynapi_new_api(dynapi, apis[i], NULL, build_api, (void*)apis[i]); + rc = afb_dynapi_new_api(dynapi, apis[i], NULL, 0, build_api, (void*)apis[i]); if (rc < 0) AFB_DYNAPI_ERROR(dynapi, "can't create API %s", apis[i]); } diff --git a/include/afb/afb-daemon-itf.h b/include/afb/afb-daemon-itf.h index b78f9af9..492032ee 100644 --- a/include/afb/afb-daemon-itf.h +++ b/include/afb/afb-daemon-itf.h @@ -44,7 +44,7 @@ struct afb_daemon_itf struct afb_req (*unstore_req)(void*closure, struct afb_stored_req *sreq); int (*require_api)(void*closure, const char *name, int initialized); int (*rename_api)(void*closure, const char *name); - int (*new_api)(void *closure, const char *api, const char *info, int (*preinit)(void*, struct afb_dynapi *), void *preinit_closure); + int (*new_api)(void *closure, const char *api, const char *info, int noconcurrency, int (*preinit)(void*, struct afb_dynapi *), void *preinit_closure); }; /* diff --git a/include/afb/afb-daemon-v1.h b/include/afb/afb-daemon-v1.h index d1a0cc2a..d199a486 100644 --- a/include/afb/afb-daemon-v1.h +++ b/include/afb/afb-daemon-v1.h @@ -195,8 +195,9 @@ static inline int afb_daemon_new_api_v1( struct afb_daemon daemon, const char *api, const char *info, + int noconcurrency, int (*preinit)(void*, struct afb_dynapi *), void *closure) { - return daemon.itf->new_api(daemon.closure, api, info, preinit, closure); + return daemon.itf->new_api(daemon.closure, api, info, noconcurrency, preinit, closure); } diff --git a/include/afb/afb-daemon-v2.h b/include/afb/afb-daemon-v2.h index 1ea40e96..6eb48c60 100644 --- a/include/afb/afb-daemon-v2.h +++ b/include/afb/afb-daemon-v2.h @@ -171,9 +171,10 @@ static inline int afb_daemon_rename_api_v2(const char *name) static inline int afb_daemon_new_api_v2( const char *api, const char *info, + int noconcurrency, int (*preinit)(void*, struct afb_dynapi *), void *closure) { - return afb_get_daemon_v2().itf->new_api(afb_get_daemon_v2().closure, api, info, preinit, closure); + return afb_get_daemon_v2().itf->new_api(afb_get_daemon_v2().closure, api, info, noconcurrency, preinit, closure); } diff --git a/include/afb/afb-dynapi-itf.h b/include/afb/afb-dynapi-itf.h index 682558e0..fc90dbde 100644 --- a/include/afb/afb-dynapi-itf.h +++ b/include/afb/afb-dynapi-itf.h @@ -131,6 +131,7 @@ struct afb_dynapi_itf void *dynapi, const char *api, const char *info, + int noconcurrency, int (*preinit)(void*, struct afb_dynapi *), void *closure); diff --git a/include/afb/afb-dynapi.h b/include/afb/afb-dynapi.h index dfdcdb24..e2458952 100644 --- a/include/afb/afb-dynapi.h +++ b/include/afb/afb-dynapi.h @@ -242,10 +242,11 @@ static inline int afb_dynapi_new_api( struct afb_dynapi *dynapi, const char *api, const char *info, + int noconcurrency, int (*preinit)(void*, struct afb_dynapi *), void *closure) { - return dynapi->itf->api_new_api(dynapi, api, info, preinit, closure); + return dynapi->itf->api_new_api(dynapi, api, info, noconcurrency, preinit, closure); } static inline int afb_dynapi_set_verbs_v2( diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index a7d1ff95..b8accc77 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -25,6 +25,10 @@ endif(ALLOW_NO_BINDER) INCLUDE(FindPkgConfig) ADD_SUBDIRECTORY(genskel) + +IF(WITH_BINDER) +########################################### + ADD_SUBDIRECTORY(tests) ADD_DEFINITIONS(-DBINDING_INSTALL_DIR="${binding_install_dir}") @@ -91,13 +95,14 @@ INSTALL(TARGETS afb-daemon ########################################### # build and install libafbwsc ########################################### -ADD_LIBRARY(afbwsc SHARED afb-ws.c afb-ws-client.c afb-wsj1.c websock.c afb-proto-ws.c) +ADD_LIBRARY(afbwsc SHARED afb-ws.c afb-ws-client.c afb-wsj1.c websock.c afb-proto-ws.c jobs-fake.c) SET_TARGET_PROPERTIES(afbwsc PROPERTIES VERSION ${LIBAFBWSC_VERSION} SOVERSION ${LIBAFBWSC_SOVERSION}) TARGET_LINK_LIBRARIES(afbwsc ${libsystemd_LDFLAGS} ${json-c_LDFLAGS} + -lpthread -Wl,--version-script=${CMAKE_CURRENT_SOURCE_DIR}/export-afbwsc.map -Wl,--as-needed -Wl,--gc-sections @@ -117,4 +122,7 @@ TARGET_LINK_LIBRARIES(afb-client-demo INSTALL(TARGETS afb-client-demo RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}) - +########################################### +ELSE(WITH_BINDER) + MESSAGE(WARNING "NOT compiling the binder! but tools are compiled") +ENDIF(WITH_BINDER) diff --git a/src/afb-api-dyn.c b/src/afb-api-dyn.c index 0667f519..2ef1b1a5 100644 --- a/src/afb-api-dyn.c +++ b/src/afb-api-dyn.c @@ -235,7 +235,7 @@ static struct afb_api_itf dyn_api_itf = { .describe = describe_cb }; -int afb_api_dyn_add(struct afb_apiset *apiset, const char *name, const char *info, int (*preinit)(void*, struct afb_dynapi*), void *closure) +int afb_api_dyn_add(struct afb_apiset *apiset, const char *name, const char *info, int noconcurrency, int (*preinit)(void*, struct afb_dynapi*), void *closure) { int rc; struct afb_api_dyn *dynapi; @@ -266,7 +266,7 @@ int afb_api_dyn_add(struct afb_apiset *apiset, const char *name, const char *inf /* records the binding */ afb_api.closure = dynapi; afb_api.itf = &dyn_api_itf; - afb_api.group = NULL; + afb_api.group = noconcurrency ? dynapi : NULL; if (afb_apiset_add(apiset, afb_export_apiname(dynapi->export), afb_api) < 0) { ERROR("dynamic api %s can't be registered to set %s, ABORTING it!", afb_export_apiname(dynapi->export), diff --git a/src/afb-api-dyn.h b/src/afb-api-dyn.h index 35464c6e..21626eed 100644 --- a/src/afb-api-dyn.h +++ b/src/afb-api-dyn.h @@ -40,6 +40,7 @@ extern int afb_api_dyn_add( struct afb_apiset *apiset, const char *name, const char *info, + int noconcurrency, int (*preinit)(void*, struct afb_dynapi*), void *closure); diff --git a/src/afb-context.c b/src/afb-context.c index 759ee90e..c2649a48 100644 --- a/src/afb-context.c +++ b/src/afb-context.c @@ -63,7 +63,7 @@ int afb_context_connect(struct afb_context *context, const char *uuid, const cha int created; struct afb_session *session; - session = afb_session_get (uuid, &created); + session = afb_session_get (uuid, AFB_SESSION_TIMEOUT_DEFAULT, &created); if (session == NULL) return -1; init_context(context, session, token); diff --git a/src/afb-evt.h b/src/afb-evt.h index 2d888fa1..901bfbb2 100644 --- a/src/afb-evt.h +++ b/src/afb-evt.h @@ -18,6 +18,7 @@ #pragma once struct afb_event; +struct afb_eventid; struct afb_evtid; struct afb_session; struct json_object; diff --git a/src/afb-export.c b/src/afb-export.c index 19aab0ce..304395ae 100644 --- a/src/afb-export.c +++ b/src/afb-export.c @@ -179,7 +179,7 @@ static struct afb_eventid *eventid_make_cb(void *closure, const char *name) static struct afb_event event_make_cb(void *closure, const char *name) { struct afb_eventid *eventid = eventid_make_cb(closure, name); - return (struct afb_event){ .itf = eventid ? eventid->itf : NULL, .closure = eventid }; + return afb_evt_event_from_evtid(afb_evt_eventid_to_evtid(eventid)); } static int event_broadcast_cb(void *closure, const char *name, struct json_object *object) @@ -255,11 +255,12 @@ static int api_new_api_cb( void *closure, const char *api, const char *info, + int noconcurrency, int (*preinit)(void*, struct afb_dynapi *), void *preinit_closure) { struct afb_export *export = closure; - return afb_api_dyn_add(export->apiset, api, info, preinit, preinit_closure); + return afb_api_dyn_add(export->apiset, api, info, noconcurrency, preinit, preinit_closure); } /********************************************** @@ -376,11 +377,12 @@ static int hooked_api_new_api_cb( void *closure, const char *api, const char *info, + int noconcurrency, int (*preinit)(void*, struct afb_dynapi *), void *preinit_closure) { /* TODO */ - return api_new_api_cb(closure, api, info, preinit, preinit_closure); + return api_new_api_cb(closure, api, info, noconcurrency, preinit, preinit_closure); } /********************************************** * vectors @@ -1070,7 +1072,7 @@ static struct afb_export *create(struct afb_apiset *apiset, const char *apiname, /* session shared with other exports */ if (common_session == NULL) { - common_session = afb_session_create (NULL, 0); + common_session = afb_session_create (0); if (common_session == NULL) return NULL; } @@ -1179,7 +1181,7 @@ struct afb_binding_interface_v1 *afb_export_get_interface_v1(struct afb_export * int afb_export_unshare_session(struct afb_export *export) { if (export->session == common_session) { - export->session = afb_session_create (NULL, 0); + export->session = afb_session_create (0); if (export->session) afb_session_unref(common_session); else { diff --git a/src/afb-proto-ws.c b/src/afb-proto-ws.c index 3c8922cd..ce7d75d3 100644 --- a/src/afb-proto-ws.c +++ b/src/afb-proto-ws.c @@ -37,6 +37,7 @@ #include "afb-ws.h" #include "afb-msg-json.h" #include "afb-proto-ws.h" +#include "jobs.h" struct afb_proto_ws; @@ -190,6 +191,27 @@ struct afb_proto_ws void (*on_hangup)(void *closure); }; +/******************* streaming objects **********************************/ + +#define WRITEBUF_COUNT_MAX 32 +struct writebuf +{ + struct iovec iovec[WRITEBUF_COUNT_MAX]; + uint32_t uints[WRITEBUF_COUNT_MAX]; + int count; +}; + +struct readbuf +{ + char *base, *head, *end; +}; + +struct binary +{ + struct afb_proto_ws *protows; + struct readbuf rb; +}; + /******************* common useful tools **********************************/ /** @@ -204,19 +226,6 @@ static inline uint32_t ptr2id(void *ptr) /******************* serialisation part **********************************/ -struct readbuf -{ - char *base, *head, *end; -}; - -#define WRITEBUF_COUNT_MAX 32 -struct writebuf -{ - struct iovec iovec[WRITEBUF_COUNT_MAX]; - uint32_t uints[WRITEBUF_COUNT_MAX]; - int count; -}; - static char *readbuf_get(struct readbuf *rb, uint32_t length) { char *before = rb->head; @@ -343,12 +352,15 @@ int afb_proto_ws_call_success(struct afb_proto_ws_call *call, struct json_object { int rc = -1; struct writebuf wb = { .count = 0 }; + struct afb_proto_ws *protows = call->protows; if (writebuf_char(&wb, CHAR_FOR_ANSWER_SUCCESS) && writebuf_uint32(&wb, call->callid) && writebuf_string(&wb, info ?: "") && writebuf_object(&wb, obj)) { - rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count); + pthread_mutex_lock(&protows->mutex); + rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); + pthread_mutex_unlock(&protows->mutex); if (rc >= 0) { rc = 0; goto success; @@ -362,12 +374,15 @@ int afb_proto_ws_call_fail(struct afb_proto_ws_call *call, const char *status, c { int rc = -1; struct writebuf wb = { .count = 0 }; + struct afb_proto_ws *protows = call->protows; if (writebuf_char(&wb, CHAR_FOR_ANSWER_FAIL) && writebuf_uint32(&wb, call->callid) && writebuf_string(&wb, status) && writebuf_string(&wb, info ? : "")) { - rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count); + pthread_mutex_lock(&protows->mutex); + rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); + pthread_mutex_unlock(&protows->mutex); if (rc >= 0) { rc = 0; goto success; @@ -391,7 +406,7 @@ int afb_proto_ws_call_subcall(struct afb_proto_ws_call *call, const char *api, c sc->callback = callback; sc->closure = cb_closure; - pthread_mutex_unlock(&protows->mutex); + pthread_mutex_lock(&protows->mutex); sc->subcallid = ptr2id(sc); do { sc->subcallid++; @@ -409,7 +424,9 @@ int afb_proto_ws_call_subcall(struct afb_proto_ws_call *call, const char *api, c && writebuf_string(&wb, api) && writebuf_string(&wb, verb) && writebuf_object(&wb, args)) { + pthread_mutex_lock(&protows->mutex); rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); + pthread_mutex_unlock(&protows->mutex); if (rc >= 0) { rc = 0; goto success; @@ -424,12 +441,15 @@ int afb_proto_ws_call_subscribe(struct afb_proto_ws_call *call, const char *even { int rc = -1; struct writebuf wb = { .count = 0 }; + struct afb_proto_ws *protows = call->protows; if (writebuf_char(&wb, CHAR_FOR_EVT_SUBSCRIBE) && writebuf_uint32(&wb, call->callid) && writebuf_uint32(&wb, (uint32_t)event_id) && writebuf_string(&wb, event_name)) { - rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count); + pthread_mutex_lock(&protows->mutex); + rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); + pthread_mutex_unlock(&protows->mutex); if (rc >= 0) { rc = 0; goto success; @@ -443,12 +463,15 @@ int afb_proto_ws_call_unsubscribe(struct afb_proto_ws_call *call, const char *ev { int rc = -1; struct writebuf wb = { .count = 0 }; + struct afb_proto_ws *protows = call->protows; if (writebuf_char(&wb, CHAR_FOR_EVT_UNSUBSCRIBE) && writebuf_uint32(&wb, call->callid) && writebuf_uint32(&wb, (uint32_t)event_id) && writebuf_string(&wb, event_name)) { - rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count); + pthread_mutex_lock(&protows->mutex); + rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); + pthread_mutex_unlock(&protows->mutex); if (rc >= 0) { rc = 0; goto success; @@ -461,7 +484,7 @@ success: /******************* client part **********************************/ /* search a memorized call */ -static struct client_call *client_call_search(struct afb_proto_ws *protows, uint32_t callid) +static struct client_call *client_call_search_locked(struct afb_proto_ws *protows, uint32_t callid) { struct client_call *call; @@ -472,11 +495,23 @@ static struct client_call *client_call_search(struct afb_proto_ws *protows, uint return call; } +static struct client_call *client_call_search_unlocked(struct afb_proto_ws *protows, uint32_t callid) +{ + struct client_call *result; + + pthread_mutex_lock(&protows->mutex); + result = client_call_search_locked(protows, callid); + pthread_mutex_unlock(&protows->mutex); + return result; +} + /* free and release the memorizing call */ static void client_call_destroy(struct client_call *call) { struct client_call **prv; + struct afb_proto_ws *protows = call->protows; + pthread_mutex_lock(&protows->mutex); prv = &call->protows->calls; while (*prv != NULL) { if (*prv == call) { @@ -485,6 +520,7 @@ static void client_call_destroy(struct client_call *call) } prv = &(*prv)->next; } + pthread_mutex_unlock(&protows->mutex); free(call); } @@ -505,7 +541,7 @@ static int client_msg_call_get(struct afb_proto_ws *protows, struct readbuf *rb, } /* get the call */ - *call = client_call_search(protows, callid); + *call = client_call_search_unlocked(protows, callid); if (*call == NULL) { return 0; } @@ -600,6 +636,7 @@ static void client_on_reply_fail(struct afb_proto_ws *protows, struct readbuf *r if (!client_msg_call_get(protows, rb, &call)) return; + if (readbuf_string(rb, &status, NULL) && readbuf_string(rb, &info, NULL)) { protows->client_itf->on_reply_fail(protows->closure, call->request, status, info); @@ -614,12 +651,19 @@ static int client_send_subcall_reply(struct afb_proto_ws *protows, uint32_t subc { struct writebuf wb = { .count = 0 }; char ie = status < 0; + int rc; - return -!(writebuf_char(&wb, CHAR_FOR_SUBCALL_REPLY) + if (writebuf_char(&wb, CHAR_FOR_SUBCALL_REPLY) && writebuf_uint32(&wb, subcallid) && writebuf_char(&wb, ie) - && writebuf_object(&wb, object) - && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0); + && writebuf_object(&wb, object)) { + pthread_mutex_lock(&protows->mutex); + rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); + pthread_mutex_unlock(&protows->mutex); + if (rc >= 0) + return 0; + } + return -1; } /* callback for subcall reply */ @@ -682,11 +726,15 @@ static void client_on_description(struct afb_proto_ws *protows, struct readbuf * struct json_object *object; if (readbuf_uint32(rb, &descid)) { + pthread_mutex_lock(&protows->mutex); prv = &protows->describes; while ((desc = *prv) && desc->descid != descid) prv = &desc->next; - if (desc) { + if (!desc) + pthread_mutex_unlock(&protows->mutex); + else { *prv = desc->next; + pthread_mutex_unlock(&protows->mutex); if (!readbuf_object(rb, &object)) object = NULL; desc->callback(desc->closure, object); @@ -696,56 +744,73 @@ static void client_on_description(struct afb_proto_ws *protows, struct readbuf * } /* callback when receiving binary data */ -static void client_on_binary(void *closure, char *data, size_t size) +static void client_on_binary_job(int sig, void *closure) { - struct afb_proto_ws *protows; - struct readbuf rb; - - rb.base = data; - if (size > 0) { - rb.head = data; - rb.end = data + size; - protows = closure; + struct binary *binary = closure; - pthread_mutex_lock(&protows->mutex); - switch (*rb.head++) { + if (!sig) { + switch (*binary->rb.head++) { case CHAR_FOR_ANSWER_SUCCESS: /* success */ - client_on_reply_success(protows, &rb); + client_on_reply_success(binary->protows, &binary->rb); break; case CHAR_FOR_ANSWER_FAIL: /* fail */ - client_on_reply_fail(protows, &rb); + client_on_reply_fail(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_BROADCAST: /* broadcast */ - client_on_event_broadcast(protows, &rb); + client_on_event_broadcast(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_ADD: /* creates the event */ - client_on_event_create(protows, &rb); + client_on_event_create(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_DEL: /* removes the event */ - client_on_event_remove(protows, &rb); + client_on_event_remove(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_PUSH: /* pushs the event */ - client_on_event_push(protows, &rb); + client_on_event_push(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_SUBSCRIBE: /* subscribe event for a request */ - client_on_event_subscribe(protows, &rb); + client_on_event_subscribe(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_UNSUBSCRIBE: /* unsubscribe event for a request */ - client_on_event_unsubscribe(protows, &rb); + client_on_event_unsubscribe(binary->protows, &binary->rb); break; case CHAR_FOR_SUBCALL_CALL: /* subcall */ - client_on_subcall(protows, &rb); + client_on_subcall(binary->protows, &binary->rb); break; case CHAR_FOR_DESCRIPTION: /* description */ - client_on_description(protows, &rb); + client_on_description(binary->protows, &binary->rb); break; default: /* unexpected message */ /* TODO: close the connection */ break; } - pthread_mutex_unlock(&protows->mutex); } - free(rb.base); + free(binary->rb.base); + free(binary); +} + +/* callback when receiving binary data */ +static void client_on_binary(void *closure, char *data, size_t size) +{ + int rc; + struct binary *binary; + + if (size) { + binary = malloc(sizeof *binary); + if (!binary) { + errno = ENOMEM; + } else { + binary->protows = closure; + binary->rb.base = data; + binary->rb.head = data; + binary->rb.end = data + size; + rc = jobs_queue(NULL, 0, client_on_binary_job, binary); + if (rc >= 0) + return; + free(binary); + } + } + free(data); } int afb_proto_ws_client_call( @@ -771,11 +836,12 @@ int afb_proto_ws_client_call( /* init call data */ pthread_mutex_lock(&protows->mutex); call->callid = ptr2id(call); - while(client_call_search(protows, call->callid) != NULL) + while(client_call_search_locked(protows, call->callid) != NULL) call->callid++; call->protows = protows; call->next = protows->calls; protows->calls = call; + pthread_mutex_unlock(&protows->mutex); /* creates the call message */ if (!writebuf_char(&wb, CHAR_FOR_CALL) @@ -788,7 +854,9 @@ int afb_proto_ws_client_call( } /* send */ + pthread_mutex_lock(&protows->mutex); rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); + pthread_mutex_unlock(&protows->mutex); if (rc >= 0) { rc = 0; goto end; @@ -797,7 +865,6 @@ int afb_proto_ws_client_call( clean: client_call_destroy(call); end: - pthread_mutex_unlock(&protows->mutex); return rc; } @@ -830,15 +897,15 @@ int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)( desc->protows = protows; desc->next = protows->describes; protows->describes = desc; - pthread_mutex_unlock(&protows->mutex); /* send */ if (writebuf_char(&wb, CHAR_FOR_DESCRIBE) && writebuf_uint32(&wb, desc->descid) - && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0) + && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0) { + pthread_mutex_unlock(&protows->mutex); return 0; + } - pthread_mutex_lock(&protows->mutex); d = protows->describes; if (d == desc) protows->describes = desc->next; @@ -848,8 +915,8 @@ int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)( if (d) d->next = desc->next; } - free(desc); pthread_mutex_unlock(&protows->mutex); + free(desc); error: /* TODO? callback(closure, NULL); */ return -1; @@ -931,18 +998,25 @@ static void server_on_subcall_reply(struct afb_proto_ws *protows, struct readbuf static int server_send_description(struct afb_proto_ws *protows, uint32_t descid, struct json_object *descobj) { + int rc; struct writebuf wb = { .count = 0 }; - return -!(writebuf_char(&wb, CHAR_FOR_DESCRIPTION) - && writebuf_uint32(&wb, descid) - && writebuf_object(&wb, descobj) - && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0); + if (writebuf_char(&wb, CHAR_FOR_DESCRIPTION) + && writebuf_uint32(&wb, descid) + && writebuf_object(&wb, descobj)) { + pthread_mutex_lock(&protows->mutex); + rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); + pthread_mutex_unlock(&protows->mutex); + if (rc >= 0) + return 0; + } + return -1; } int afb_proto_ws_describe_put(struct afb_proto_ws_describe *describe, struct json_object *description) { int rc = server_send_description(describe->protows, describe->descid, description); - afb_proto_ws_addref(describe->protows); + afb_proto_ws_unref(describe->protows); free(describe); return rc; } @@ -971,33 +1045,51 @@ static void server_on_describe(struct afb_proto_ws *protows, struct readbuf *rb) } /* callback when receiving binary data */ -static void server_on_binary(void *closure, char *data, size_t size) +static void server_on_binary_job(int sig, void *closure) { - struct afb_proto_ws *protows; - struct readbuf rb; - - rb.base = data; - if (size > 0) { - rb.head = data; - rb.end = data + size; - protows = closure; + struct binary *binary = closure; - switch (*rb.head++) { + if (!sig) { + switch (*binary->rb.head++) { case CHAR_FOR_CALL: - server_on_call(protows, &rb); + server_on_call(binary->protows, &binary->rb); break; case CHAR_FOR_SUBCALL_REPLY: - server_on_subcall_reply(protows, &rb); + server_on_subcall_reply(binary->protows, &binary->rb); break; case CHAR_FOR_DESCRIBE: - server_on_describe(protows, &rb); + server_on_describe(binary->protows, &binary->rb); break; default: /* unexpected message */ /* TODO: close the connection */ break; } } - free(rb.base); + free(binary->rb.base); + free(binary); +} + +static void server_on_binary(void *closure, char *data, size_t size) +{ + int rc; + struct binary *binary; + + if (size) { + binary = malloc(sizeof *binary); + if (!binary) { + errno = ENOMEM; + } else { + binary->protows = closure; + binary->rb.base = data; + binary->rb.head = data; + binary->rb.end = data + size; + rc = jobs_queue(NULL, 0, server_on_binary_job, binary); + if (rc >= 0) + return; + free(binary); + } + } + free(data); } /******************* server part: manage events **********************************/ @@ -1005,12 +1097,19 @@ static void server_on_binary(void *closure, char *data, size_t size) static int server_event_send(struct afb_proto_ws *protows, char order, const char *event_name, int event_id, struct json_object *data) { struct writebuf wb = { .count = 0 }; + int rc; - return -!(writebuf_char(&wb, order) - && (order == CHAR_FOR_EVT_BROADCAST || writebuf_uint32(&wb, event_id)) - && writebuf_string(&wb, event_name) - && (order == CHAR_FOR_EVT_ADD || order == CHAR_FOR_EVT_DEL || writebuf_object(&wb, data)) - && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0); + if (writebuf_char(&wb, order) + && (order == CHAR_FOR_EVT_BROADCAST || writebuf_uint32(&wb, event_id)) + && writebuf_string(&wb, event_name) + && (order == CHAR_FOR_EVT_ADD || order == CHAR_FOR_EVT_DEL || writebuf_object(&wb, data))) { + pthread_mutex_lock(&protows->mutex); + rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); + pthread_mutex_unlock(&protows->mutex); + if (rc >= 0) + return 0; + } + return -1; } int afb_proto_ws_server_event_create(struct afb_proto_ws *protows, const char *event_name, int event_id) @@ -1059,6 +1158,7 @@ static void on_hangup(void *closure) } if (protows->fd >= 0) { + close(protows->fd); protows->fd = -1; if (protows->on_hangup) protows->on_hangup(protows->closure); diff --git a/src/afb-session.c b/src/afb-session.c index 61bce09a..6b6ad634 100644 --- a/src/afb-session.c +++ b/src/afb-session.c @@ -31,10 +31,15 @@ #include "afb-session.h" #include "verbose.h" -#define COOKEYCOUNT 8 -#define COOKEYMASK (COOKEYCOUNT - 1) +#define SIZEUUID 37 +#define HEADCOUNT 16 +#define COOKEYCOUNT 8 +#define COOKEYMASK (COOKEYCOUNT - 1) -#define NOW (time(NULL)) +#define _MAXEXP_ ((time_t)(~(time_t)0)) +#define _MAXEXP2_ ((time_t)((((unsigned long long)_MAXEXP_) >> 1))) +#define MAX_EXPIRATION (_MAXEXP_ >= 0 ? _MAXEXP_ : _MAXEXP2_) +#define NOW (time(NULL)) struct cookie { @@ -46,40 +51,30 @@ struct cookie struct afb_session { + struct afb_session *next; /* link to the next */ unsigned refcount; int timeout; - time_t expiration; // expiration time of the token - time_t access; + time_t expiration; // expiration time of the token pthread_mutex_t mutex; - char uuid[37]; // long term authentication of remote client - char token[37]; // short term authentication of remote client struct cookie *cookies[COOKEYCOUNT]; + char autoclose; + char idx; + char uuid[SIZEUUID]; // long term authentication of remote client + char token[SIZEUUID]; // short term authentication of remote client }; // Session UUID are store in a simple array [for 10 sessions this should be enough] static struct { - pthread_mutex_t mutex; // declare a mutex to protect hash table - struct afb_session **store; // sessions store - int count; // current number of sessions + pthread_mutex_t mutex; // declare a mutex to protect hash table + struct afb_session *heads[HEADCOUNT]; // sessions + int count; // current number of sessions int max; int timeout; - char initok[37]; + char initok[SIZEUUID]; } sessions; -/** - * Get the index of the 'key' in the cookies array. - * @param key the key to scan - * @return the index of the list for key within cookies - */ -static int cookeyidx(const void *key) -{ - intptr_t x = (intptr_t)key; - unsigned r = (unsigned)((x >> 5) ^ (x >> 15)); - return r & COOKEYMASK; -} - /* generate a uuid */ -static void new_uuid(char uuid[37]) +static void new_uuid(char uuid[SIZEUUID]) { uuid_t newuuid; uuid_generate(newuuid); @@ -97,40 +92,51 @@ static inline void unlock(struct afb_session *session) } // Free context [XXXX Should be protected again memory abort XXXX] -static void free_data (struct afb_session *session) +static void close_session(struct afb_session *session) { int idx; - struct cookie *cookie, *next; + struct cookie *cookie; - // free cookies + /* free cookies */ for (idx = 0 ; idx < COOKEYCOUNT ; idx++) { - cookie = session->cookies[idx]; - session->cookies[idx] = NULL; - while (cookie != NULL) { - next = cookie->next; + while ((cookie = session->cookies[idx])) { + session->cookies[idx] = cookie->next; if (cookie->freecb != NULL) cookie->freecb(cookie->value); free(cookie); - cookie = next; } } } +/* tiny hash function inspired from pearson */ +static int pearson4(const char *text) +{ + static uint8_t T[16] = { + 4, 1, 6, 0, 9, 14, 11, 5, + 2, 3, 12, 15, 10, 7, 8, 13 + }; + uint8_t r, c; + + for (r = 0; (c = (uint8_t)*text) ; text++) { + r = T[r ^ (15 & c)]; + r = T[r ^ (c >> 4)]; + } + return r; // % HEADCOUNT; +} + // Create a new store in RAM, not that is too small it will be automatically extended void afb_session_init (int max_session_count, int timeout, const char *initok) { - // let's create as store as hashtable does not have any - sessions.store = calloc (1 + (unsigned)max_session_count, sizeof *sessions.store); pthread_mutex_init(&sessions.mutex, NULL); sessions.max = max_session_count; sessions.timeout = timeout; if (initok == NULL) /* without token, a secret is made to forbid creation of sessions */ new_uuid(sessions.initok); - else if (strlen(initok) < sizeof(sessions.store[0]->token)) + else if (strlen(initok) < sizeof sessions.initok) strcpy(sessions.initok, initok); else { - ERROR("initial token '%s' too long (max length 36)", initok); + ERROR("initial token '%s' too long (max length %d)", initok, ((int)(sizeof sessions.initok)) - 1); exit(1); } } @@ -140,196 +146,205 @@ const char *afb_session_initial_token() return sessions.initok; } -static struct afb_session *search (const char* uuid) +static struct afb_session *search (const char* uuid, int idx) { - int idx; struct afb_session *session; - assert (uuid != NULL); - - pthread_mutex_lock(&sessions.mutex); - - for (idx=0; idx < sessions.max; idx++) { - session = sessions.store[idx]; - if (session && (0 == strcmp (uuid, session->uuid))) - goto found; - } - session = NULL; + session = sessions.heads[idx]; + while (session && strcmp(uuid, session->uuid)) + session = session->next; -found: - pthread_mutex_unlock(&sessions.mutex); return session; } -static int destroy (struct afb_session *session) +static void destroy (struct afb_session *session) { - int idx; - int status; + struct afb_session **prv; assert (session != NULL); + close_session(session); pthread_mutex_lock(&sessions.mutex); - - for (idx=0; idx < sessions.max; idx++) { - if (sessions.store[idx] == session) { - sessions.store[idx] = NULL; + prv = &sessions.heads[(int)session->idx]; + while (*prv) + if (*prv != session) + prv = &((*prv)->next); + else { + *prv = session->next; sessions.count--; - status = 1; - goto deleted; + pthread_mutex_destroy(&session->mutex); + free(session); + break; } - } - status = 0; -deleted: pthread_mutex_unlock(&sessions.mutex); - return status; } -static int add (struct afb_session *session) +// Loop on every entry and remove old context sessions.hash +static time_t cleanup () { + struct afb_session *session, *next; int idx; - int status; - - assert (session != NULL); - - pthread_mutex_lock(&sessions.mutex); + time_t now; - for (idx=0; idx < sessions.max; idx++) { - if (NULL == sessions.store[idx]) { - sessions.store[idx] = session; - sessions.count++; - status = 1; - goto added; + // Loop on Sessions Table and remove anything that is older than timeout + now = NOW; + for (idx = 0 ; idx < HEADCOUNT; idx++) { + session = sessions.heads[idx]; + while (session) { + next = session->next; + if (session->expiration < now) + afb_session_close(session); + session = next; } } - status = 0; -added: - pthread_mutex_unlock(&sessions.mutex); - return status; + return now; } -// Check if context timeout or not -static int is_expired (struct afb_session *ctx, time_t now) +static void update_timeout(struct afb_session *session, time_t now, int timeout) { - assert (ctx != NULL); - return ctx->expiration < now; + time_t expiration; + + /* compute expiration */ + if (timeout == AFB_SESSION_TIMEOUT_INFINITE) + expiration = MAX_EXPIRATION; + else { + if (timeout == AFB_SESSION_TIMEOUT_DEFAULT) + expiration = now + sessions.timeout; + else + expiration = now + timeout; + if (expiration < 0) + expiration = MAX_EXPIRATION; + } + + /* record the values */ + session->timeout = timeout; + session->expiration = expiration; } -// Check if context is active or not -static int is_active (struct afb_session *ctx, time_t now) +static void update_expiration(struct afb_session *session, time_t now) { - assert (ctx != NULL); - return ctx->uuid[0] != 0 && ctx->expiration >= now; + update_timeout(session, now, session->timeout); } -// Loop on every entry and remove old context sessions.hash -static void cleanup (time_t now) +static struct afb_session *add_session (const char *uuid, int timeout, time_t now, int idx) { - struct afb_session *ctx; - long idx; + struct afb_session *session; - // Loop on Sessions Table and remove anything that is older than timeout - for (idx=0; idx < sessions.max; idx++) { - ctx = sessions.store[idx]; - if (ctx != NULL && is_expired(ctx, now)) { - afb_session_close (ctx); - } + /* check arguments */ + if (!AFB_SESSION_TIMEOUT_IS_VALID(timeout) + || (uuid && strlen(uuid) >= sizeof session->uuid)) { + errno = EINVAL; + return NULL; } -} -static struct afb_session *make_session (const char *uuid, int timeout, time_t now) -{ - struct afb_session *session; + /* check session count */ + if (sessions.count >= sessions.max) { + errno = EBUSY; + return NULL; + } /* allocates a new one */ session = calloc(1, sizeof *session); if (session == NULL) { errno = ENOMEM; - goto error; - } - pthread_mutex_init(&session->mutex, NULL); - - /* generate the uuid */ - if (uuid == NULL) { - new_uuid(session->uuid); - } else { - if (strlen(uuid) >= sizeof session->uuid) { - errno = EINVAL; - goto error2; - } - strcpy(session->uuid, uuid); + return NULL; } - /* init the token */ + /* initialize */ + pthread_mutex_init(&session->mutex, NULL); + session->refcount = 1; + strcpy(session->uuid, uuid); strcpy(session->token, sessions.initok); - session->timeout = timeout; - if (timeout != 0) - session->expiration = now + timeout; - else { - session->expiration = (time_t)(~(time_t)0); - if (session->expiration < 0) - session->expiration = (time_t)(((unsigned long long)session->expiration) >> 1); - } - if (!add (session)) { - errno = ENOMEM; - goto error2; - } + update_timeout(session, now, timeout); + + /* link */ + session->idx = (char)idx; + session->next = sessions.heads[idx]; + sessions.heads[idx] = session; + sessions.count++; - session->access = now; - session->refcount = 1; return session; +} + +/* create a new session for the given timeout */ +static struct afb_session *new_session (int timeout, time_t now) +{ + int idx; + char uuid[SIZEUUID]; -error2: - free(session); -error: - return NULL; + do { + new_uuid(uuid); + idx = pearson4(uuid); + } while(search(uuid, idx)); + return add_session(uuid, timeout, now, idx); } -struct afb_session *afb_session_create (const char *uuid, int timeout) +/* Creates a new session with 'timeout' */ +struct afb_session *afb_session_create (int timeout) { time_t now; + struct afb_session *session; /* cleaning */ - now = NOW; - cleanup (now); + pthread_mutex_lock(&sessions.mutex); + now = cleanup(); + session = new_session(timeout, now); + pthread_mutex_unlock(&sessions.mutex); - /* search for an existing one not too old */ - if (uuid != NULL && search(uuid) != NULL) { - errno = EEXIST; - return NULL; - } + return session; +} + +/* Searchs the session of 'uuid' */ +struct afb_session *afb_session_search (const char *uuid) +{ + struct afb_session *session; + + /* cleaning */ + pthread_mutex_lock(&sessions.mutex); + cleanup(); + session = search(uuid, pearson4(uuid)); + if (session) + __atomic_add_fetch(&session->refcount, 1, __ATOMIC_RELAXED); + pthread_mutex_unlock(&sessions.mutex); + return session; - return make_session(uuid, timeout, now); } -// This function will return exiting session or newly created session -struct afb_session *afb_session_get (const char *uuid, int *created) +/* This function will return exiting session or newly created session */ +struct afb_session *afb_session_get (const char *uuid, int timeout, int *created) { + int idx; struct afb_session *session; time_t now; /* cleaning */ - now = NOW; - cleanup (now); + pthread_mutex_lock(&sessions.mutex); + now = cleanup(); /* search for an existing one not too old */ - if (uuid != NULL) { - session = search(uuid); - if (!created) - return session; - if (session != NULL) { - *created = 0; - session->access = now; - session->refcount++; + if (!uuid) + session = new_session(timeout, now); + else { + idx = pearson4(uuid); + session = search(uuid, idx); + if (session) { + __atomic_add_fetch(&session->refcount, 1, __ATOMIC_RELAXED); + pthread_mutex_unlock(&sessions.mutex); + if (created) + *created = 0; return session; } + session = add_session (uuid, timeout, now, idx); } + pthread_mutex_unlock(&sessions.mutex); if (created) - *created = 1; + *created = !!session; - return make_session(uuid, sessions.timeout, now); + return session; } +/* increase the use count on the session */ struct afb_session *afb_session_addref(struct afb_session *session) { if (session != NULL) @@ -337,32 +352,57 @@ struct afb_session *afb_session_addref(struct afb_session *session) return session; } +/* decrease the use count of the session */ void afb_session_unref(struct afb_session *session) { if (session != NULL) { assert(session->refcount != 0); if (!__atomic_sub_fetch(&session->refcount, 1, __ATOMIC_RELAXED)) { - if (session->uuid[0] == 0) { + pthread_mutex_lock(&session->mutex); + if (session->autoclose || session->uuid[0] == 0) destroy (session); - pthread_mutex_destroy(&session->mutex); - free(session); - } + else + pthread_mutex_unlock(&session->mutex); } } } -// Free Client Session Context +// close Client Session Context void afb_session_close (struct afb_session *session) { assert(session != NULL); + pthread_mutex_lock(&session->mutex); if (session->uuid[0] != 0) { session->uuid[0] = 0; - free_data (session); - if (session->refcount == 0) { + if (session->refcount) + close_session(session); + else { destroy (session); - free(session); + return; } } + pthread_mutex_unlock(&session->mutex); +} + +/* set the autoclose flag */ +void afb_session_set_autoclose(struct afb_session *session, int autoclose) +{ + assert(session != NULL); + session->autoclose = (char)!!autoclose; +} + +// is the session active? +int afb_session_is_active (struct afb_session *session) +{ + assert(session != NULL); + return !!session->uuid[0]; +} + +// is the session closed? +int afb_session_is_closed (struct afb_session *session) +{ + assert(session != NULL); + return !session->uuid[0]; } // Sample Generic Ping Debug API @@ -371,8 +411,10 @@ int afb_session_check_token (struct afb_session *session, const char *token) assert(session != NULL); assert(token != NULL); - // compare current token with previous one - if (!is_active (session, NOW)) + if (!session->uuid[0]) + return 0; + + if (session->expiration < NOW) return 0; if (session->token[0] && strcmp (token, session->token) != 0) @@ -390,111 +432,135 @@ void afb_session_new_token (struct afb_session *session) new_uuid(session->token); // keep track of time for session timeout and further clean up - if (session->timeout != 0) - session->expiration = NOW + session->timeout; + update_expiration(session, NOW); } +/* Returns the uuid of 'session' */ const char *afb_session_uuid (struct afb_session *session) { assert(session != NULL); return session->uuid; } +/* Returns the token of 'session' */ const char *afb_session_token (struct afb_session *session) { assert(session != NULL); return session->token; } -static struct cookie *cookie_search(struct afb_session *session, const void *key, int *idx) -{ - struct cookie *cookie; - - cookie = session->cookies[*idx = cookeyidx(key)]; - while(cookie != NULL && cookie->key != key) - cookie = cookie->next; - return cookie; -} - -static struct cookie *cookie_add(struct afb_session *session, int idx, const void *key, void *value, void (*freecb)(void*)) +/** + * Get the index of the 'key' in the cookies array. + * @param key the key to scan + * @return the index of the list for key within cookies + */ +static int cookeyidx(const void *key) { - struct cookie *cookie; - - cookie = malloc(sizeof *cookie); - if (!cookie) - errno = ENOMEM; - else { - cookie->key = key; - cookie->value = value; - cookie->freecb = freecb; - cookie->next = session->cookies[idx]; - session->cookies[idx] = cookie; - } - return cookie; + intptr_t x = (intptr_t)key; + unsigned r = (unsigned)((x >> 5) ^ (x >> 15)); + return r & COOKEYMASK; } +/** + * Set, get, replace, remove a cookie of 'key' for the 'session' + * + * The behaviour of this function depends on its parameters: + * + * @param session the session + * @param key the key of the cookie + * @param makecb the creation function or NULL + * @param freecb the release function or NULL + * @param closure an argument for makecb or the value if makecb==NULL + * @param replace a boolean enforcing replecement of the previous value + * + * @return the value of the cookie + * + * The 'key' is a pointer and compared as pointers. + * + * For getting the current value of the cookie: + * + * afb_session_cookie(session, key, NULL, NULL, NULL, 0) + * + * For storing the value of the cookie + * + * afb_session_cookie(session, key, NULL, NULL, value, 1) + */ void *afb_session_cookie(struct afb_session *session, const void *key, void *(*makecb)(void *closure), void (*freecb)(void *item), void *closure, int replace) { int idx; void *value; - struct cookie *cookie; + struct cookie *cookie, **prv; + /* get key hashed index */ + idx = cookeyidx(key); + + /* lock session and search for the cookie of 'key' */ lock(session); - cookie = cookie_search(session, key, &idx); - if (cookie) { - if (!replace) - value = cookie->value; - else { + prv = &session->cookies[idx]; + for (;;) { + cookie = *prv; + if (!cookie) { + /* 'key' not found, create value using 'closure' and 'makecb' */ value = makecb ? makecb(closure) : closure; - if (cookie->value != value && cookie->freecb) - cookie->freecb(cookie->value); - cookie->value = value; - cookie->freecb = freecb; - } - } else { - value = makecb ? makecb(closure) : closure; - if (replace || makecb || freecb) { - cookie = cookie_add(session, idx, key, value, freecb); - if (!cookie) { - if (makecb && freecb) - freecb(value); - value = NULL; + /* store the the only if it has some meaning */ + if (replace || makecb || freecb) { + cookie = malloc(sizeof *cookie); + if (!cookie) { + errno = ENOMEM; + /* calling freecb if there is no makecb may have issue */ + if (makecb && freecb) + freecb(value); + value = NULL; + } else { + cookie->key = key; + cookie->value = value; + cookie->freecb = freecb; + cookie->next = NULL; + *prv = cookie; + } + } + break; + } else if (cookie->key == key) { + /* cookie of key found */ + if (!replace) + /* not replacing, get the value */ + value = cookie->value; + else { + /* create value using 'closure' and 'makecb' */ + value = makecb ? makecb(closure) : closure; + + /* free previous value is needed */ + if (cookie->value != value && cookie->freecb) + cookie->freecb(cookie->value); + + /* store the value and its releaser */ + cookie->value = value; + cookie->freecb = freecb; + + /* but if both are NULL drop the cookie */ + if (!value && !freecb) { + *prv = cookie->next; + free(cookie); + } } + break; + } else { + prv = &(cookie->next); } } + + /* unlock the session and return the value */ unlock(session); return value; } void *afb_session_get_cookie(struct afb_session *session, const void *key) { - int idx; - void *value; - struct cookie *cookie; - - lock(session); - cookie = cookie_search(session, key, &idx); - value = cookie ? cookie->value : NULL; - unlock(session); - return value; + return afb_session_cookie(session, key, NULL, NULL, NULL, 0); } int afb_session_set_cookie(struct afb_session *session, const void *key, void *value, void (*freecb)(void*)) { - int idx; - struct cookie *cookie; - - lock(session); - cookie = cookie_search(session, key, &idx); - if (!cookie) - cookie = cookie_add(session, idx, key, value, freecb); - else { - if (cookie->value != value && cookie->freecb) - cookie->freecb(cookie->value); - cookie->value = value; - cookie->freecb = freecb; - } - unlock(session); - return -!cookie; + return -(value != afb_session_cookie(session, key, NULL, freecb, value, 1)); } diff --git a/src/afb-session.h b/src/afb-session.h index b5dc3944..d79ec414 100644 --- a/src/afb-session.h +++ b/src/afb-session.h @@ -19,17 +19,25 @@ struct afb_session; +#define AFB_SESSION_TIMEOUT_INFINITE -1 +#define AFB_SESSION_TIMEOUT_DEFAULT -2 +#define AFB_SESSION_TIMEOUT_IS_VALID(x) ((x) >= AFB_SESSION_TIMEOUT_DEFAULT) + extern void afb_session_init(int max_session_count, int timeout, const char *initok); extern const char *afb_session_initial_token(); -extern struct afb_session *afb_session_create (const char *uuid, int timeout); -extern struct afb_session *afb_session_get (const char *uuid, int *created); +extern struct afb_session *afb_session_create (int timeout); +extern struct afb_session *afb_session_search (const char *uuid); +extern struct afb_session *afb_session_get (const char *uuid, int timeout, int *created); extern const char *afb_session_uuid (struct afb_session *session); extern struct afb_session *afb_session_addref(struct afb_session *session); extern void afb_session_unref(struct afb_session *session); +extern void afb_session_set_autoclose(struct afb_session *session, int autoclose); extern void afb_session_close(struct afb_session *session); +extern int afb_session_is_active (struct afb_session *session); +extern int afb_session_is_closed (struct afb_session *session); extern int afb_session_check_token(struct afb_session *session, const char *token); extern void afb_session_new_token(struct afb_session *session); diff --git a/src/afb-stub-ws.c b/src/afb-stub-ws.c index 693a0d0c..740a8575 100644 --- a/src/afb-stub-ws.c +++ b/src/afb-stub-ws.c @@ -125,6 +125,15 @@ struct server_describe struct afb_proto_ws_describe *describe; }; +/* + * structure for recording sessions + */ +struct server_session +{ + struct server_session *next; + struct afb_session *session; +}; + /******************* stub description for client or servers ******************/ struct afb_stub_ws @@ -147,6 +156,9 @@ struct afb_stub_ws /* credentials (server side) */ struct afb_cred *cred; + /* sessions (server side) */ + struct server_session *sessions; + /* apiset */ struct afb_apiset *apiset; @@ -460,6 +472,46 @@ static void on_subcall(void *closure, struct afb_proto_ws_subcall *subcall, void /*****************************************************/ +static void record_session(struct afb_stub_ws *stubws, struct afb_session *session) +{ + struct server_session *s, **prv; + + /* search */ + prv = &stubws->sessions; + while ((s = *prv)) { + if (s->session == session) + return; + if (afb_session_is_active(s->session)) + prv = &s->next; + else { + *prv = s->next; + afb_session_addref(s->session); + free(s); + } + } + + /* create */ + s = malloc(sizeof *s); + if (s) { + s->session = afb_session_addref(session); + s->next = stubws->sessions; + stubws->sessions = s; + } +} + +static void release_sessions(struct afb_stub_ws *stubws) +{ + struct server_session *s; + + while((s = stubws->sessions)) { + stubws->sessions = s->next; + afb_session_unref(s->session); + free(s); + } +} + +/*****************************************************/ + static void on_call(void *closure, struct afb_proto_ws_call *call, const char *verb, struct json_object *args, const char *sessionid) { struct afb_stub_ws *stubws = closure; @@ -480,6 +532,9 @@ static void on_call(void *closure, struct afb_proto_ws_call *call, const char *v if (afb_context_connect(&wreq->xreq.context, sessionid, NULL) < 0) goto unconnected; wreq->xreq.context.validated = 1; + record_session(stubws, wreq->xreq.context.session); + if (wreq->xreq.context.created) + afb_session_set_autoclose(wreq->xreq.context.session, 1); /* makes the call */ wreq->xreq.cred = afb_cred_addref(stubws->cred); @@ -598,6 +653,8 @@ static void on_hangup(void *closure) if (stubws->on_hangup) stubws->on_hangup(stubws); + + release_sessions(stubws); } /*****************************************************/ @@ -651,6 +708,7 @@ void afb_stub_ws_unref(struct afb_stub_ws *stubws) if (!__atomic_sub_fetch(&stubws->refcount, 1, __ATOMIC_RELAXED)) { drop_all_events(stubws); afb_evt_listener_unref(stubws->listener); + release_sessions(stubws); afb_proto_ws_unref(stubws->proto); afb_cred_unref(stubws->cred); afb_apiset_unref(stubws->apiset); diff --git a/src/afb-trace.c b/src/afb-trace.c index fb077956..021ab5af 100644 --- a/src/afb-trace.c +++ b/src/afb-trace.c @@ -76,10 +76,9 @@ struct event { }; /* struct for sessions */ -struct session { - struct session *next; /* link to the next session */ - struct afb_session *session; /* the session */ - struct afb_trace *trace; /* the tracer */ +struct cookie { + struct afb_session *session; /* the session */ + struct afb_trace *trace; /* the tracer */ }; /* struct for recording hooks */ @@ -88,7 +87,7 @@ struct hook { void *handler; /* the handler of the hook */ struct event *event; /* the associated event */ struct tag *tag; /* the associated tag */ - struct session *session; /* the associated session */ + struct afb_session *session; /* the associated session */ }; /* types of hooks */ @@ -111,7 +110,6 @@ struct afb_trace struct afb_session *bound; /* bound to session */ struct event *events; /* list of events */ struct tag *tags; /* list of tags */ - struct session *sessions; /* list of tags */ struct hook *hooks[Trace_Type_Count]; /* hooks */ }; @@ -975,7 +973,7 @@ abstracting[Trace_Type_Count] = /*******************************************************************************/ /* drop hooks of 'trace' matching 'tag' and 'event' and 'session' */ -static void trace_unhook(struct afb_trace *trace, struct tag *tag, struct event *event, struct session *session) +static void trace_unhook(struct afb_trace *trace, struct tag *tag, struct event *event, struct afb_session *session) { int i; struct hook *hook, **prev; @@ -1004,24 +1002,7 @@ static void trace_cleanup(struct afb_trace *trace) struct hook *hook; struct tag *tag, **ptag; struct event *event, **pevent; - struct session *session, **psession; - /* clean sessions */ - psession = &trace->sessions; - while ((session = *psession)) { - /* search for session */ - for (hook = NULL, i = 0 ; !hook && i < Trace_Type_Count ; i++) - for (hook = trace->hooks[i] ; hook && hook->session != session ; hook = hook->next); - /* keep or free whether used or not */ - if (hook) - psession = &session->next; - else { - *psession = session->next; - if (__atomic_exchange_n(&session->trace, NULL, __ATOMIC_RELAXED)) - afb_session_set_cookie(session->session, session, NULL, NULL); - free(session); - } - } /* clean tags */ ptag = &trace->tags; while ((tag = *ptag)) { @@ -1053,19 +1034,6 @@ static void trace_cleanup(struct afb_trace *trace) } } -/* callback at end of traced session */ -static void free_session_cookie(void *cookie) -{ - struct session *session = cookie; - struct afb_trace *trace = __atomic_exchange_n(&session->trace, NULL, __ATOMIC_RELAXED); - if (trace) { - pthread_mutex_lock(&trace->mutex); - trace_unhook(trace, NULL, NULL, session); - trace_cleanup(trace); - pthread_mutex_unlock(&trace->mutex); - } -} - /* * Get the tag of 'name' within 'trace'. * If 'alloc' isn't zero, create the tag and add it. @@ -1123,41 +1091,48 @@ static struct event *trace_get_event(struct afb_trace *trace, const char *name, } /* - * Get the session of 'value' within 'trace'. - * If 'alloc' isn't zero, create the session and add it. + * called on session closing */ -static struct session *trace_get_session(struct afb_trace *trace, struct afb_session *value, int alloc) -{ - struct session *session; - - /* search the session */ - session = trace->sessions; - while (session && session->session != value) - session = session->next; - - if (!session && alloc) { - session = malloc(sizeof * session); - if (session) { - session->session = value; - session->trace = NULL; - session->next = trace->sessions; - trace->sessions = session; - } - } - return session; +static void session_closed(void *item) +{ + struct cookie *cookie = item; + + pthread_mutex_lock(&cookie->trace->mutex); + trace_unhook(cookie->trace, NULL, NULL, cookie->session); + pthread_mutex_unlock(&cookie->trace->mutex); + free(cookie); +} + +/* + * records the cookie of session for tracking close + */ +static void *session_open(void *closure) +{ + struct cookie *param = closure, *cookie; + cookie = malloc(sizeof *cookie); + if (cookie) + *cookie = *param; + return cookie; } /* * Get the session of 'uuid' within 'trace'. * If 'alloc' isn't zero, create the session and add it. */ -static struct session *trace_get_session_by_uuid(struct afb_trace *trace, const char *uuid, int alloc) +static struct afb_session *trace_get_session_by_uuid(struct afb_trace *trace, const char *uuid, int alloc) { - struct afb_session *session; - int created; + struct cookie cookie; - session = afb_session_get(uuid, alloc ? &created : NULL); - return session ? trace_get_session(trace, session, alloc) : NULL; + if (!alloc) + cookie.session = afb_session_search(uuid); + else { + cookie.session = afb_session_get(uuid, AFB_SESSION_TIMEOUT_DEFAULT, NULL); + if (cookie.session) { + cookie.trace = trace; + afb_session_cookie(cookie.session, cookie.trace, session_open, session_closed, &cookie, 0); + } + } + return cookie.session; } static struct hook *trace_make_detached_hook(struct afb_trace *trace, const char *event, const char *tag) @@ -1178,13 +1153,8 @@ static struct hook *trace_make_detached_hook(struct afb_trace *trace, const char static void trace_attach_hook(struct afb_trace *trace, struct hook *hook, enum trace_type type) { - struct session *session = hook->session; hook->next = trace->hooks[type]; trace->hooks[type] = hook; - if (session && !session->trace) { - session->trace = trace; - afb_session_set_cookie(session->session, session, session, free_session_cookie); - } } /*******************************************************************************/ @@ -1214,7 +1184,7 @@ struct desc static void addhook(struct desc *desc, enum trace_type type) { struct hook *hook; - struct session *session; + struct afb_session *session; struct afb_session *bind; struct afb_trace *trace = desc->context->trace; @@ -1241,17 +1211,19 @@ static void addhook(struct desc *desc, enum trace_type type) /* create the hook handler */ switch (type) { case Trace_Type_Xreq: - if (desc->session) { + if (!desc->session) + session = afb_session_addref(bind); + else { session = trace_get_session_by_uuid(trace, desc->session, 1); if (!session) { ctxt_error(&desc->context->errors, "allocation of session failed"); free(hook); return; } - bind = session->session; } - hook->handler = afb_hook_create_xreq(desc->api, desc->verb, bind, + hook->handler = afb_hook_create_xreq(desc->api, desc->verb, session, desc->flags[type], &hook_xreq_itf, hook); + afb_session_unref(session); break; case Trace_Type_Ditf: hook->handler = afb_hook_create_ditf(desc->api, desc->flags[type], &hook_ditf_itf, hook); @@ -1443,7 +1415,7 @@ static void drop_session(void *closure, struct json_object *object) { int rc; struct context *context = closure; - struct session *session; + struct afb_session *session; const char *uuid; rc = wrap_json_unpack(object, "s", &uuid); @@ -1453,8 +1425,10 @@ static void drop_session(void *closure, struct json_object *object) session = trace_get_session_by_uuid(context->trace, uuid, 0); if (!session) ctxt_error(&context->errors, "session %s not found", uuid); - else + else { trace_unhook(context->trace, NULL, NULL, session); + afb_session_unref(session); + } } } diff --git a/src/jobs-fake.c b/src/jobs-fake.c new file mode 100644 index 00000000..3c1c2732 --- /dev/null +++ b/src/jobs-fake.c @@ -0,0 +1,150 @@ +/* + * Copyright (C) 2016, 2017 "IoT.bzh" + * Author José Bollo <jose.bollo@iot.bzh> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define _GNU_SOURCE + +#include <stdlib.h> +#include <stdint.h> +#include <unistd.h> +#include <signal.h> +#include <time.h> +#include <sys/syscall.h> +#include <pthread.h> +#include <errno.h> +#include <assert.h> + +#include <systemd/sd-event.h> + +#include "jobs.h" +#include "sig-monitor.h" +#include "verbose.h" + +#include "jobs.h" + +struct jobloop; + +struct job +{ + struct job *next; + const void *group; + int timeout; + void (*callback)(int signum, void* arg); + void *closure; +}; + +static struct job *first, *last; +static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + +static int add_job(const void *group, int timeout, void (*callback)(int signum, void *closure), void *closure) +{ + struct job *j; + + j = malloc(sizeof*j); + if (!j) { + errno = ENOMEM; + return -1; + } + + j->next = 0; + j->group = group; + j->timeout = timeout; + j->callback = callback; + j->closure = closure; + + pthread_mutex_lock(&mutex); + if (first) + last->next = j; + else + first = j; + last = j; + pthread_mutex_unlock(&mutex); + return 0; +} + +static void *thrrun(void *arg) +{ + struct job *j; + + pthread_mutex_lock(&mutex); + j = first; + if (j) + first = j->next; + pthread_mutex_unlock(&mutex); + if (j) { + j->callback(0, j->closure); + free(j); + } + return 0; +} + +int jobs_queue( + const void *group, + int timeout, + void (*callback)(int signum, void* arg), + void *arg) +{ + pthread_t tid; + int rc = add_job(group, timeout, callback, arg); + if (!rc) { + rc = pthread_create(&tid, NULL, thrrun, NULL); + if (rc) + rc = -1; + } + return rc; +} + +#if 0 +int jobs_enter( + const void *group, + int timeout, + void (*callback)(int signum, void *closure, struct jobloop *jobloop), + void *closure) +{ + return 0; +} + +int jobs_leave(struct jobloop *jobloop) +{ + return 0; +} + +int jobs_call( + const void *group, + int timeout, + void (*callback)(int, void*), + void *arg) +{ + return 0; +} + +struct sd_event *jobs_get_sd_event() +{ + struct sd_event *r; + int rc = sd_event_default(&r); + return rc < 0 ? NULL : r; +} + +void jobs_terminate() +{ +} + +int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum)) +{ + start(0); + return 0; +} +#endif @@ -64,8 +64,11 @@ struct events struct events *next; struct sd_event *event; uint64_t timeout; - unsigned used: 1; - unsigned runs: 1; + enum { + Available, + Modifiable, + Locked + } state; }; /** Description of threads */ @@ -74,10 +77,8 @@ struct thread struct thread *next; /**< next thread of the list */ struct thread *upper; /**< upper same thread */ struct job *job; /**< currently processed job */ - struct events *events; /**< currently processed job */ pthread_t tid; /**< the thread id */ unsigned stop: 1; /**< stop requested */ - unsigned lowered: 1; /**< has a lower same thread */ unsigned waits: 1; /**< is waiting? */ }; @@ -109,7 +110,8 @@ static int nevents = 0; /** count of events */ /* list of threads */ static struct thread *threads; -static _Thread_local struct thread *current; +static _Thread_local struct thread *current_thread; +static _Thread_local struct events *current_events; /* queue of pending jobs */ static struct job *first_job; @@ -204,7 +206,7 @@ static inline struct job *job_get() static inline struct events *events_get() { struct events *events = first_events; - while (events && events->used) + while (events && events->state != Available) events = events->next; return events; } @@ -272,9 +274,34 @@ static void job_cancel(int signum, void *arg) */ static void events_call(int signum, void *arg) { + int rc; + struct sd_event *se; struct events *events = arg; - if (!signum) - sd_event_run(events->event, events->timeout); + + if (!signum) { + se = events->event; + rc = sd_event_prepare(se); + if (rc < 0) { + errno = -rc; + ERROR("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(events->event)); + } else { + if (rc == 0) { + rc = sd_event_wait(se, events->timeout); + if (rc < 0) { + errno = -rc; + ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(events->event)); + } + } + + if (rc > 0) { + rc = sd_event_dispatch(se); + if (rc < 0) { + errno = -rc; + ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(events->event)); + } + } + } + } } /** @@ -286,7 +313,7 @@ static void events_call(int signum, void *arg) */ static void thread_run(volatile struct thread *me) { - struct thread **prv, *thr; + struct thread **prv; struct job *job; struct events *events; uint64_t evto; @@ -294,22 +321,18 @@ static void thread_run(volatile struct thread *me) /* initialize description of itself and link it in the list */ me->tid = pthread_self(); me->stop = 0; - me->lowered = 0; me->waits = 0; - me->upper = current; - if (current) { - current->lowered = 1; + me->upper = current_thread; + if (current_thread) { evto = EVENT_TIMEOUT_CHILD; - me->events = current->events; } else { started++; sig_monitor_init_timeouts(); evto = EVENT_TIMEOUT_TOP; - me->events = NULL; } me->next = threads; threads = (struct thread*)me; - current = (struct thread*)me; + current_thread = (struct thread*)me; /* loop until stopped */ while (!me->stop) { @@ -326,37 +349,34 @@ static void thread_run(volatile struct thread *me) sig_monitor(job->timeout, job->callback, job->arg); pthread_mutex_lock(&mutex); - /* release the run job */ - job_release(job); - /* release event if any */ - events = me->events; - if (events) { - events->used = 0; - me->events = NULL; + events = current_events; + if (events && events->state == Modifiable) { + current_events = NULL; + events->state = Available; } + + /* release the run job */ + job_release(job); } else { /* no job, check events */ - events = me->events; - if (!events || events->runs) + events = current_events; + if (!events) events = events_get(); + else if (events->state == Locked) { + events = 0; + WARNING("Loosing an event loop because reentering"); + } if (events) { /* run the events */ - events->used = 1; - events->runs = 1; + events->state = Locked; events->timeout = evto; - me->events = events; + current_events = events; pthread_mutex_unlock(&mutex); sig_monitor(0, events_call, events); pthread_mutex_lock(&mutex); - events->used = 0; - events->runs = 0; - me->events = NULL; - thr = me->upper; - while (thr && thr->events == events) { - thr->events = NULL; - thr = thr->upper; - } + current_events = NULL; + events->state = Available; } else { /* no job and not events */ waiting++; @@ -373,10 +393,8 @@ static void thread_run(volatile struct thread *me) while (*prv != me) prv = &(*prv)->next; *prv = me->next; - current = me->upper; - if (current) { - current->lowered = 0; - } else { + current_thread = me->upper; + if (!current_thread) { sig_monitor_clean_timeouts(); started--; } @@ -631,19 +649,13 @@ int jobs_call( struct sd_event *jobs_get_sd_event() { struct events *events; - struct thread *me; int rc; pthread_mutex_lock(&mutex); /* search events on stack */ - me = current; - while (me && !me->events) - me = me->upper; - if (me) - /* return the stacked events */ - events = me->events; - else { + events = current_events; + if (!events) { /* search an available events */ events = events_get(); if (!events) { @@ -655,8 +667,7 @@ struct sd_event *jobs_get_sd_event() events = malloc(sizeof *events); if (events && (rc = sd_event_new(&events->event)) >= 0) { if (nevents < started || start_one_thread() >= 0) { - events->used = 0; - events->runs = 0; + events->state = Available; events->next = first_events; first_events = events; } else { @@ -679,13 +690,10 @@ struct sd_event *jobs_get_sd_event() } } if (events) { - me = current; - if (me) { - events->used = 1; - me->events = events; - } else { + events->state = Modifiable; + if (!current_thread) WARNING("event returned for unknown thread!"); - } + current_events = events; } } pthread_mutex_unlock(&mutex); @@ -715,7 +723,7 @@ int jobs_start(int allowed_count, int start_count, int waiter_count, void (*star pthread_mutex_lock(&mutex); /* check whether already running */ - if (current || allowed) { + if (current_thread || allowed) { ERROR("thread already started"); errno = EINVAL; goto error; @@ -822,7 +830,7 @@ void jobs_terminate() head = job->next; /* search if job is stacked for current */ - t = current; + t = current_thread; while (t && t->job != job) t = t->upper; if (t) { @@ -525,7 +525,7 @@ static void run_startup_calls() list = config->calls; if (list) { sreq = calloc(1, sizeof *sreq); - sreq->session = afb_session_create("startup", 3600); + sreq->session = afb_session_create(3600); sreq->current = list; startup_call_current(sreq); } diff --git a/src/verbose.c b/src/verbose.c index f330c6f5..e96627ed 100644 --- a/src/verbose.c +++ b/src/verbose.c @@ -85,6 +85,7 @@ void verbose_set_name(const char *name, int authority) #include <errno.h> #include <string.h> #include <sys/uio.h> +#include <pthread.h> static const char *appname; @@ -105,6 +106,8 @@ static int tty; static const char chars[] = { '\n', '?', ':', ' ', '[', ',', ']' }; +static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + static void _vverbose_(int loglevel, const char *file, int line, const char *function, const char *fmt, va_list args) { char buffer[4000]; @@ -169,7 +172,9 @@ static void _vverbose_(int loglevel, const char *file, int line, const char *fun iov[n].iov_base = (void*)&chars[0]; iov[n++].iov_len = 1; + pthread_mutex_lock(&mutex); writev(STDERR_FILENO, iov, n); + pthread_mutex_unlock(&mutex); errno = saverr; } diff --git a/test/AFB.js b/test/AFB.js index 59e68abc..ed2ffc2d 100644 --- a/test/AFB.js +++ b/test/AFB.js @@ -21,7 +21,7 @@ if (typeof base != "object") var initial = { base: base.base || "api", - token: base.token || initialtoken || "hello", + token: base.token || initialtoken || "HELLO", host: base.host || window.location.host, url: base.url || undefined }; @@ -105,8 +105,7 @@ var AFB_websocket; function onclose(event) { for (var id in this.pendings) { - var ferr = this.pendings[id].onerror; - ferr && ferr(null, this); + try { this.pendings[id][1](); } catch (x) {/*TODO?*/} } this.pendings = {}; this.onclose && this.onclose(); @@ -131,8 +130,7 @@ var AFB_websocket; if (id in pendings) { var p = pendings[id]; delete pendings[id]; - var f = p[offset]; - f(ans); + try { p[offset](ans); } catch (x) {/*TODO?*/} } } @@ -166,12 +164,18 @@ var AFB_websocket; this.onabort = function(){}; } - function call(method, request) { + function call(method, request, callid) { return new Promise((function(resolve, reject){ var id, arr; - do { - id = String(this.counter = 4095 & (this.counter + 1)); - } while (id in this.pendings); + if (callid) { + id = String(callid); + if (id in this.pendings) + throw new Error("pending callid("+id+") exists"); + } else { + do { + id = String(this.counter = 4095 & (this.counter + 1)); + } while (id in this.pendings); + } this.pendings[id] = [ resolve, reject ]; arr = [CALL, id, method, request ]; if (AFB_context.token) arr.push(AFB_context.token); diff --git a/test/monitoring/AFB.js b/test/monitoring/AFB.js index 59e68abc..ed2ffc2d 100644 --- a/test/monitoring/AFB.js +++ b/test/monitoring/AFB.js @@ -21,7 +21,7 @@ if (typeof base != "object") var initial = { base: base.base || "api", - token: base.token || initialtoken || "hello", + token: base.token || initialtoken || "HELLO", host: base.host || window.location.host, url: base.url || undefined }; @@ -105,8 +105,7 @@ var AFB_websocket; function onclose(event) { for (var id in this.pendings) { - var ferr = this.pendings[id].onerror; - ferr && ferr(null, this); + try { this.pendings[id][1](); } catch (x) {/*TODO?*/} } this.pendings = {}; this.onclose && this.onclose(); @@ -131,8 +130,7 @@ var AFB_websocket; if (id in pendings) { var p = pendings[id]; delete pendings[id]; - var f = p[offset]; - f(ans); + try { p[offset](ans); } catch (x) {/*TODO?*/} } } @@ -166,12 +164,18 @@ var AFB_websocket; this.onabort = function(){}; } - function call(method, request) { + function call(method, request, callid) { return new Promise((function(resolve, reject){ var id, arr; - do { - id = String(this.counter = 4095 & (this.counter + 1)); - } while (id in this.pendings); + if (callid) { + id = String(callid); + if (id in this.pendings) + throw new Error("pending callid("+id+") exists"); + } else { + do { + id = String(this.counter = 4095 & (this.counter + 1)); + } while (id in this.pendings); + } this.pendings[id] = [ resolve, reject ]; arr = [CALL, id, method, request ]; if (AFB_context.token) arr.push(AFB_context.token); diff --git a/test/monitoring/monitor-pastel.css b/test/monitoring/monitor-pastel.css index d5cf3f6f..839f574f 100644 --- a/test/monitoring/monitor-pastel.css +++ b/test/monitoring/monitor-pastel.css @@ -266,7 +266,7 @@ body { /*******************************************************************/ /* json format */ -.json.string { color: lightskyblue; } +.json.string { color: teal; } .json.number { color: darkorange; } .json.boolean { color: deepskyblue; } .json.null { color: magenta; } diff --git a/test/monitoring/monitor.html b/test/monitoring/monitor.html index 2c07c1ba..5a418879 100644 --- a/test/monitoring/monitor.html +++ b/test/monitoring/monitor.html @@ -57,7 +57,7 @@ <div id="params" class="clearfix"> <div>host: <input type="text" id="param-host" size="50" value="localhost"></input></div> <div>port: <input type="text" id="param-port" size="10" value="1234"></input></div> - <div>token: <input type="text" id="param-token" size="33" value="hello"></input></div> + <div>token: <input type="text" id="param-token" size="33" value="HELLO"></input></div> </div> <div class="-flex-fill -box-out"> <div id="trace-events" class="-box-in"> diff --git a/test/monitoring/monitor.js b/test/monitoring/monitor.js index 5b9dc0ea..3c64ab33 100644 --- a/test/monitoring/monitor.js +++ b/test/monitoring/monitor.js @@ -131,7 +131,7 @@ function init() { at("param-host").value = document.location.hostname; at("param-port").value = document.location.port; var args = new URLSearchParams(document.location.search.substring(1)); - at("param-token").value = args.get("x-afb-token") || args.get("token") || "hello"; + at("param-token").value = args.get("x-afb-token") || args.get("token") || "HELLO"; document.onbeforeunload = on_disconnect; |