summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorJose Bollo <jose.bollo@iot.bzh>2019-07-11 18:20:23 +0200
committerJosé Bollo <jose.bollo@iot.bzh>2019-07-12 15:49:32 +0200
commit248ec5dafada74c829dbe197888166c5807e22e2 (patch)
tree47e9d3c805faab92c04eba107283fbbeaef615fe /src
parent909893eb96838441b64272a649764367dfd69557 (diff)
afb-evt: send events in separate job
Sending events in the context of the calling process had the consequence that the ordering of the messages had to be removed (see SPEC-2215 & SPEC-2219). This was not good by nature and lead to issues SPEC-2542 and SPEC-2599. Sending events in the context of the calling process also implies to delay the calling process. For this reasons, sending events is now done in an other jobs. For that reason, the count of allowed pending jobs is increased to 100 (was 50). Bug-AGL: SPEC-2215 Bug-AGL: SPEC-2219 Bug-AGL: SPEC-2542 Bug-AGL: SPEC-2599 Change-Id: I5b56d952cc187b65ad6eb9344ad74e5e8d3b7540 Signed-off-by: Jose Bollo <jose.bollo@iot.bzh>
Diffstat (limited to 'src')
-rw-r--r--src/afb-evt.c267
-rw-r--r--src/main-afb-daemon.c2
2 files changed, 227 insertions, 42 deletions
diff --git a/src/afb-evt.c b/src/afb-evt.c
index 960d4137..74ed71a0 100644
--- a/src/afb-evt.c
+++ b/src/afb-evt.c
@@ -30,6 +30,7 @@
#include "afb-evt.h"
#include "afb-hook.h"
#include "verbose.h"
+#include "jobs.h"
struct afb_evt_watch;
@@ -85,6 +86,9 @@ struct afb_evtid {
/* id of the event */
int id;
+ /* has client? */
+ int has_client;
+
/* fullname of the event */
char fullname[];
};
@@ -110,6 +114,30 @@ struct afb_evt_watch {
unsigned activity;
};
+/*
+ * structure for job of broadcasting string events
+ */
+struct job_string
+{
+ /** object atached to the event */
+ struct json_object *object;
+
+ /** name of the event to broadcast */
+ char event[];
+};
+
+/*
+ * structure for job of broadcasting or pushing events
+ */
+struct job_evtid
+{
+ /** the event to broadcast */
+ struct afb_evtid *evtid;
+
+ /** object atached to the event */
+ struct json_object *object;
+};
+
/* the interface for events */
static struct afb_event_x2_itf afb_evt_event_x2_itf = {
.broadcast = (void*)afb_evt_evtid_broadcast,
@@ -130,6 +158,10 @@ static struct afb_event_x2_itf afb_evt_hooked_event_x2_itf = {
};
#endif
+/* job groups for events push/broadcast */
+#define BROADCAST_JOB_GROUP (&afb_evt_event_x2_itf)
+#define PUSH_JOB_GROUP (&afb_evt_event_x2_itf)
+
/* head of the list of listeners */
static pthread_rwlock_t listeners_rwlock = PTHREAD_RWLOCK_INITIALIZER;
static struct afb_evt_listener *listeners = NULL;
@@ -141,57 +173,143 @@ static int event_id_counter = 0;
static int event_id_wrapped = 0;
/*
- * Broadcasts the 'event' of 'id' with its 'obj'
- * 'obj' is released (like json_object_put)
- * Returns the count of listener having receive the event.
+ * Create structure for job of broadcasting string 'event' with 'object'
+ * Returns the created structure or NULL if out of memory
*/
-static int broadcast(const char *event, struct json_object *obj, int id)
+static struct job_string *make_job_string(const char *event, struct json_object *object)
{
- int result;
- struct afb_evt_listener *listener;
+ size_t sz = 1 + strlen(event);
+ struct job_string *js = malloc(sz + sizeof *js);
+ if (js) {
+ js->object = object;
+ memcpy(js->event, event, sz);
+ }
+ return js;
+}
+
+/*
+ * Destroy structure 'js' for job of broadcasting string events
+ */
+static void destroy_job_string(struct job_string *js)
+{
+ json_object_put(js->object);
+ free(js);
+}
+
+/*
+ * Create structure for job of broadcasting or pushing 'evtid' with 'object'
+ * Returns the created structure or NULL if out of memory
+ */
+static struct job_evtid *make_job_evtid(struct afb_evtid *evtid, struct json_object *object)
+{
+ struct job_evtid *je = malloc(sizeof *je);
+ if (je) {
+ je->evtid = afb_evt_evtid_addref(evtid);
+ je->object = object;
+ }
+ return je;
+}
+
+/*
+ * Destroy structure for job of broadcasting or pushing evtid
+ */
+static void destroy_job_evtid(struct job_evtid *je)
+{
+ afb_evt_evtid_unref(je->evtid);
+ json_object_put(je->object);
+ free(je);
+}
- result = 0;
+/*
+ * Broadcasts the 'event' of 'id' with its 'object'
+ */
+static void broadcast(const char *event, struct json_object *object, int id)
+{
+ struct afb_evt_listener *listener;
pthread_rwlock_rdlock(&listeners_rwlock);
listener = listeners;
while(listener) {
- if (listener->itf->broadcast != NULL) {
- listener->itf->broadcast(listener->closure, event, id, json_object_get(obj));
- result++;
- }
+ if (listener->itf->broadcast != NULL)
+ listener->itf->broadcast(listener->closure, event, id, json_object_get(object));
listener = listener->next;
}
pthread_rwlock_unlock(&listeners_rwlock);
- json_object_put(obj);
- return result;
}
-#if WITH_AFB_HOOK
/*
- * Broadcasts the 'event' of 'id' with its 'obj'
- * 'obj' is released (like json_object_put)
- * calls hooks if hookflags isn't 0
- * Returns the count of listener having receive the event.
+ * Jobs callback for broadcasting string asynchronously
*/
-static int hooked_broadcast(const char *event, struct json_object *obj, int id, int hookflags)
+static void broadcast_job_string(int signum, void *closure)
{
- int result;
+ struct job_string *js = closure;
- json_object_get(obj);
+ if (signum == 0)
+ broadcast(js->event, js->object, 0);
+ destroy_job_string(js);
+}
+
+/*
+ * Jobs callback for broadcasting evtid asynchronously
+ */
+static void broadcast_job_evtid(int signum, void *closure)
+{
+ struct job_evtid *je = closure;
+
+ if (signum == 0)
+ broadcast(je->evtid->fullname, je->object, je->evtid->id);
+ destroy_job_evtid(je);
+}
- if (hookflags & afb_hook_flag_evt_broadcast_before)
- afb_hook_evt_broadcast_before(event, id, obj);
+/*
+ * Broadcasts the string 'event' with its 'object'
+ */
+static int broadcast_string(const char *event, struct json_object *object)
+{
+ struct job_string *js;
+ int rc;
- result = broadcast(event, obj, id);
+ js = make_job_string(event, object);
+ if (js == NULL) {
+ ERROR("Cant't create broadcast string job item for %s(%s)",
+ event, json_object_to_json_string(object));
+ json_object_put(object);
+ return -1;
+ }
- if (hookflags & afb_hook_flag_evt_broadcast_after)
- afb_hook_evt_broadcast_after(event, id, obj, result);
+ rc = jobs_queue(BROADCAST_JOB_GROUP, 0, broadcast_job_string, js);
+ if (rc) {
+ ERROR("cant't queue broadcast string job item for %s(%s)",
+ event, json_object_to_json_string(object));
+ destroy_job_string(js);
+ }
+ return rc;
+}
- json_object_put(obj);
+/*
+ * Broadcasts the 'evtid' with its 'object'
+ */
+static int broadcast_evtid(struct afb_evtid *evtid, struct json_object *object)
+{
+ struct job_evtid *je;
+ int rc;
- return result;
+ je = make_job_evtid(evtid, object);
+ if (je == NULL) {
+ ERROR("Cant't create broadcast evtid job item for %s(%s)",
+ evtid->fullname, json_object_to_json_string(object));
+ json_object_put(object);
+ return -1;
+ }
+
+ rc = jobs_queue(BROADCAST_JOB_GROUP, 0, broadcast_job_evtid, je);
+ if (rc) {
+ ERROR("cant't queue broadcast evtid job item for %s(%s)",
+ evtid->fullname, json_object_to_json_string(object));
+ destroy_job_evtid(je);
+ }
+ return rc;
}
-#endif
/*
* Broadcasts the event 'evtid' with its 'object'
@@ -200,7 +318,7 @@ static int hooked_broadcast(const char *event, struct json_object *obj, int id,
*/
int afb_evt_evtid_broadcast(struct afb_evtid *evtid, struct json_object *object)
{
- return broadcast(evtid->fullname, object, evtid->id);
+ return broadcast_evtid(evtid, object);
}
#if WITH_AFB_HOOK
@@ -211,7 +329,21 @@ int afb_evt_evtid_broadcast(struct afb_evtid *evtid, struct json_object *object)
*/
int afb_evt_evtid_hooked_broadcast(struct afb_evtid *evtid, struct json_object *object)
{
- return hooked_broadcast(evtid->fullname, object, evtid->id, evtid->hookflags);
+ int result;
+
+ json_object_get(object);
+
+ if (evtid->hookflags & afb_hook_flag_evt_broadcast_before)
+ afb_hook_evt_broadcast_before(evtid->fullname, evtid->id, object);
+
+ result = broadcast_evtid(evtid, object);
+
+ if (evtid->hookflags & afb_hook_flag_evt_broadcast_after)
+ afb_hook_evt_broadcast_after(evtid->fullname, evtid->id, object, result);
+
+ json_object_put(object);
+
+ return result;
}
#endif
@@ -223,38 +355,89 @@ int afb_evt_evtid_hooked_broadcast(struct afb_evtid *evtid, struct json_object *
int afb_evt_broadcast(const char *event, struct json_object *object)
{
#if WITH_AFB_HOOK
- return hooked_broadcast(event, object, 0, -1);
+ int result;
+
+ json_object_get(object);
+
+ afb_hook_evt_broadcast_before(event, 0, object);
+ result = broadcast_string(event, object);
+ afb_hook_evt_broadcast_after(event, 0, object, result);
+
+ json_object_put(object);
+
+ return result;
#else
- return broadcast(event, object, 0);
+ return broadcast_string(event, object);
#endif
}
/*
* Pushes the event 'evtid' with 'obj' to its listeners
- * 'obj' is released (like json_object_put)
* Returns the count of listener that received the event.
*/
-int afb_evt_evtid_push(struct afb_evtid *evtid, struct json_object *obj)
+static void push_evtid(struct afb_evtid *evtid, struct json_object *object)
{
- int result;
+ int has_client;
struct afb_evt_watch *watch;
struct afb_evt_listener *listener;
- result = 0;
+ has_client = 0;
pthread_rwlock_rdlock(&evtid->rwlock);
watch = evtid->watchs;
while(watch) {
listener = watch->listener;
assert(listener->itf->push != NULL);
if (watch->activity != 0) {
- listener->itf->push(listener->closure, evtid->fullname, evtid->id, json_object_get(obj));
- result++;
+ listener->itf->push(listener->closure, evtid->fullname, evtid->id, json_object_get(object));
+ has_client = 1;
}
watch = watch->next_by_evtid;
}
+ evtid->has_client = has_client;
pthread_rwlock_unlock(&evtid->rwlock);
- json_object_put(obj);
- return result;
+}
+
+/*
+ * Jobs callback for pushing evtid asynchronously
+ */
+static void push_job_evtid(int signum, void *closure)
+{
+ struct job_evtid *je = closure;
+
+ if (signum == 0)
+ push_evtid(je->evtid, je->object);
+ destroy_job_evtid(je);
+}
+
+/*
+ * Pushes the event 'evtid' with 'obj' to its listeners
+ * 'obj' is released (like json_object_put)
+ * Returns 1 if at least one listener exists or 0 if no listener exists or
+ * -1 in case of error and the event can't be delivered
+ */
+int afb_evt_evtid_push(struct afb_evtid *evtid, struct json_object *object)
+{
+ struct job_evtid *je;
+ int rc;
+
+ je = make_job_evtid(evtid, object);
+ if (je == NULL) {
+ ERROR("Cant't create push evtid job item for %s(%s)",
+ evtid->fullname, json_object_to_json_string(object));
+ json_object_put(object);
+ return -1;
+ }
+
+ rc = jobs_queue(PUSH_JOB_GROUP, 0, push_job_evtid, je);
+ if (rc == 0)
+ rc = evtid->has_client;
+ else {
+ ERROR("cant't queue push evtid job item for %s(%s)",
+ evtid->fullname, json_object_to_json_string(object));
+ destroy_job_evtid(je);
+ }
+
+ return rc;
}
#if WITH_AFB_HOOK
@@ -354,6 +537,7 @@ struct afb_evtid *afb_evt_evtid_create(const char *fullname)
evtid->refcount = 1;
evtid->watchs = NULL;
evtid->id = event_id_counter;
+ evtid->has_client = 0;
pthread_rwlock_init(&evtid->rwlock, NULL);
evtids = evtid;
#if WITH_AFB_HOOK
@@ -633,6 +817,7 @@ found:
if (watch->activity == 0 && listener->itf->add != NULL)
listener->itf->add(listener->closure, evtid->fullname, evtid->id);
watch->activity++;
+ evtid->has_client = 1;
pthread_rwlock_unlock(&listener->rwlock);
return 0;
diff --git a/src/main-afb-daemon.c b/src/main-afb-daemon.c
index cc07e86f..7b92905f 100644
--- a/src/main-afb-daemon.c
+++ b/src/main-afb-daemon.c
@@ -939,7 +939,7 @@ int main(int argc, char *argv[])
afb_debug("main-start");
/* enter job processing */
- jobs_start(3, 0, 50, start, NULL);
+ jobs_start(3, 0, 100, start, NULL);
WARNING("hoops returned from jobs_enter! [report bug]");
return 1;
}