aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRomain Forlot <romain.forlot@iot.bzh>2019-01-18 13:37:12 +0100
committerRomain Forlot <romain.forlot@iot.bzh>2019-01-23 15:06:14 +0100
commit569a70213a52c5a0ba22c3a110c4b511d4574c83 (patch)
treed1f0f17c0513cd4af976a4f696245bd266d00f41
parent7582a430b719f2f22dc6312f5e464e7e7b834ba3 (diff)
Handle event in core binding
Prefers to handle the events reception by the binding, in C, rather than using the LUA interpreter to avoid race condition on the LUA context. Because we are waiting events in LUA context using binder call sync, when the event is received then 2 threads operate simultaneously on LUA context, the waiting event thread and the receiving event thread. Bug-AGL: SPEC-2135 Change-Id: Ied0a78a61263b8fd41305969c636a491c6bb0295 Signed-off-by: Romain Forlot <romain.forlot@iot.bzh>
-rw-r--r--conf.d/cmake/config.cmake3
-rw-r--r--conf.d/controller/lua.d/aft.lua88
-rw-r--r--src/aft.c85
-rw-r--r--src/aft.h8
4 files changed, 97 insertions, 87 deletions
diff --git a/conf.d/cmake/config.cmake b/conf.d/cmake/config.cmake
index 81dcebd..c6c957f 100644
--- a/conf.d/cmake/config.cmake
+++ b/conf.d/cmake/config.cmake
@@ -71,7 +71,7 @@ set (PKG_REQUIRED_LIST
json-c
libsystemd>=222
afb-daemon>=4.0
- ctl-utilities
+ appcontroller
afb-helpers
)
@@ -126,7 +126,6 @@ set(INSTALL_PREFIX /opt/AGL CACHE PATH "INSTALL PREFIX PATH")
# -O2
# CACHE STRING "Compilation flags for RELEASE build type.")
-set(CONTROL_SUPPORT_LUA 1)
add_definitions(-DLUA_GLOB_PATTERN="/var/?.lua\\\;")
add_definitions(-DCONTROL_PLUGIN_PATH="./var:${CMAKE_INSTALL_PREFIX}/${PROJECT_NAME}/lib/plugins:${CMAKE_INSTALL_PREFIX}/${PROJECT_NAME}/var:${INSTALL_PREFIX}/${PROJECT_NAME}/lib/plugins:${INSTALL_PREFIX}/${PROJECT_NAME}/var:${CMAKE_BINARY_DIR}/package/lib/plugins:${CMAKE_BINARY_DIR}/package/var")
add_definitions(-DCONTROL_CONFIG_PATH="./etc:${CMAKE_INSTALL_PREFIX}/${PROJECT_NAME}/etc:${INSTALL_PREFIX}/${PROJECT_NAME}/etc:${CMAKE_BINARY_DIR}/package/etc")
diff --git a/conf.d/controller/lua.d/aft.lua b/conf.d/controller/lua.d/aft.lua
index 971f8be..da51a18 100644
--- a/conf.d/controller/lua.d/aft.lua
+++ b/conf.d/controller/lua.d/aft.lua
@@ -135,55 +135,25 @@ function _AFT.triggerEvtCallback(eventName)
end
end
-function _AFT.bindingEventHandler(eventObj, uid)
- local eventName = nil
- local eventListeners = nil
- local data = nil
-
- if uid then
- eventName = uid
- data = eventObj
- elseif eventObj.event.name then
- eventName = eventObj.event.name
- eventListeners = eventObj.data.result
- -- Remove from event to hold the bare event data and be able to assert it
- eventObj.data.result = nil
- data = eventObj.data.data
- end
-
- if type(_AFT.monitored_events[eventName]) == 'table' then
- local stopSync = true
- if eventListeners then
- _AFT.monitored_events[eventName].eventListeners = eventListeners
+function _AFT.bindingEventHandler(eventObj)
+ local eventName = eventObj.event.name
+ if eventObj.data.result then
+ _AFT.monitored_events[eventName].eventListeners = eventObj.data.result
+ end
+
+ _AFT.incrementCount(_AFT.monitored_events[eventName])
+ _AFT.registerData(_AFT.monitored_events[eventName],
+ eventObj.data.data)
+
+ for name,value in pairs(_AFT.monitored_events) do
+ if (_AFT.monitored_events[name].expected and
+ _AFT.monitored_events[name].receivedCount < _AFT.monitored_events[name].expected
+ )
+ then
+ return true
end
-
- _AFT.incrementCount(_AFT.monitored_events[eventName])
- _AFT.registerData(_AFT.monitored_events[eventName], data)
-
- for name,value in pairs(_AFT.monitored_events) do
- if (_AFT.monitored_events[name].expected and
- _AFT.monitored_events[name].receivedCount < _AFT.monitored_events[name].expected)
- then
- stopSync = false
- end
- end
-
- if stopSync == true and _AFT.waiting == true then
- AFB:servsync(_AFT.context, _AFT.apiname, "sync", { stop = 1 })
- _AFT.waiting = false
- end
- end
-end
-
-function _evt_catcher_(source, action, eventObj)
- local uid = AFB:getuid(source)
- if uid == "monitor/trace" then
- if eventObj.type == "event" then
- _AFT.bindingEventHandler(eventObj)
- end
- --else
- -- _AFT.bindingEventHandler(eventObj, uid)
end
+ return false
end
function _AFT.lockWait(eventName, timeout)
@@ -192,12 +162,18 @@ function _AFT.lockWait(eventName, timeout)
return 0
end
- _AFT.waiting = true
local err,responseJ = AFB:servsync(_AFT.context, _AFT.apiname, "sync", { start = timeout})
- if err then
+ if err or (not responseJ and not responseJ.response.event.name) then
return 0
end
+
+ _AFT.bindingEventHandler(responseJ.response)
+
+ if AFB:servsync(_AFT.context, _AFT.apiname, "sync", {stop = true}) then
+ return 0
+ end
+
return 1
end
@@ -212,10 +188,20 @@ function _AFT.lockWaitGroup(eventGroup, timeout)
_AFT.monitored_events[event].expected = expectedCount + _AFT.monitored_events[event].receivedCount
end
- _AFT.waiting = true
+ local waiting = true
local err, responseJ = AFB:servsync(_AFT.context, _AFT.apiname, "sync", { start = timeout })
+ while waiting do
+ if err or (not responseJ and not responseJ.response.event.name) then
+ return 0
+ end
+
+ waiting = _AFT.bindingEventHandler(responseJ.response)
- if err then
+ if waiting == true then
+ err, responseJ = AFB:servsync(_AFT.context, _AFT.apiname, "sync", {continue = true})
+ end
+ end
+ if AFB:servsync(_AFT.context, _AFT.apiname, "sync", {stop = true}) then
return 0
end
diff --git a/src/aft.c b/src/aft.c
index 19510af..7d8af8a 100644
--- a/src/aft.c
+++ b/src/aft.c
@@ -17,8 +17,6 @@
*/
#define _GNU_SOURCE
-#include <stdio.h>
-#include <time.h>
#include <pthread.h>
#include <string.h>
#include <systemd/sd-event.h>
@@ -35,6 +33,18 @@ static pthread_mutex_t memo_lock;
static afb_req_t memo_sync = NULL;
static struct sd_event_source *timersrc = NULL;
+static void onTraceEvent(void *closure, const char *event, json_object *data, afb_api_t api) {
+ /* If LUA evt Handler return 0 then stop the waiting sync request else
+ * do nothing and continue to wait for every requested event to arrive.
+ */
+ pthread_mutex_lock(&memo_lock);
+ if(memo_sync) {
+ afb_req_reply(memo_sync, json_object_get(data), NULL, event);afb_req_unref(memo_sync);
+ memo_sync = NULL;
+ }
+ pthread_mutex_unlock(&memo_lock);
+}
+
// Config Section definition
static CtlSectionT ctrlSections[] = {
{.key = "resources", .loadCB = PluginConfig},
@@ -91,13 +101,23 @@ static void ctrlapi_exit(afb_req_t request) {
static int timeoutCB(struct sd_event_source *s, uint64_t us, void *ud)
{
- if (memo_sync)
- afb_req_reply(memo_sync, NULL, "timeout", NULL);
+ afb_req_t req;
+
+ pthread_mutex_lock(&memo_lock);
+ req = memo_sync;
memo_sync = NULL;
+ sd_event_source_unref(timersrc);
timersrc = NULL;
+ pthread_mutex_unlock(&memo_lock);
+
+ if(req) {
+ afb_req_reply(req, NULL, "timeout", NULL);
+ afb_req_unref(req);
+ }
return 0;
}
+
/**
* @brief A verb to call synchronously that will end when a timeout expires or
* when a call with a 'stop' order given in the arguments.
@@ -106,36 +126,39 @@ static int timeoutCB(struct sd_event_source *s, uint64_t us, void *ud)
*/
static void ctrlapi_sync(afb_req_t request) {
struct json_object *obj, *val;
- uint64_t to, usec;
+ uint64_t timeout, usec;
- AFB_REQ_NOTICE(request, "Syncing...");
+ AFB_REQ_DEBUG(request, "Syncing...");
+ obj = afb_req_json(request);
pthread_mutex_lock(&memo_lock);
- obj = afb_req_json(request);
- if (json_object_object_get_ex(obj, "start", &val)) {
- to = json_object_get_int(val);
- if (memo_sync)
- afb_req_reply(request, NULL, "Bad-State", "There is an already ongoing waiting request.");
- else {
- sd_event_now(afb_api_get_event_loop(afb_req_get_api(request)), CLOCK_MONOTONIC, &usec);
- usec = to + usec;
- sd_event_add_time(afb_api_get_event_loop(afb_req_get_api(request)), &timersrc, CLOCK_MONOTONIC, usec, 0, timeoutCB, NULL);
- memo_sync = afb_req_addref(request);
+ if(json_object_object_get_ex(obj, "start", &val) &&
+ (timeout = json_object_get_int(val)) &&
+ ! memo_sync) {
+ sd_event_now(afb_api_get_event_loop(afb_req_get_api(request)), CLOCK_MONOTONIC, &usec);
+ usec = timeout + usec;
+ sd_event_add_time(afb_api_get_event_loop(afb_req_get_api(request)), &timersrc, CLOCK_MONOTONIC, usec, 0, timeoutCB, NULL);
+ memo_sync = afb_req_addref(request);
+ } else if(json_object_object_get_ex(obj, "continue", &val) && ! memo_sync) {
+ memo_sync = afb_req_addref(request);
+ } else if(json_object_object_get_ex(obj, "stop", &val) && timersrc) {
+ if(memo_sync) {
+ afb_req_reply(request, NULL, NULL, "Unfinished start request ended");
+ afb_req_unref(memo_sync);
+ memo_sync = NULL;
}
- } else if (json_object_object_get_ex(obj, "stop", &val)) {
- if (!memo_sync)
- afb_req_reply(request, NULL, "Bad-State", "There isn't any ongoing waiting request.");
- else {
- afb_req_reply(memo_sync, json_object_get(val), NULL, NULL);
+ sd_event_source_set_enabled(timersrc, SD_EVENT_OFF);
+ sd_event_source_unref(timersrc);
+ afb_req_reply(request, NULL, NULL, "stopped");
+ timersrc = NULL;
+ } else {
+ if(memo_sync) {
+ afb_req_reply(request, NULL, "Bad State", "Unfinished start request ended");
afb_req_unref(memo_sync);
- sd_event_source_unref(timersrc);
memo_sync = NULL;
- timersrc = NULL;
- afb_req_reply(request, NULL, NULL, NULL);
}
- } else
- afb_req_reply(request, NULL, "Invalid", "No 'start' nor 'stop' order provided.");
-
+ afb_req_reply(request, NULL, "Bad state", NULL);
+ }
pthread_mutex_unlock(&memo_lock);
}
@@ -188,7 +211,7 @@ static int CtrlLoadOneApi(void *cbdata, afb_api_t apiHandle) {
err = CtlLoadSections(apiHandle, ctrlConfig, ctrlSections);
// declare an event event manager for this API;
- afb_api_on_event(apiHandle, CtrlDispatchApiEvent);
+ afb_api_event_handler_add(apiHandle, "monitor/trace", onTraceEvent, NULL);
// init API function (does not receive user closure ???
afb_api_on_init(apiHandle, CtrlInitOneApi);
@@ -207,7 +230,7 @@ static CtlConfigT *CtrlLoadConfigFile(afb_api_t apiHandle, const char *configPat
static int CtrlCreateApi(afb_api_t apiHandle, CtlConfigT *ctrlConfig) {
int err = 0;
- json_object *resourcesJ = NULL, *eventsJ = NULL;
+ json_object *resourcesJ = NULL;
if(!ctrlConfig) {
AFB_API_ERROR(apiHandle,
@@ -229,16 +252,12 @@ static int CtrlCreateApi(afb_api_t apiHandle, CtlConfigT *ctrlConfig) {
"uid", "AFT",
"info", "LUA Binder test framework",
"libs", "aft.lua" );
- err += wrap_json_pack(&eventsJ, "{s[{ss, ss}]}", "events",
- "uid", "monitor/trace",
- "action", "lua://AFT#_evt_catcher_" );
if(err) {
AFB_API_ERROR(apiHandle, "Error at Controller configuration editing.");
return err;
}
wrap_json_object_add(ctrlConfig->configJ, resourcesJ);
- wrap_json_object_add(ctrlConfig->configJ, eventsJ);
if(! afb_api_new_api(apiHandle, ctrlConfig->api, ctrlConfig->info, 0, CtrlLoadOneApi, ctrlConfig))
return ERROR;
diff --git a/src/aft.h b/src/aft.h
index 7ac00a0..ec8fd7e 100644
--- a/src/aft.h
+++ b/src/aft.h
@@ -21,12 +21,18 @@
#define _CTL_BINDING_INCLUDE_
#include <stdio.h>
-#include <ctl-config.h>
#include <filescan-utils.h>
#include <wrap-json.h>
+#include <afb/afb-binding.h>
#ifndef ERROR
#define ERROR -1
#endif
+#ifndef CONTROL_SUPPORT_LUA
+ #define CONTROL_SUPPORT_LUA 1
+#endif
+
+#include <ctl-config.h>
+
#endif /* _CTL_BINDING_INCLUDE_ */