aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJose Bollo <jose.bollo@iot.bzh>2019-07-11 18:20:23 +0200
committerJosé Bollo <jose.bollo@iot.bzh>2019-07-12 16:07:37 +0200
commit919445036879bb7681cd19582a899ea43609c8a3 (patch)
treef81ef125d50a27111d23373800b3bfb8700c50fd
parent14b401595576394f6d17aeddd6c172a445e30757 (diff)
afb-evt: send events in separate job
Sending events in the context of the calling process had the consequence that the ordering of the messages had to be removed (see SPEC-2215 & SPEC-2219). This was not good by nature and lead to issues SPEC-2542 and SPEC-2599. Sending events in the context of the calling process also implies to delay the calling process. For this reasons, sending events is now done in an other jobs. For that reason, the count of allowed pending jobs is increased to 100 (was 50). Bug-AGL: SPEC-2215 Bug-AGL: SPEC-2219 Bug-AGL: SPEC-2542 Bug-AGL: SPEC-2599 Change-Id: I5b56d952cc187b65ad6eb9344ad74e5e8d3b7540 Signed-off-by: Jose Bollo <jose.bollo@iot.bzh>
-rw-r--r--docs/reference-v3/func-api.md2
-rw-r--r--docs/reference-v3/func-event.md7
-rw-r--r--include/afb/afb-daemon-v1.h2
-rw-r--r--include/afb/afb-daemon-v2.h2
-rw-r--r--include/afb/afb-event-x2.h7
-rw-r--r--src/afb-evt.c263
-rw-r--r--src/main-afb-daemon.c2
7 files changed, 239 insertions, 46 deletions
diff --git a/docs/reference-v3/func-api.md b/docs/reference-v3/func-api.md
index aa28932a..0c0921d5 100644
--- a/docs/reference-v3/func-api.md
+++ b/docs/reference-v3/func-api.md
@@ -425,7 +425,7 @@ int afb_api_queue_job(
* @param name the event name suffix
* @param object the object that comes with the event
*
- * @return the count of clients that received the event.
+ * @return 0 in case of success or -1 in case of error
*/
int afb_api_broadcast_event(
afb_api_t api,
diff --git a/docs/reference-v3/func-event.md b/docs/reference-v3/func-event.md
index d96095a8..9d8d9050 100644
--- a/docs/reference-v3/func-event.md
+++ b/docs/reference-v3/func-event.md
@@ -78,7 +78,7 @@ afb_event_t *afb_event_addref(
* @param event the event to broadcast
* @param object the companion object to associate to the broadcasted event (can be NULL)
*
- * @return the count of clients that received the event.
+ * @return 0 in case of success or -1 in case of error
*/
int afb_event_broadcast(
afb_event_t event,
@@ -99,7 +99,10 @@ int afb_event_broadcast(
* @param event the event to push
* @param object the companion object to associate to the pushed event (can be NULL)
*
- * @return the count of clients that received the event.
+ * @Return
+ * * 1 if at least one client listen for the event
+ * * 0 if no more client listen for the event
+ * * -1 in case of error (the event can't be delivered)
*/
int afb_event_push(
afb_event_t event,
diff --git a/include/afb/afb-daemon-v1.h b/include/afb/afb-daemon-v1.h
index c268bb83..0a821b72 100644
--- a/include/afb/afb-daemon-v1.h
+++ b/include/afb/afb-daemon-v1.h
@@ -68,7 +68,7 @@ static inline struct sd_bus *afb_daemon_get_system_bus_v1(struct afb_daemon_x1 d
*
* Calling this function is only forbidden during preinit.
*
- * Returns the count of clients that received the event.
+ * Returns 0 in case of success or -1 in case of error
*/
static inline int afb_daemon_broadcast_event_v1(struct afb_daemon_x1 daemon, const char *name, struct json_object *object)
{
diff --git a/include/afb/afb-daemon-v2.h b/include/afb/afb-daemon-v2.h
index 65e4afbc..220ce0b7 100644
--- a/include/afb/afb-daemon-v2.h
+++ b/include/afb/afb-daemon-v2.h
@@ -64,7 +64,7 @@ static inline struct sd_bus *afb_daemon_get_system_bus_v2()
*
* Calling this function is only forbidden during preinit.
*
- * Returns the count of clients that received the event.
+ * Returns 0 in case of success or -1 in case of error
*/
static inline int afb_daemon_broadcast_event_v2(const char *name, struct json_object *object)
{
diff --git a/include/afb/afb-event-x2.h b/include/afb/afb-event-x2.h
index 9732cad1..beb31f9d 100644
--- a/include/afb/afb-event-x2.h
+++ b/include/afb/afb-event-x2.h
@@ -45,7 +45,7 @@ static inline int afb_event_x2_is_valid(struct afb_event_x2 *event)
* @param event the event to broadcast
* @param object the companion object to associate to the broadcasted event (can be NULL)
*
- * @return the count of clients that received the event.
+ * @return 0 in case of success or -1 in case of error
*/
static inline int afb_event_x2_broadcast(
struct afb_event_x2 *event,
@@ -65,7 +65,10 @@ static inline int afb_event_x2_broadcast(
* @param event the event to push
* @param object the companion object to associate to the pushed event (can be NULL)
*
- * @return the count of clients that received the event.
+ * @Return
+ * * 1 if at least one client listen for the event
+ * * 0 if no more client listen for the event
+ * * -1 in case of error (the event can't be delivered)
*/
static inline int afb_event_x2_push(
struct afb_event_x2 *event,
diff --git a/src/afb-evt.c b/src/afb-evt.c
index d361c954..dcb8743a 100644
--- a/src/afb-evt.c
+++ b/src/afb-evt.c
@@ -30,6 +30,7 @@
#include "afb-evt.h"
#include "afb-hook.h"
#include "verbose.h"
+#include "jobs.h"
struct afb_evt_watch;
@@ -83,6 +84,9 @@ struct afb_evtid {
/* id of the event */
int id;
+ /* has client? */
+ int has_client;
+
/* fullname of the event */
char fullname[];
};
@@ -108,6 +112,30 @@ struct afb_evt_watch {
unsigned activity;
};
+/*
+ * structure for job of broadcasting string events
+ */
+struct job_string
+{
+ /** object atached to the event */
+ struct json_object *object;
+
+ /** name of the event to broadcast */
+ char event[];
+};
+
+/*
+ * structure for job of broadcasting or pushing events
+ */
+struct job_evtid
+{
+ /** the event to broadcast */
+ struct afb_evtid *evtid;
+
+ /** object atached to the event */
+ struct json_object *object;
+};
+
/* the interface for events */
static struct afb_event_x2_itf afb_evt_event_x2_itf = {
.broadcast = (void*)afb_evt_evtid_broadcast,
@@ -126,6 +154,10 @@ static struct afb_event_x2_itf afb_evt_hooked_eventid_itf = {
.addref = (void*)afb_evt_evtid_hooked_addref
};
+/* job groups for events push/broadcast */
+#define BROADCAST_JOB_GROUP (&afb_evt_event_x2_itf)
+#define PUSH_JOB_GROUP (&afb_evt_event_x2_itf)
+
/* head of the list of listeners */
static pthread_rwlock_t listeners_rwlock = PTHREAD_RWLOCK_INITIALIZER;
static struct afb_evt_listener *listeners = NULL;
@@ -137,54 +169,142 @@ static int event_id_counter = 0;
static int event_id_wrapped = 0;
/*
- * Broadcasts the 'event' of 'id' with its 'obj'
- * 'obj' is released (like json_object_put)
- * Returns the count of listener having receive the event.
+ * Create structure for job of broadcasting string 'event' with 'object'
+ * Returns the created structure or NULL if out of memory
*/
-static int broadcast(const char *event, struct json_object *obj, int id)
+static struct job_string *make_job_string(const char *event, struct json_object *object)
{
- int result;
- struct afb_evt_listener *listener;
+ size_t sz = 1 + strlen(event);
+ struct job_string *js = malloc(sz + sizeof *js);
+ if (js) {
+ js->object = object;
+ memcpy(js->event, event, sz);
+ }
+ return js;
+}
- result = 0;
+/*
+ * Destroy structure 'js' for job of broadcasting string events
+ */
+static void destroy_job_string(struct job_string *js)
+{
+ json_object_put(js->object);
+ free(js);
+}
+
+/*
+ * Create structure for job of broadcasting or pushing 'evtid' with 'object'
+ * Returns the created structure or NULL if out of memory
+ */
+static struct job_evtid *make_job_evtid(struct afb_evtid *evtid, struct json_object *object)
+{
+ struct job_evtid *je = malloc(sizeof *je);
+ if (je) {
+ je->evtid = afb_evt_evtid_addref(evtid);
+ je->object = object;
+ }
+ return je;
+}
+
+/*
+ * Destroy structure for job of broadcasting or pushing evtid
+ */
+static void destroy_job_evtid(struct job_evtid *je)
+{
+ afb_evt_evtid_unref(je->evtid);
+ json_object_put(je->object);
+ free(je);
+}
+
+/*
+ * Broadcasts the 'event' of 'id' with its 'object'
+ */
+static void broadcast(const char *event, struct json_object *object, int id)
+{
+ struct afb_evt_listener *listener;
pthread_rwlock_rdlock(&listeners_rwlock);
listener = listeners;
while(listener) {
- if (listener->itf->broadcast != NULL) {
- listener->itf->broadcast(listener->closure, event, id, json_object_get(obj));
- result++;
- }
+ if (listener->itf->broadcast != NULL)
+ listener->itf->broadcast(listener->closure, event, id, json_object_get(object));
listener = listener->next;
}
pthread_rwlock_unlock(&listeners_rwlock);
- json_object_put(obj);
- return result;
}
/*
- * Broadcasts the 'event' of 'id' with its 'obj'
- * 'obj' is released (like json_object_put)
- * calls hooks if hookflags isn't 0
- * Returns the count of listener having receive the event.
+ * Jobs callback for broadcasting string asynchronously
*/
-static int hooked_broadcast(const char *event, struct json_object *obj, int id, int hookflags)
+static void broadcast_job_string(int signum, void *closure)
{
- int result;
+ struct job_string *js = closure;
- json_object_get(obj);
+ if (signum == 0)
+ broadcast(js->event, js->object, 0);
+ destroy_job_string(js);
+}
+
+/*
+ * Jobs callback for broadcasting evtid asynchronously
+ */
+static void broadcast_job_evtid(int signum, void *closure)
+{
+ struct job_evtid *je = closure;
+
+ if (signum == 0)
+ broadcast(je->evtid->fullname, je->object, je->evtid->id);
+ destroy_job_evtid(je);
+}
- if (hookflags & afb_hook_flag_evt_broadcast_before)
- afb_hook_evt_broadcast_before(event, id, obj);
+/*
+ * Broadcasts the string 'event' with its 'object'
+ */
+static int broadcast_string(const char *event, struct json_object *object)
+{
+ struct job_string *js;
+ int rc;
- result = broadcast(event, obj, id);
+ js = make_job_string(event, object);
+ if (js == NULL) {
+ ERROR("Cant't create broadcast string job item for %s(%s)",
+ event, json_object_to_json_string(object));
+ json_object_put(object);
+ return -1;
+ }
- if (hookflags & afb_hook_flag_evt_broadcast_after)
- afb_hook_evt_broadcast_after(event, id, obj, result);
+ rc = jobs_queue(BROADCAST_JOB_GROUP, 0, broadcast_job_string, js);
+ if (rc) {
+ ERROR("cant't queue broadcast string job item for %s(%s)",
+ event, json_object_to_json_string(object));
+ destroy_job_string(js);
+ }
+ return rc;
+}
- json_object_put(obj);
+/*
+ * Broadcasts the 'evtid' with its 'object'
+ */
+static int broadcast_evtid(struct afb_evtid *evtid, struct json_object *object)
+{
+ struct job_evtid *je;
+ int rc;
- return result;
+ je = make_job_evtid(evtid, object);
+ if (je == NULL) {
+ ERROR("Cant't create broadcast evtid job item for %s(%s)",
+ evtid->fullname, json_object_to_json_string(object));
+ json_object_put(object);
+ return -1;
+ }
+
+ rc = jobs_queue(BROADCAST_JOB_GROUP, 0, broadcast_job_evtid, je);
+ if (rc) {
+ ERROR("cant't queue broadcast evtid job item for %s(%s)",
+ evtid->fullname, json_object_to_json_string(object));
+ destroy_job_evtid(je);
+ }
+ return rc;
}
/*
@@ -194,7 +314,7 @@ static int hooked_broadcast(const char *event, struct json_object *obj, int id,
*/
int afb_evt_evtid_broadcast(struct afb_evtid *evtid, struct json_object *object)
{
- return broadcast(evtid->fullname, object, evtid->id);
+ return broadcast_evtid(evtid, object);
}
/*
@@ -204,7 +324,21 @@ int afb_evt_evtid_broadcast(struct afb_evtid *evtid, struct json_object *object)
*/
int afb_evt_evtid_hooked_broadcast(struct afb_evtid *evtid, struct json_object *object)
{
- return hooked_broadcast(evtid->fullname, object, evtid->id, evtid->hookflags);
+ int result;
+
+ json_object_get(object);
+
+ if (evtid->hookflags & afb_hook_flag_evt_broadcast_before)
+ afb_hook_evt_broadcast_before(evtid->fullname, evtid->id, object);
+
+ result = broadcast_evtid(evtid, object);
+
+ if (evtid->hookflags & afb_hook_flag_evt_broadcast_after)
+ afb_hook_evt_broadcast_after(evtid->fullname, evtid->id, object, result);
+
+ json_object_put(object);
+
+ return result;
}
/*
@@ -214,35 +348,86 @@ int afb_evt_evtid_hooked_broadcast(struct afb_evtid *evtid, struct json_object *
*/
int afb_evt_broadcast(const char *event, struct json_object *object)
{
- return hooked_broadcast(event, object, 0, -1);
+ int result;
+
+ json_object_get(object);
+
+ afb_hook_evt_broadcast_before(event, 0, object);
+ result = broadcast_string(event, object);
+ afb_hook_evt_broadcast_after(event, 0, object, result);
+
+ json_object_put(object);
+
+ return result;
}
/*
* Pushes the event 'evtid' with 'obj' to its listeners
- * 'obj' is released (like json_object_put)
* Returns the count of listener that received the event.
*/
-int afb_evt_evtid_push(struct afb_evtid *evtid, struct json_object *obj)
+static void push_evtid(struct afb_evtid *evtid, struct json_object *object)
{
- int result;
+ int has_client;
struct afb_evt_watch *watch;
struct afb_evt_listener *listener;
- result = 0;
+ has_client = 0;
pthread_rwlock_rdlock(&evtid->rwlock);
watch = evtid->watchs;
while(watch) {
listener = watch->listener;
assert(listener->itf->push != NULL);
if (watch->activity != 0) {
- listener->itf->push(listener->closure, evtid->fullname, evtid->id, json_object_get(obj));
- result++;
+ listener->itf->push(listener->closure, evtid->fullname, evtid->id, json_object_get(object));
+ has_client = 1;
}
watch = watch->next_by_evtid;
}
+ evtid->has_client = has_client;
pthread_rwlock_unlock(&evtid->rwlock);
- json_object_put(obj);
- return result;
+}
+
+/*
+ * Jobs callback for pushing evtid asynchronously
+ */
+static void push_job_evtid(int signum, void *closure)
+{
+ struct job_evtid *je = closure;
+
+ if (signum == 0)
+ push_evtid(je->evtid, je->object);
+ destroy_job_evtid(je);
+}
+
+/*
+ * Pushes the event 'evtid' with 'obj' to its listeners
+ * 'obj' is released (like json_object_put)
+ * Returns 1 if at least one listener exists or 0 if no listener exists or
+ * -1 in case of error and the event can't be delivered
+ */
+int afb_evt_evtid_push(struct afb_evtid *evtid, struct json_object *object)
+{
+ struct job_evtid *je;
+ int rc;
+
+ je = make_job_evtid(evtid, object);
+ if (je == NULL) {
+ ERROR("Cant't create push evtid job item for %s(%s)",
+ evtid->fullname, json_object_to_json_string(object));
+ json_object_put(object);
+ return -1;
+ }
+
+ rc = jobs_queue(PUSH_JOB_GROUP, 0, push_job_evtid, je);
+ if (rc == 0)
+ rc = evtid->has_client;
+ else {
+ ERROR("cant't queue push evtid job item for %s(%s)",
+ evtid->fullname, json_object_to_json_string(object));
+ destroy_job_evtid(je);
+ }
+
+ return rc;
}
/*
@@ -340,6 +525,7 @@ struct afb_evtid *afb_evt_evtid_create(const char *fullname)
evtid->refcount = 1;
evtid->watchs = NULL;
evtid->id = event_id_counter;
+ evtid->has_client = 0;
pthread_rwlock_init(&evtid->rwlock, NULL);
evtids = evtid;
evtid->hookflags = afb_hook_flags_evt(evtid->fullname);
@@ -609,6 +795,7 @@ found:
if (watch->activity == 0 && listener->itf->add != NULL)
listener->itf->add(listener->closure, evtid->fullname, evtid->id);
watch->activity++;
+ evtid->has_client = 1;
pthread_rwlock_unlock(&listener->rwlock);
return 0;
diff --git a/src/main-afb-daemon.c b/src/main-afb-daemon.c
index bb2f0a56..b25cf86e 100644
--- a/src/main-afb-daemon.c
+++ b/src/main-afb-daemon.c
@@ -903,7 +903,7 @@ int main(int argc, char *argv[])
afb_debug("main-start");
/* enter job processing */
- jobs_start(3, 0, 50, start, NULL);
+ jobs_start(3, 0, 100, start, NULL);
WARNING("hoops returned from jobs_enter! [report bug]");
return 1;
}