aboutsummaryrefslogtreecommitdiffstats
path: root/src/afb-evt.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/afb-evt.c')
-rw-r--r--src/afb-evt.c267
1 files changed, 226 insertions, 41 deletions
diff --git a/src/afb-evt.c b/src/afb-evt.c
index 960d4137..74ed71a0 100644
--- a/src/afb-evt.c
+++ b/src/afb-evt.c
@@ -30,6 +30,7 @@
#include "afb-evt.h"
#include "afb-hook.h"
#include "verbose.h"
+#include "jobs.h"
struct afb_evt_watch;
@@ -85,6 +86,9 @@ struct afb_evtid {
/* id of the event */
int id;
+ /* has client? */
+ int has_client;
+
/* fullname of the event */
char fullname[];
};
@@ -110,6 +114,30 @@ struct afb_evt_watch {
unsigned activity;
};
+/*
+ * structure for job of broadcasting string events
+ */
+struct job_string
+{
+ /** object atached to the event */
+ struct json_object *object;
+
+ /** name of the event to broadcast */
+ char event[];
+};
+
+/*
+ * structure for job of broadcasting or pushing events
+ */
+struct job_evtid
+{
+ /** the event to broadcast */
+ struct afb_evtid *evtid;
+
+ /** object atached to the event */
+ struct json_object *object;
+};
+
/* the interface for events */
static struct afb_event_x2_itf afb_evt_event_x2_itf = {
.broadcast = (void*)afb_evt_evtid_broadcast,
@@ -130,6 +158,10 @@ static struct afb_event_x2_itf afb_evt_hooked_event_x2_itf = {
};
#endif
+/* job groups for events push/broadcast */
+#define BROADCAST_JOB_GROUP (&afb_evt_event_x2_itf)
+#define PUSH_JOB_GROUP (&afb_evt_event_x2_itf)
+
/* head of the list of listeners */
static pthread_rwlock_t listeners_rwlock = PTHREAD_RWLOCK_INITIALIZER;
static struct afb_evt_listener *listeners = NULL;
@@ -141,57 +173,143 @@ static int event_id_counter = 0;
static int event_id_wrapped = 0;
/*
- * Broadcasts the 'event' of 'id' with its 'obj'
- * 'obj' is released (like json_object_put)
- * Returns the count of listener having receive the event.
+ * Create structure for job of broadcasting string 'event' with 'object'
+ * Returns the created structure or NULL if out of memory
*/
-static int broadcast(const char *event, struct json_object *obj, int id)
+static struct job_string *make_job_string(const char *event, struct json_object *object)
{
- int result;
- struct afb_evt_listener *listener;
+ size_t sz = 1 + strlen(event);
+ struct job_string *js = malloc(sz + sizeof *js);
+ if (js) {
+ js->object = object;
+ memcpy(js->event, event, sz);
+ }
+ return js;
+}
+
+/*
+ * Destroy structure 'js' for job of broadcasting string events
+ */
+static void destroy_job_string(struct job_string *js)
+{
+ json_object_put(js->object);
+ free(js);
+}
+
+/*
+ * Create structure for job of broadcasting or pushing 'evtid' with 'object'
+ * Returns the created structure or NULL if out of memory
+ */
+static struct job_evtid *make_job_evtid(struct afb_evtid *evtid, struct json_object *object)
+{
+ struct job_evtid *je = malloc(sizeof *je);
+ if (je) {
+ je->evtid = afb_evt_evtid_addref(evtid);
+ je->object = object;
+ }
+ return je;
+}
+
+/*
+ * Destroy structure for job of broadcasting or pushing evtid
+ */
+static void destroy_job_evtid(struct job_evtid *je)
+{
+ afb_evt_evtid_unref(je->evtid);
+ json_object_put(je->object);
+ free(je);
+}
- result = 0;
+/*
+ * Broadcasts the 'event' of 'id' with its 'object'
+ */
+static void broadcast(const char *event, struct json_object *object, int id)
+{
+ struct afb_evt_listener *listener;
pthread_rwlock_rdlock(&listeners_rwlock);
listener = listeners;
while(listener) {
- if (listener->itf->broadcast != NULL) {
- listener->itf->broadcast(listener->closure, event, id, json_object_get(obj));
- result++;
- }
+ if (listener->itf->broadcast != NULL)
+ listener->itf->broadcast(listener->closure, event, id, json_object_get(object));
listener = listener->next;
}
pthread_rwlock_unlock(&listeners_rwlock);
- json_object_put(obj);
- return result;
}
-#if WITH_AFB_HOOK
/*
- * Broadcasts the 'event' of 'id' with its 'obj'
- * 'obj' is released (like json_object_put)
- * calls hooks if hookflags isn't 0
- * Returns the count of listener having receive the event.
+ * Jobs callback for broadcasting string asynchronously
*/
-static int hooked_broadcast(const char *event, struct json_object *obj, int id, int hookflags)
+static void broadcast_job_string(int signum, void *closure)
{
- int result;
+ struct job_string *js = closure;
- json_object_get(obj);
+ if (signum == 0)
+ broadcast(js->event, js->object, 0);
+ destroy_job_string(js);
+}
+
+/*
+ * Jobs callback for broadcasting evtid asynchronously
+ */
+static void broadcast_job_evtid(int signum, void *closure)
+{
+ struct job_evtid *je = closure;
+
+ if (signum == 0)
+ broadcast(je->evtid->fullname, je->object, je->evtid->id);
+ destroy_job_evtid(je);
+}
- if (hookflags & afb_hook_flag_evt_broadcast_before)
- afb_hook_evt_broadcast_before(event, id, obj);
+/*
+ * Broadcasts the string 'event' with its 'object'
+ */
+static int broadcast_string(const char *event, struct json_object *object)
+{
+ struct job_string *js;
+ int rc;
- result = broadcast(event, obj, id);
+ js = make_job_string(event, object);
+ if (js == NULL) {
+ ERROR("Cant't create broadcast string job item for %s(%s)",
+ event, json_object_to_json_string(object));
+ json_object_put(object);
+ return -1;
+ }
- if (hookflags & afb_hook_flag_evt_broadcast_after)
- afb_hook_evt_broadcast_after(event, id, obj, result);
+ rc = jobs_queue(BROADCAST_JOB_GROUP, 0, broadcast_job_string, js);
+ if (rc) {
+ ERROR("cant't queue broadcast string job item for %s(%s)",
+ event, json_object_to_json_string(object));
+ destroy_job_string(js);
+ }
+ return rc;
+}
- json_object_put(obj);
+/*
+ * Broadcasts the 'evtid' with its 'object'
+ */
+static int broadcast_evtid(struct afb_evtid *evtid, struct json_object *object)
+{
+ struct job_evtid *je;
+ int rc;
- return result;
+ je = make_job_evtid(evtid, object);
+ if (je == NULL) {
+ ERROR("Cant't create broadcast evtid job item for %s(%s)",
+ evtid->fullname, json_object_to_json_string(object));
+ json_object_put(object);
+ return -1;
+ }
+
+ rc = jobs_queue(BROADCAST_JOB_GROUP, 0, broadcast_job_evtid, je);
+ if (rc) {
+ ERROR("cant't queue broadcast evtid job item for %s(%s)",
+ evtid->fullname, json_object_to_json_string(object));
+ destroy_job_evtid(je);
+ }
+ return rc;
}
-#endif
/*
* Broadcasts the event 'evtid' with its 'object'
@@ -200,7 +318,7 @@ static int hooked_broadcast(const char *event, struct json_object *obj, int id,
*/
int afb_evt_evtid_broadcast(struct afb_evtid *evtid, struct json_object *object)
{
- return broadcast(evtid->fullname, object, evtid->id);
+ return broadcast_evtid(evtid, object);
}
#if WITH_AFB_HOOK
@@ -211,7 +329,21 @@ int afb_evt_evtid_broadcast(struct afb_evtid *evtid, struct json_object *object)
*/
int afb_evt_evtid_hooked_broadcast(struct afb_evtid *evtid, struct json_object *object)
{
- return hooked_broadcast(evtid->fullname, object, evtid->id, evtid->hookflags);
+ int result;
+
+ json_object_get(object);
+
+ if (evtid->hookflags & afb_hook_flag_evt_broadcast_before)
+ afb_hook_evt_broadcast_before(evtid->fullname, evtid->id, object);
+
+ result = broadcast_evtid(evtid, object);
+
+ if (evtid->hookflags & afb_hook_flag_evt_broadcast_after)
+ afb_hook_evt_broadcast_after(evtid->fullname, evtid->id, object, result);
+
+ json_object_put(object);
+
+ return result;
}
#endif
@@ -223,38 +355,89 @@ int afb_evt_evtid_hooked_broadcast(struct afb_evtid *evtid, struct json_object *
int afb_evt_broadcast(const char *event, struct json_object *object)
{
#if WITH_AFB_HOOK
- return hooked_broadcast(event, object, 0, -1);
+ int result;
+
+ json_object_get(object);
+
+ afb_hook_evt_broadcast_before(event, 0, object);
+ result = broadcast_string(event, object);
+ afb_hook_evt_broadcast_after(event, 0, object, result);
+
+ json_object_put(object);
+
+ return result;
#else
- return broadcast(event, object, 0);
+ return broadcast_string(event, object);
#endif
}
/*
* Pushes the event 'evtid' with 'obj' to its listeners
- * 'obj' is released (like json_object_put)
* Returns the count of listener that received the event.
*/
-int afb_evt_evtid_push(struct afb_evtid *evtid, struct json_object *obj)
+static void push_evtid(struct afb_evtid *evtid, struct json_object *object)
{
- int result;
+ int has_client;
struct afb_evt_watch *watch;
struct afb_evt_listener *listener;
- result = 0;
+ has_client = 0;
pthread_rwlock_rdlock(&evtid->rwlock);
watch = evtid->watchs;
while(watch) {
listener = watch->listener;
assert(listener->itf->push != NULL);
if (watch->activity != 0) {
- listener->itf->push(listener->closure, evtid->fullname, evtid->id, json_object_get(obj));
- result++;
+ listener->itf->push(listener->closure, evtid->fullname, evtid->id, json_object_get(object));
+ has_client = 1;
}
watch = watch->next_by_evtid;
}
+ evtid->has_client = has_client;
pthread_rwlock_unlock(&evtid->rwlock);
- json_object_put(obj);
- return result;
+}
+
+/*
+ * Jobs callback for pushing evtid asynchronously
+ */
+static void push_job_evtid(int signum, void *closure)
+{
+ struct job_evtid *je = closure;
+
+ if (signum == 0)
+ push_evtid(je->evtid, je->object);
+ destroy_job_evtid(je);
+}
+
+/*
+ * Pushes the event 'evtid' with 'obj' to its listeners
+ * 'obj' is released (like json_object_put)
+ * Returns 1 if at least one listener exists or 0 if no listener exists or
+ * -1 in case of error and the event can't be delivered
+ */
+int afb_evt_evtid_push(struct afb_evtid *evtid, struct json_object *object)
+{
+ struct job_evtid *je;
+ int rc;
+
+ je = make_job_evtid(evtid, object);
+ if (je == NULL) {
+ ERROR("Cant't create push evtid job item for %s(%s)",
+ evtid->fullname, json_object_to_json_string(object));
+ json_object_put(object);
+ return -1;
+ }
+
+ rc = jobs_queue(PUSH_JOB_GROUP, 0, push_job_evtid, je);
+ if (rc == 0)
+ rc = evtid->has_client;
+ else {
+ ERROR("cant't queue push evtid job item for %s(%s)",
+ evtid->fullname, json_object_to_json_string(object));
+ destroy_job_evtid(je);
+ }
+
+ return rc;
}
#if WITH_AFB_HOOK
@@ -354,6 +537,7 @@ struct afb_evtid *afb_evt_evtid_create(const char *fullname)
evtid->refcount = 1;
evtid->watchs = NULL;
evtid->id = event_id_counter;
+ evtid->has_client = 0;
pthread_rwlock_init(&evtid->rwlock, NULL);
evtids = evtid;
#if WITH_AFB_HOOK
@@ -633,6 +817,7 @@ found:
if (watch->activity == 0 && listener->itf->add != NULL)
listener->itf->add(listener->closure, evtid->fullname, evtid->id);
watch->activity++;
+ evtid->has_client = 1;
pthread_rwlock_unlock(&listener->rwlock);
return 0;