diff options
-rw-r--r-- | conf.d/controller/lua.d/aft.lua | 88 | ||||
-rw-r--r-- | src/aft.c | 85 |
2 files changed, 89 insertions, 84 deletions
diff --git a/conf.d/controller/lua.d/aft.lua b/conf.d/controller/lua.d/aft.lua index 971f8be..da51a18 100644 --- a/conf.d/controller/lua.d/aft.lua +++ b/conf.d/controller/lua.d/aft.lua @@ -135,55 +135,25 @@ function _AFT.triggerEvtCallback(eventName) end end -function _AFT.bindingEventHandler(eventObj, uid) - local eventName = nil - local eventListeners = nil - local data = nil - - if uid then - eventName = uid - data = eventObj - elseif eventObj.event.name then - eventName = eventObj.event.name - eventListeners = eventObj.data.result - -- Remove from event to hold the bare event data and be able to assert it - eventObj.data.result = nil - data = eventObj.data.data - end - - if type(_AFT.monitored_events[eventName]) == 'table' then - local stopSync = true - if eventListeners then - _AFT.monitored_events[eventName].eventListeners = eventListeners +function _AFT.bindingEventHandler(eventObj) + local eventName = eventObj.event.name + if eventObj.data.result then + _AFT.monitored_events[eventName].eventListeners = eventObj.data.result + end + + _AFT.incrementCount(_AFT.monitored_events[eventName]) + _AFT.registerData(_AFT.monitored_events[eventName], + eventObj.data.data) + + for name,value in pairs(_AFT.monitored_events) do + if (_AFT.monitored_events[name].expected and + _AFT.monitored_events[name].receivedCount < _AFT.monitored_events[name].expected + ) + then + return true end - - _AFT.incrementCount(_AFT.monitored_events[eventName]) - _AFT.registerData(_AFT.monitored_events[eventName], data) - - for name,value in pairs(_AFT.monitored_events) do - if (_AFT.monitored_events[name].expected and - _AFT.monitored_events[name].receivedCount < _AFT.monitored_events[name].expected) - then - stopSync = false - end - end - - if stopSync == true and _AFT.waiting == true then - AFB:servsync(_AFT.context, _AFT.apiname, "sync", { stop = 1 }) - _AFT.waiting = false - end - end -end - -function _evt_catcher_(source, action, eventObj) - local uid = AFB:getuid(source) - if uid == "monitor/trace" then - if eventObj.type == "event" then - _AFT.bindingEventHandler(eventObj) - end - --else - -- _AFT.bindingEventHandler(eventObj, uid) end + return false end function _AFT.lockWait(eventName, timeout) @@ -192,12 +162,18 @@ function _AFT.lockWait(eventName, timeout) return 0 end - _AFT.waiting = true local err,responseJ = AFB:servsync(_AFT.context, _AFT.apiname, "sync", { start = timeout}) - if err then + if err or (not responseJ and not responseJ.response.event.name) then return 0 end + + _AFT.bindingEventHandler(responseJ.response) + + if AFB:servsync(_AFT.context, _AFT.apiname, "sync", {stop = true}) then + return 0 + end + return 1 end @@ -212,10 +188,20 @@ function _AFT.lockWaitGroup(eventGroup, timeout) _AFT.monitored_events[event].expected = expectedCount + _AFT.monitored_events[event].receivedCount end - _AFT.waiting = true + local waiting = true local err, responseJ = AFB:servsync(_AFT.context, _AFT.apiname, "sync", { start = timeout }) + while waiting do + if err or (not responseJ and not responseJ.response.event.name) then + return 0 + end + + waiting = _AFT.bindingEventHandler(responseJ.response) - if err then + if waiting == true then + err, responseJ = AFB:servsync(_AFT.context, _AFT.apiname, "sync", {continue = true}) + end + end + if AFB:servsync(_AFT.context, _AFT.apiname, "sync", {stop = true}) then return 0 end @@ -17,8 +17,6 @@ */ #define _GNU_SOURCE -#include <stdio.h> -#include <time.h> #include <pthread.h> #include <string.h> #include <systemd/sd-event.h> @@ -35,6 +33,18 @@ static pthread_mutex_t memo_lock; static afb_req_t memo_sync = NULL; static struct sd_event_source *timersrc = NULL; +static void onTraceEvent(void *closure, const char *event, json_object *data, afb_api_t api) { + /* If LUA evt Handler return 0 then stop the waiting sync request else + * do nothing and continue to wait for every requested event to arrive. + */ + pthread_mutex_lock(&memo_lock); + if(memo_sync) { + afb_req_reply(memo_sync, json_object_get(data), NULL, event);afb_req_unref(memo_sync); + memo_sync = NULL; + } + pthread_mutex_unlock(&memo_lock); +} + // Config Section definition static CtlSectionT ctrlSections[] = { {.key = "resources", .loadCB = PluginConfig}, @@ -91,13 +101,23 @@ static void ctrlapi_exit(afb_req_t request) { static int timeoutCB(struct sd_event_source *s, uint64_t us, void *ud) { - if (memo_sync) - afb_req_reply(memo_sync, NULL, "timeout", NULL); + afb_req_t req; + + pthread_mutex_lock(&memo_lock); + req = memo_sync; memo_sync = NULL; + sd_event_source_unref(timersrc); timersrc = NULL; + pthread_mutex_unlock(&memo_lock); + + if(req) { + afb_req_reply(req, NULL, "timeout", NULL); + afb_req_unref(req); + } return 0; } + /** * @brief A verb to call synchronously that will end when a timeout expires or * when a call with a 'stop' order given in the arguments. @@ -106,36 +126,39 @@ static int timeoutCB(struct sd_event_source *s, uint64_t us, void *ud) */ static void ctrlapi_sync(afb_req_t request) { struct json_object *obj, *val; - uint64_t to, usec; + uint64_t timeout, usec; - AFB_REQ_NOTICE(request, "Syncing..."); + AFB_REQ_DEBUG(request, "Syncing..."); + obj = afb_req_json(request); pthread_mutex_lock(&memo_lock); - obj = afb_req_json(request); - if (json_object_object_get_ex(obj, "start", &val)) { - to = json_object_get_int(val); - if (memo_sync) - afb_req_reply(request, NULL, "Bad-State", "There is an already ongoing waiting request."); - else { - sd_event_now(afb_api_get_event_loop(afb_req_get_api(request)), CLOCK_MONOTONIC, &usec); - usec = to + usec; - sd_event_add_time(afb_api_get_event_loop(afb_req_get_api(request)), &timersrc, CLOCK_MONOTONIC, usec, 0, timeoutCB, NULL); - memo_sync = afb_req_addref(request); + if(json_object_object_get_ex(obj, "start", &val) && + (timeout = json_object_get_int(val)) && + ! memo_sync) { + sd_event_now(afb_api_get_event_loop(afb_req_get_api(request)), CLOCK_MONOTONIC, &usec); + usec = timeout + usec; + sd_event_add_time(afb_api_get_event_loop(afb_req_get_api(request)), &timersrc, CLOCK_MONOTONIC, usec, 0, timeoutCB, NULL); + memo_sync = afb_req_addref(request); + } else if(json_object_object_get_ex(obj, "continue", &val) && ! memo_sync) { + memo_sync = afb_req_addref(request); + } else if(json_object_object_get_ex(obj, "stop", &val) && timersrc) { + if(memo_sync) { + afb_req_reply(request, NULL, NULL, "Unfinished start request ended"); + afb_req_unref(memo_sync); + memo_sync = NULL; } - } else if (json_object_object_get_ex(obj, "stop", &val)) { - if (!memo_sync) - afb_req_reply(request, NULL, "Bad-State", "There isn't any ongoing waiting request."); - else { - afb_req_reply(memo_sync, json_object_get(val), NULL, NULL); + sd_event_source_set_enabled(timersrc, SD_EVENT_OFF); + sd_event_source_unref(timersrc); + afb_req_reply(request, NULL, NULL, "stopped"); + timersrc = NULL; + } else { + if(memo_sync) { + afb_req_reply(request, NULL, "Bad State", "Unfinished start request ended"); afb_req_unref(memo_sync); - sd_event_source_unref(timersrc); memo_sync = NULL; - timersrc = NULL; - afb_req_reply(request, NULL, NULL, NULL); } - } else - afb_req_reply(request, NULL, "Invalid", "No 'start' nor 'stop' order provided."); - + afb_req_reply(request, NULL, "Bad state", NULL); + } pthread_mutex_unlock(&memo_lock); } @@ -188,7 +211,7 @@ static int CtrlLoadOneApi(void *cbdata, afb_api_t apiHandle) { err = CtlLoadSections(apiHandle, ctrlConfig, ctrlSections); // declare an event event manager for this API; - afb_api_on_event(apiHandle, CtrlDispatchApiEvent); + afb_api_event_handler_add(apiHandle, "monitor/trace", onTraceEvent, NULL); // init API function (does not receive user closure ??? afb_api_on_init(apiHandle, CtrlInitOneApi); @@ -207,7 +230,7 @@ static CtlConfigT *CtrlLoadConfigFile(afb_api_t apiHandle, const char *configPat static int CtrlCreateApi(afb_api_t apiHandle, CtlConfigT *ctrlConfig) { int err = 0; - json_object *resourcesJ = NULL, *eventsJ = NULL; + json_object *resourcesJ = NULL; if(!ctrlConfig) { AFB_API_ERROR(apiHandle, @@ -229,16 +252,12 @@ static int CtrlCreateApi(afb_api_t apiHandle, CtlConfigT *ctrlConfig) { "uid", "AFT", "info", "LUA Binder test framework", "libs", "aft.lua" ); - err += wrap_json_pack(&eventsJ, "{s[{ss, ss}]}", "events", - "uid", "monitor/trace", - "action", "lua://AFT#_evt_catcher_" ); if(err) { AFB_API_ERROR(apiHandle, "Error at Controller configuration editing."); return err; } wrap_json_object_add(ctrlConfig->configJ, resourcesJ); - wrap_json_object_add(ctrlConfig->configJ, eventsJ); if(! afb_api_new_api(apiHandle, ctrlConfig->api, ctrlConfig->info, 0, CtrlLoadOneApi, ctrlConfig)) return ERROR; |