summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--docs/reference-v3/func-api.md2
-rw-r--r--docs/reference-v3/func-event.md7
-rw-r--r--include/afb/afb-daemon-v1.h2
-rw-r--r--include/afb/afb-daemon-v2.h2
-rw-r--r--include/afb/afb-event-x2.h7
-rw-r--r--src/afb-evt.c263
-rw-r--r--src/main-afb-daemon.c2
7 files changed, 239 insertions, 46 deletions
diff --git a/docs/reference-v3/func-api.md b/docs/reference-v3/func-api.md
index aa28932a..0c0921d5 100644
--- a/docs/reference-v3/func-api.md
+++ b/docs/reference-v3/func-api.md
@@ -425,7 +425,7 @@ int afb_api_queue_job(
* @param name the event name suffix
* @param object the object that comes with the event
*
- * @return the count of clients that received the event.
+ * @return 0 in case of success or -1 in case of error
*/
int afb_api_broadcast_event(
afb_api_t api,
diff --git a/docs/reference-v3/func-event.md b/docs/reference-v3/func-event.md
index d96095a8..9d8d9050 100644
--- a/docs/reference-v3/func-event.md
+++ b/docs/reference-v3/func-event.md
@@ -78,7 +78,7 @@ afb_event_t *afb_event_addref(
* @param event the event to broadcast
* @param object the companion object to associate to the broadcasted event (can be NULL)
*
- * @return the count of clients that received the event.
+ * @return 0 in case of success or -1 in case of error
*/
int afb_event_broadcast(
afb_event_t event,
@@ -99,7 +99,10 @@ int afb_event_broadcast(
* @param event the event to push
* @param object the companion object to associate to the pushed event (can be NULL)
*
- * @return the count of clients that received the event.
+ * @Return
+ * * 1 if at least one client listen for the event
+ * * 0 if no more client listen for the event
+ * * -1 in case of error (the event can't be delivered)
*/
int afb_event_push(
afb_event_t event,
diff --git a/include/afb/afb-daemon-v1.h b/include/afb/afb-daemon-v1.h
index c268bb83..0a821b72 100644
--- a/include/afb/afb-daemon-v1.h
+++ b/include/afb/afb-daemon-v1.h
@@ -68,7 +68,7 @@ static inline struct sd_bus *afb_daemon_get_system_bus_v1(struct afb_daemon_x1 d
*
* Calling this function is only forbidden during preinit.
*
- * Returns the count of clients that received the event.
+ * Returns 0 in case of success or -1 in case of error
*/
static inline int afb_daemon_broadcast_event_v1(struct afb_daemon_x1 daemon, const char *name, struct json_object *object)
{
diff --git a/include/afb/afb-daemon-v2.h b/include/afb/afb-daemon-v2.h
index 65e4afbc..220ce0b7 100644
--- a/include/afb/afb-daemon-v2.h
+++ b/include/afb/afb-daemon-v2.h
@@ -64,7 +64,7 @@ static inline struct sd_bus *afb_daemon_get_system_bus_v2()
*
* Calling this function is only forbidden during preinit.
*
- * Returns the count of clients that received the event.
+ * Returns 0 in case of success or -1 in case of error
*/
static inline int afb_daemon_broadcast_event_v2(const char *name, struct json_object *object)
{
diff --git a/include/afb/afb-event-x2.h b/include/afb/afb-event-x2.h
index 9732cad1..beb31f9d 100644
--- a/include/afb/afb-event-x2.h
+++ b/include/afb/afb-event-x2.h
@@ -45,7 +45,7 @@ static inline int afb_event_x2_is_valid(struct afb_event_x2 *event)
* @param event the event to broadcast
* @param object the companion object to associate to the broadcasted event (can be NULL)
*
- * @return the count of clients that received the event.
+ * @return 0 in case of success or -1 in case of error
*/
static inline int afb_event_x2_broadcast(
struct afb_event_x2 *event,
@@ -65,7 +65,10 @@ static inline int afb_event_x2_broadcast(
* @param event the event to push
* @param object the companion object to associate to the pushed event (can be NULL)
*
- * @return the count of clients that received the event.
+ * @Return
+ * * 1 if at least one client listen for the event
+ * * 0 if no more client listen for the event
+ * * -1 in case of error (the event can't be delivered)
*/
static inline int afb_event_x2_push(
struct afb_event_x2 *event,
diff --git a/src/afb-evt.c b/src/afb-evt.c
index d361c954..dcb8743a 100644
--- a/src/afb-evt.c
+++ b/src/afb-evt.c
@@ -30,6 +30,7 @@
#include "afb-evt.h"
#include "afb-hook.h"
#include "verbose.h"
+#include "jobs.h"
struct afb_evt_watch;
@@ -83,6 +84,9 @@ struct afb_evtid {
/* id of the event */
int id;
+ /* has client? */
+ int has_client;
+
/* fullname of the event */
char fullname[];
};
@@ -108,6 +112,30 @@ struct afb_evt_watch {
unsigned activity;
};
+/*
+ * structure for job of broadcasting string events
+ */
+struct job_string
+{
+ /** object atached to the event */
+ struct json_object *object;
+
+ /** name of the event to broadcast */
+ char event[];
+};
+
+/*
+ * structure for job of broadcasting or pushing events
+ */
+struct job_evtid
+{
+ /** the event to broadcast */
+ struct afb_evtid *evtid;
+
+ /** object atached to the event */
+ struct json_object *object;
+};
+
/* the interface for events */
static struct afb_event_x2_itf afb_evt_event_x2_itf = {
.broadcast = (void*)afb_evt_evtid_broadcast,
@@ -126,6 +154,10 @@ static struct afb_event_x2_itf afb_evt_hooked_eventid_itf = {
.addref = (void*)afb_evt_evtid_hooked_addref
};
+/* job groups for events push/broadcast */
+#define BROADCAST_JOB_GROUP (&afb_evt_event_x2_itf)
+#define PUSH_JOB_GROUP (&afb_evt_event_x2_itf)
+
/* head of the list of listeners */
static pthread_rwlock_t listeners_rwlock = PTHREAD_RWLOCK_INITIALIZER;
static struct afb_evt_listener *listeners = NULL;
@@ -137,54 +169,142 @@ static int event_id_counter = 0;
static int event_id_wrapped = 0;
/*
- * Broadcasts the 'event' of 'id' with its 'obj'
- * 'obj' is released (like json_object_put)
- * Returns the count of listener having receive the event.
+ * Create structure for job of broadcasting string 'event' with 'object'
+ * Returns the created structure or NULL if out of memory
*/
-static int broadcast(const char *event, struct json_object *obj, int id)
+static struct job_string *make_job_string(const char *event, struct json_object *object)
{
- int result;
- struct afb_evt_listener *listener;
+ size_t sz = 1 + strlen(event);
+ struct job_string *js = malloc(sz + sizeof *js);
+ if (js) {
+ js->object = object;
+ memcpy(js->event, event, sz);
+ }
+ return js;
+}
- result = 0;
+/*
+ * Destroy structure 'js' for job of broadcasting string events
+ */
+static void destroy_job_string(struct job_string *js)
+{
+ json_object_put(js->object);
+ free(js);
+}
+
+/*
+ * Create structure for job of broadcasting or pushing 'evtid' with 'object'
+ * Returns the created structure or NULL if out of memory
+ */
+static struct job_evtid *make_job_evtid(struct afb_evtid *evtid, struct json_object *object)
+{
+ struct job_evtid *je = malloc(sizeof *je);
+ if (je) {
+ je->evtid = afb_evt_evtid_addref(evtid);
+ je->object = object;
+ }
+ return je;
+}
+
+/*
+ * Destroy structure for job of broadcasting or pushing evtid
+ */
+static void destroy_job_evtid(struct job_evtid *je)
+{
+ afb_evt_evtid_unref(je->evtid);
+ json_object_put(je->object);
+ free(je);
+}
+
+/*
+ * Broadcasts the 'event' of 'id' with its 'object'
+ */
+static void broadcast(const char *event, struct json_object *object, int id)
+{
+ struct afb_evt_listener *listener;
pthread_rwlock_rdlock(&listeners_rwlock);
listener = listeners;
while(listener) {
- if (listener->itf->broadcast != NULL) {
- listener->itf->broadcast(listener->closure, event, id, json_object_get(obj));
- result++;
- }
+ if (listener->itf->broadcast != NULL)
+ listener->itf->broadcast(listener->closure, event, id, json_object_get(object));
listener = listener->next;
}
pthread_rwlock_unlock(&listeners_rwlock);
- json_object_put(obj);
- return result;
}
/*
- * Broadcasts the 'event' of 'id' with its 'obj'
- * 'obj' is released (like json_object_put)
- * calls hooks if hookflags isn't 0
- * Returns the count of listener having receive the event.
+ * Jobs callback for broadcasting string asynchronously
*/
-static int hooked_broadcast(const char *event, struct json_object *obj, int id, int hookflags)
+static void broadcast_job_string(int signum, void *closure)
{
- int result;
+ struct job_string *js = closure;
- json_object_get(obj);
+ if (signum == 0)
+ broadcast(js->event, js->object, 0);
+ destroy_job_string(js);
+}
+
+/*
+ * Jobs callback for broadcasting evtid asynchronously
+ */
+static void broadcast_job_evtid(int signum, void *closure)
+{
+ struct job_evtid *je = closure;
+
+ if (signum == 0)
+ broadcast(je->evtid->fullname, je->object, je->evtid->id);
+ destroy_job_evtid(je);
+}
- if (hookflags & afb_hook_flag_evt_broadcast_before)
- afb_hook_evt_broadcast_before(event, id, obj);
+/*
+ * Broadcasts the string 'event' with its 'object'
+ */
+static int broadcast_string(const char *event, struct json_object *object)
+{
+ struct job_string *js;
+ int rc;
- result = broadcast(event, obj, id);
+ js = make_job_string(event, object);
+ if (js == NULL) {
+ ERROR("Cant't create broadcast string job item for %s(%s)",
+ event, json_object_to_json_string(object));
+ json_object_put(object);
+ return -1;
+ }
- if (hookflags & afb_hook_flag_evt_broadcast_after)
- afb_hook_evt_broadcast_after(event, id, obj, result);
+ rc = jobs_queue(BROADCAST_JOB_GROUP, 0, broadcast_job_string, js);
+ if (rc) {
+ ERROR("cant't queue broadcast string job item for %s(%s)",
+ event, json_object_to_json_string(object));
+ destroy_job_string(js);
+ }
+ return rc;
+}
- json_object_put(obj);
+/*
+ * Broadcasts the 'evtid' with its 'object'
+ */
+static int broadcast_evtid(struct afb_evtid *evtid, struct json_object *object)
+{
+ struct job_evtid *je;
+ int rc;
- return result;
+ je = make_job_evtid(evtid, object);
+ if (je == NULL) {
+ ERROR("Cant't create broadcast evtid job item for %s(%s)",
+ evtid->fullname, json_object_to_json_string(object));
+ json_object_put(object);
+ return -1;
+ }
+
+ rc = jobs_queue(BROADCAST_JOB_GROUP, 0, broadcast_job_evtid, je);
+ if (rc) {
+ ERROR("cant't queue broadcast evtid job item for %s(%s)",
+ evtid->fullname, json_object_to_json_string(object));
+ destroy_job_evtid(je);
+ }
+ return rc;
}
/*
@@ -194,7 +314,7 @@ static int hooked_broadcast(const char *event, struct json_object *obj, int id,
*/
int afb_evt_evtid_broadcast(struct afb_evtid *evtid, struct json_object *object)
{
- return broadcast(evtid->fullname, object, evtid->id);
+ return broadcast_evtid(evtid, object);
}
/*
@@ -204,7 +324,21 @@ int afb_evt_evtid_broadcast(struct afb_evtid *evtid, struct json_object *object)
*/
int afb_evt_evtid_hooked_broadcast(struct afb_evtid *evtid, struct json_object *object)
{
- return hooked_broadcast(evtid->fullname, object, evtid->id, evtid->hookflags);
+ int result;
+
+ json_object_get(object);
+
+ if (evtid->hookflags & afb_hook_flag_evt_broadcast_before)
+ afb_hook_evt_broadcast_before(evtid->fullname, evtid->id, object);
+
+ result = broadcast_evtid(evtid, object);
+
+ if (evtid->hookflags & afb_hook_flag_evt_broadcast_after)
+ afb_hook_evt_broadcast_after(evtid->fullname, evtid->id, object, result);
+
+ json_object_put(object);
+
+ return result;
}
/*
@@ -214,35 +348,86 @@ int afb_evt_evtid_hooked_broadcast(struct afb_evtid *evtid, struct json_object *
*/
int afb_evt_broadcast(const char *event, struct json_object *object)
{
- return hooked_broadcast(event, object, 0, -1);
+ int result;
+
+ json_object_get(object);
+
+ afb_hook_evt_broadcast_before(event, 0, object);
+ result = broadcast_string(event, object);
+ afb_hook_evt_broadcast_after(event, 0, object, result);
+
+ json_object_put(object);
+
+ return result;
}
/*
* Pushes the event 'evtid' with 'obj' to its listeners
- * 'obj' is released (like json_object_put)
* Returns the count of listener that received the event.
*/
-int afb_evt_evtid_push(struct afb_evtid *evtid, struct json_object *obj)
+static void push_evtid(struct afb_evtid *evtid, struct json_object *object)
{
- int result;
+ int has_client;
struct afb_evt_watch *watch;
struct afb_evt_listener *listener;
- result = 0;
+ has_client = 0;
pthread_rwlock_rdlock(&evtid->rwlock);
watch = evtid->watchs;
while(watch) {
listener = watch->listener;
assert(listener->itf->push != NULL);
if (watch->activity != 0) {
- listener->itf->push(listener->closure, evtid->fullname, evtid->id, json_object_get(obj));
- result++;
+ listener->itf->push(listener->closure, evtid->fullname, evtid->id, json_object_get(object));
+ has_client = 1;
}
watch = watch->next_by_evtid;
}
+ evtid->has_client = has_client;
pthread_rwlock_unlock(&evtid->rwlock);
- json_object_put(obj);
- return result;
+}
+
+/*
+ * Jobs callback for pushing evtid asynchronously
+ */
+static void push_job_evtid(int signum, void *closure)
+{
+ struct job_evtid *je = closure;
+
+ if (signum == 0)
+ push_evtid(je->evtid, je->object);
+ destroy_job_evtid(je);
+}
+
+/*
+ * Pushes the event 'evtid' with 'obj' to its listeners
+ * 'obj' is released (like json_object_put)
+ * Returns 1 if at least one listener exists or 0 if no listener exists or
+ * -1 in case of error and the event can't be delivered
+ */
+int afb_evt_evtid_push(struct afb_evtid *evtid, struct json_object *object)
+{
+ struct job_evtid *je;
+ int rc;
+
+ je = make_job_evtid(evtid, object);
+ if (je == NULL) {
+ ERROR("Cant't create push evtid job item for %s(%s)",
+ evtid->fullname, json_object_to_json_string(object));
+ json_object_put(object);
+ return -1;
+ }
+
+ rc = jobs_queue(PUSH_JOB_GROUP, 0, push_job_evtid, je);
+ if (rc == 0)
+ rc = evtid->has_client;
+ else {
+ ERROR("cant't queue push evtid job item for %s(%s)",
+ evtid->fullname, json_object_to_json_string(object));
+ destroy_job_evtid(je);
+ }
+
+ return rc;
}
/*
@@ -340,6 +525,7 @@ struct afb_evtid *afb_evt_evtid_create(const char *fullname)
evtid->refcount = 1;
evtid->watchs = NULL;
evtid->id = event_id_counter;
+ evtid->has_client = 0;
pthread_rwlock_init(&evtid->rwlock, NULL);
evtids = evtid;
evtid->hookflags = afb_hook_flags_evt(evtid->fullname);
@@ -609,6 +795,7 @@ found:
if (watch->activity == 0 && listener->itf->add != NULL)
listener->itf->add(listener->closure, evtid->fullname, evtid->id);
watch->activity++;
+ evtid->has_client = 1;
pthread_rwlock_unlock(&listener->rwlock);
return 0;
diff --git a/src/main-afb-daemon.c b/src/main-afb-daemon.c
index bb2f0a56..b25cf86e 100644
--- a/src/main-afb-daemon.c
+++ b/src/main-afb-daemon.c
@@ -903,7 +903,7 @@ int main(int argc, char *argv[])
afb_debug("main-start");
/* enter job processing */
- jobs_start(3, 0, 50, start, NULL);
+ jobs_start(3, 0, 100, start, NULL);
WARNING("hoops returned from jobs_enter! [report bug]");
return 1;
}