summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--conf.d/controller/lua.d/aft.lua88
-rw-r--r--src/aft.c85
2 files changed, 89 insertions, 84 deletions
diff --git a/conf.d/controller/lua.d/aft.lua b/conf.d/controller/lua.d/aft.lua
index 971f8be..da51a18 100644
--- a/conf.d/controller/lua.d/aft.lua
+++ b/conf.d/controller/lua.d/aft.lua
@@ -135,55 +135,25 @@ function _AFT.triggerEvtCallback(eventName)
end
end
-function _AFT.bindingEventHandler(eventObj, uid)
- local eventName = nil
- local eventListeners = nil
- local data = nil
-
- if uid then
- eventName = uid
- data = eventObj
- elseif eventObj.event.name then
- eventName = eventObj.event.name
- eventListeners = eventObj.data.result
- -- Remove from event to hold the bare event data and be able to assert it
- eventObj.data.result = nil
- data = eventObj.data.data
- end
-
- if type(_AFT.monitored_events[eventName]) == 'table' then
- local stopSync = true
- if eventListeners then
- _AFT.monitored_events[eventName].eventListeners = eventListeners
+function _AFT.bindingEventHandler(eventObj)
+ local eventName = eventObj.event.name
+ if eventObj.data.result then
+ _AFT.monitored_events[eventName].eventListeners = eventObj.data.result
+ end
+
+ _AFT.incrementCount(_AFT.monitored_events[eventName])
+ _AFT.registerData(_AFT.monitored_events[eventName],
+ eventObj.data.data)
+
+ for name,value in pairs(_AFT.monitored_events) do
+ if (_AFT.monitored_events[name].expected and
+ _AFT.monitored_events[name].receivedCount < _AFT.monitored_events[name].expected
+ )
+ then
+ return true
end
-
- _AFT.incrementCount(_AFT.monitored_events[eventName])
- _AFT.registerData(_AFT.monitored_events[eventName], data)
-
- for name,value in pairs(_AFT.monitored_events) do
- if (_AFT.monitored_events[name].expected and
- _AFT.monitored_events[name].receivedCount < _AFT.monitored_events[name].expected)
- then
- stopSync = false
- end
- end
-
- if stopSync == true and _AFT.waiting == true then
- AFB:servsync(_AFT.context, _AFT.apiname, "sync", { stop = 1 })
- _AFT.waiting = false
- end
- end
-end
-
-function _evt_catcher_(source, action, eventObj)
- local uid = AFB:getuid(source)
- if uid == "monitor/trace" then
- if eventObj.type == "event" then
- _AFT.bindingEventHandler(eventObj)
- end
- --else
- -- _AFT.bindingEventHandler(eventObj, uid)
end
+ return false
end
function _AFT.lockWait(eventName, timeout)
@@ -192,12 +162,18 @@ function _AFT.lockWait(eventName, timeout)
return 0
end
- _AFT.waiting = true
local err,responseJ = AFB:servsync(_AFT.context, _AFT.apiname, "sync", { start = timeout})
- if err then
+ if err or (not responseJ and not responseJ.response.event.name) then
return 0
end
+
+ _AFT.bindingEventHandler(responseJ.response)
+
+ if AFB:servsync(_AFT.context, _AFT.apiname, "sync", {stop = true}) then
+ return 0
+ end
+
return 1
end
@@ -212,10 +188,20 @@ function _AFT.lockWaitGroup(eventGroup, timeout)
_AFT.monitored_events[event].expected = expectedCount + _AFT.monitored_events[event].receivedCount
end
- _AFT.waiting = true
+ local waiting = true
local err, responseJ = AFB:servsync(_AFT.context, _AFT.apiname, "sync", { start = timeout })
+ while waiting do
+ if err or (not responseJ and not responseJ.response.event.name) then
+ return 0
+ end
+
+ waiting = _AFT.bindingEventHandler(responseJ.response)
- if err then
+ if waiting == true then
+ err, responseJ = AFB:servsync(_AFT.context, _AFT.apiname, "sync", {continue = true})
+ end
+ end
+ if AFB:servsync(_AFT.context, _AFT.apiname, "sync", {stop = true}) then
return 0
end
diff --git a/src/aft.c b/src/aft.c
index 19510af..7d8af8a 100644
--- a/src/aft.c
+++ b/src/aft.c
@@ -17,8 +17,6 @@
*/
#define _GNU_SOURCE
-#include <stdio.h>
-#include <time.h>
#include <pthread.h>
#include <string.h>
#include <systemd/sd-event.h>
@@ -35,6 +33,18 @@ static pthread_mutex_t memo_lock;
static afb_req_t memo_sync = NULL;
static struct sd_event_source *timersrc = NULL;
+static void onTraceEvent(void *closure, const char *event, json_object *data, afb_api_t api) {
+ /* If LUA evt Handler return 0 then stop the waiting sync request else
+ * do nothing and continue to wait for every requested event to arrive.
+ */
+ pthread_mutex_lock(&memo_lock);
+ if(memo_sync) {
+ afb_req_reply(memo_sync, json_object_get(data), NULL, event);afb_req_unref(memo_sync);
+ memo_sync = NULL;
+ }
+ pthread_mutex_unlock(&memo_lock);
+}
+
// Config Section definition
static CtlSectionT ctrlSections[] = {
{.key = "resources", .loadCB = PluginConfig},
@@ -91,13 +101,23 @@ static void ctrlapi_exit(afb_req_t request) {
static int timeoutCB(struct sd_event_source *s, uint64_t us, void *ud)
{
- if (memo_sync)
- afb_req_reply(memo_sync, NULL, "timeout", NULL);
+ afb_req_t req;
+
+ pthread_mutex_lock(&memo_lock);
+ req = memo_sync;
memo_sync = NULL;
+ sd_event_source_unref(timersrc);
timersrc = NULL;
+ pthread_mutex_unlock(&memo_lock);
+
+ if(req) {
+ afb_req_reply(req, NULL, "timeout", NULL);
+ afb_req_unref(req);
+ }
return 0;
}
+
/**
* @brief A verb to call synchronously that will end when a timeout expires or
* when a call with a 'stop' order given in the arguments.
@@ -106,36 +126,39 @@ static int timeoutCB(struct sd_event_source *s, uint64_t us, void *ud)
*/
static void ctrlapi_sync(afb_req_t request) {
struct json_object *obj, *val;
- uint64_t to, usec;
+ uint64_t timeout, usec;
- AFB_REQ_NOTICE(request, "Syncing...");
+ AFB_REQ_DEBUG(request, "Syncing...");
+ obj = afb_req_json(request);
pthread_mutex_lock(&memo_lock);
- obj = afb_req_json(request);
- if (json_object_object_get_ex(obj, "start", &val)) {
- to = json_object_get_int(val);
- if (memo_sync)
- afb_req_reply(request, NULL, "Bad-State", "There is an already ongoing waiting request.");
- else {
- sd_event_now(afb_api_get_event_loop(afb_req_get_api(request)), CLOCK_MONOTONIC, &usec);
- usec = to + usec;
- sd_event_add_time(afb_api_get_event_loop(afb_req_get_api(request)), &timersrc, CLOCK_MONOTONIC, usec, 0, timeoutCB, NULL);
- memo_sync = afb_req_addref(request);
+ if(json_object_object_get_ex(obj, "start", &val) &&
+ (timeout = json_object_get_int(val)) &&
+ ! memo_sync) {
+ sd_event_now(afb_api_get_event_loop(afb_req_get_api(request)), CLOCK_MONOTONIC, &usec);
+ usec = timeout + usec;
+ sd_event_add_time(afb_api_get_event_loop(afb_req_get_api(request)), &timersrc, CLOCK_MONOTONIC, usec, 0, timeoutCB, NULL);
+ memo_sync = afb_req_addref(request);
+ } else if(json_object_object_get_ex(obj, "continue", &val) && ! memo_sync) {
+ memo_sync = afb_req_addref(request);
+ } else if(json_object_object_get_ex(obj, "stop", &val) && timersrc) {
+ if(memo_sync) {
+ afb_req_reply(request, NULL, NULL, "Unfinished start request ended");
+ afb_req_unref(memo_sync);
+ memo_sync = NULL;
}
- } else if (json_object_object_get_ex(obj, "stop", &val)) {
- if (!memo_sync)
- afb_req_reply(request, NULL, "Bad-State", "There isn't any ongoing waiting request.");
- else {
- afb_req_reply(memo_sync, json_object_get(val), NULL, NULL);
+ sd_event_source_set_enabled(timersrc, SD_EVENT_OFF);
+ sd_event_source_unref(timersrc);
+ afb_req_reply(request, NULL, NULL, "stopped");
+ timersrc = NULL;
+ } else {
+ if(memo_sync) {
+ afb_req_reply(request, NULL, "Bad State", "Unfinished start request ended");
afb_req_unref(memo_sync);
- sd_event_source_unref(timersrc);
memo_sync = NULL;
- timersrc = NULL;
- afb_req_reply(request, NULL, NULL, NULL);
}
- } else
- afb_req_reply(request, NULL, "Invalid", "No 'start' nor 'stop' order provided.");
-
+ afb_req_reply(request, NULL, "Bad state", NULL);
+ }
pthread_mutex_unlock(&memo_lock);
}
@@ -188,7 +211,7 @@ static int CtrlLoadOneApi(void *cbdata, afb_api_t apiHandle) {
err = CtlLoadSections(apiHandle, ctrlConfig, ctrlSections);
// declare an event event manager for this API;
- afb_api_on_event(apiHandle, CtrlDispatchApiEvent);
+ afb_api_event_handler_add(apiHandle, "monitor/trace", onTraceEvent, NULL);
// init API function (does not receive user closure ???
afb_api_on_init(apiHandle, CtrlInitOneApi);
@@ -207,7 +230,7 @@ static CtlConfigT *CtrlLoadConfigFile(afb_api_t apiHandle, const char *configPat
static int CtrlCreateApi(afb_api_t apiHandle, CtlConfigT *ctrlConfig) {
int err = 0;
- json_object *resourcesJ = NULL, *eventsJ = NULL;
+ json_object *resourcesJ = NULL;
if(!ctrlConfig) {
AFB_API_ERROR(apiHandle,
@@ -229,16 +252,12 @@ static int CtrlCreateApi(afb_api_t apiHandle, CtlConfigT *ctrlConfig) {
"uid", "AFT",
"info", "LUA Binder test framework",
"libs", "aft.lua" );
- err += wrap_json_pack(&eventsJ, "{s[{ss, ss}]}", "events",
- "uid", "monitor/trace",
- "action", "lua://AFT#_evt_catcher_" );
if(err) {
AFB_API_ERROR(apiHandle, "Error at Controller configuration editing.");
return err;
}
wrap_json_object_add(ctrlConfig->configJ, resourcesJ);
- wrap_json_object_add(ctrlConfig->configJ, eventsJ);
if(! afb_api_new_api(apiHandle, ctrlConfig->api, ctrlConfig->info, 0, CtrlLoadOneApi, ctrlConfig))
return ERROR;