diff options
-rw-r--r-- | docs/reference-v3/func-api.md | 2 | ||||
-rw-r--r-- | docs/reference-v3/func-event.md | 7 | ||||
-rw-r--r-- | include/afb/afb-daemon-v1.h | 2 | ||||
-rw-r--r-- | include/afb/afb-daemon-v2.h | 2 | ||||
-rw-r--r-- | include/afb/afb-event-x2.h | 7 | ||||
-rw-r--r-- | src/afb-evt.c | 263 | ||||
-rw-r--r-- | src/main-afb-daemon.c | 2 |
7 files changed, 239 insertions, 46 deletions
diff --git a/docs/reference-v3/func-api.md b/docs/reference-v3/func-api.md index aa28932a..0c0921d5 100644 --- a/docs/reference-v3/func-api.md +++ b/docs/reference-v3/func-api.md @@ -425,7 +425,7 @@ int afb_api_queue_job( * @param name the event name suffix * @param object the object that comes with the event * - * @return the count of clients that received the event. + * @return 0 in case of success or -1 in case of error */ int afb_api_broadcast_event( afb_api_t api, diff --git a/docs/reference-v3/func-event.md b/docs/reference-v3/func-event.md index d96095a8..9d8d9050 100644 --- a/docs/reference-v3/func-event.md +++ b/docs/reference-v3/func-event.md @@ -78,7 +78,7 @@ afb_event_t *afb_event_addref( * @param event the event to broadcast * @param object the companion object to associate to the broadcasted event (can be NULL) * - * @return the count of clients that received the event. + * @return 0 in case of success or -1 in case of error */ int afb_event_broadcast( afb_event_t event, @@ -99,7 +99,10 @@ int afb_event_broadcast( * @param event the event to push * @param object the companion object to associate to the pushed event (can be NULL) * - * @return the count of clients that received the event. + * @Return + * * 1 if at least one client listen for the event + * * 0 if no more client listen for the event + * * -1 in case of error (the event can't be delivered) */ int afb_event_push( afb_event_t event, diff --git a/include/afb/afb-daemon-v1.h b/include/afb/afb-daemon-v1.h index c268bb83..0a821b72 100644 --- a/include/afb/afb-daemon-v1.h +++ b/include/afb/afb-daemon-v1.h @@ -68,7 +68,7 @@ static inline struct sd_bus *afb_daemon_get_system_bus_v1(struct afb_daemon_x1 d * * Calling this function is only forbidden during preinit. * - * Returns the count of clients that received the event. + * Returns 0 in case of success or -1 in case of error */ static inline int afb_daemon_broadcast_event_v1(struct afb_daemon_x1 daemon, const char *name, struct json_object *object) { diff --git a/include/afb/afb-daemon-v2.h b/include/afb/afb-daemon-v2.h index 65e4afbc..220ce0b7 100644 --- a/include/afb/afb-daemon-v2.h +++ b/include/afb/afb-daemon-v2.h @@ -64,7 +64,7 @@ static inline struct sd_bus *afb_daemon_get_system_bus_v2() * * Calling this function is only forbidden during preinit. * - * Returns the count of clients that received the event. + * Returns 0 in case of success or -1 in case of error */ static inline int afb_daemon_broadcast_event_v2(const char *name, struct json_object *object) { diff --git a/include/afb/afb-event-x2.h b/include/afb/afb-event-x2.h index 9732cad1..beb31f9d 100644 --- a/include/afb/afb-event-x2.h +++ b/include/afb/afb-event-x2.h @@ -45,7 +45,7 @@ static inline int afb_event_x2_is_valid(struct afb_event_x2 *event) * @param event the event to broadcast * @param object the companion object to associate to the broadcasted event (can be NULL) * - * @return the count of clients that received the event. + * @return 0 in case of success or -1 in case of error */ static inline int afb_event_x2_broadcast( struct afb_event_x2 *event, @@ -65,7 +65,10 @@ static inline int afb_event_x2_broadcast( * @param event the event to push * @param object the companion object to associate to the pushed event (can be NULL) * - * @return the count of clients that received the event. + * @Return + * * 1 if at least one client listen for the event + * * 0 if no more client listen for the event + * * -1 in case of error (the event can't be delivered) */ static inline int afb_event_x2_push( struct afb_event_x2 *event, diff --git a/src/afb-evt.c b/src/afb-evt.c index d361c954..dcb8743a 100644 --- a/src/afb-evt.c +++ b/src/afb-evt.c @@ -30,6 +30,7 @@ #include "afb-evt.h" #include "afb-hook.h" #include "verbose.h" +#include "jobs.h" struct afb_evt_watch; @@ -83,6 +84,9 @@ struct afb_evtid { /* id of the event */ int id; + /* has client? */ + int has_client; + /* fullname of the event */ char fullname[]; }; @@ -108,6 +112,30 @@ struct afb_evt_watch { unsigned activity; }; +/* + * structure for job of broadcasting string events + */ +struct job_string +{ + /** object atached to the event */ + struct json_object *object; + + /** name of the event to broadcast */ + char event[]; +}; + +/* + * structure for job of broadcasting or pushing events + */ +struct job_evtid +{ + /** the event to broadcast */ + struct afb_evtid *evtid; + + /** object atached to the event */ + struct json_object *object; +}; + /* the interface for events */ static struct afb_event_x2_itf afb_evt_event_x2_itf = { .broadcast = (void*)afb_evt_evtid_broadcast, @@ -126,6 +154,10 @@ static struct afb_event_x2_itf afb_evt_hooked_eventid_itf = { .addref = (void*)afb_evt_evtid_hooked_addref }; +/* job groups for events push/broadcast */ +#define BROADCAST_JOB_GROUP (&afb_evt_event_x2_itf) +#define PUSH_JOB_GROUP (&afb_evt_event_x2_itf) + /* head of the list of listeners */ static pthread_rwlock_t listeners_rwlock = PTHREAD_RWLOCK_INITIALIZER; static struct afb_evt_listener *listeners = NULL; @@ -137,54 +169,142 @@ static int event_id_counter = 0; static int event_id_wrapped = 0; /* - * Broadcasts the 'event' of 'id' with its 'obj' - * 'obj' is released (like json_object_put) - * Returns the count of listener having receive the event. + * Create structure for job of broadcasting string 'event' with 'object' + * Returns the created structure or NULL if out of memory */ -static int broadcast(const char *event, struct json_object *obj, int id) +static struct job_string *make_job_string(const char *event, struct json_object *object) { - int result; - struct afb_evt_listener *listener; + size_t sz = 1 + strlen(event); + struct job_string *js = malloc(sz + sizeof *js); + if (js) { + js->object = object; + memcpy(js->event, event, sz); + } + return js; +} - result = 0; +/* + * Destroy structure 'js' for job of broadcasting string events + */ +static void destroy_job_string(struct job_string *js) +{ + json_object_put(js->object); + free(js); +} + +/* + * Create structure for job of broadcasting or pushing 'evtid' with 'object' + * Returns the created structure or NULL if out of memory + */ +static struct job_evtid *make_job_evtid(struct afb_evtid *evtid, struct json_object *object) +{ + struct job_evtid *je = malloc(sizeof *je); + if (je) { + je->evtid = afb_evt_evtid_addref(evtid); + je->object = object; + } + return je; +} + +/* + * Destroy structure for job of broadcasting or pushing evtid + */ +static void destroy_job_evtid(struct job_evtid *je) +{ + afb_evt_evtid_unref(je->evtid); + json_object_put(je->object); + free(je); +} + +/* + * Broadcasts the 'event' of 'id' with its 'object' + */ +static void broadcast(const char *event, struct json_object *object, int id) +{ + struct afb_evt_listener *listener; pthread_rwlock_rdlock(&listeners_rwlock); listener = listeners; while(listener) { - if (listener->itf->broadcast != NULL) { - listener->itf->broadcast(listener->closure, event, id, json_object_get(obj)); - result++; - } + if (listener->itf->broadcast != NULL) + listener->itf->broadcast(listener->closure, event, id, json_object_get(object)); listener = listener->next; } pthread_rwlock_unlock(&listeners_rwlock); - json_object_put(obj); - return result; } /* - * Broadcasts the 'event' of 'id' with its 'obj' - * 'obj' is released (like json_object_put) - * calls hooks if hookflags isn't 0 - * Returns the count of listener having receive the event. + * Jobs callback for broadcasting string asynchronously */ -static int hooked_broadcast(const char *event, struct json_object *obj, int id, int hookflags) +static void broadcast_job_string(int signum, void *closure) { - int result; + struct job_string *js = closure; - json_object_get(obj); + if (signum == 0) + broadcast(js->event, js->object, 0); + destroy_job_string(js); +} + +/* + * Jobs callback for broadcasting evtid asynchronously + */ +static void broadcast_job_evtid(int signum, void *closure) +{ + struct job_evtid *je = closure; + + if (signum == 0) + broadcast(je->evtid->fullname, je->object, je->evtid->id); + destroy_job_evtid(je); +} - if (hookflags & afb_hook_flag_evt_broadcast_before) - afb_hook_evt_broadcast_before(event, id, obj); +/* + * Broadcasts the string 'event' with its 'object' + */ +static int broadcast_string(const char *event, struct json_object *object) +{ + struct job_string *js; + int rc; - result = broadcast(event, obj, id); + js = make_job_string(event, object); + if (js == NULL) { + ERROR("Cant't create broadcast string job item for %s(%s)", + event, json_object_to_json_string(object)); + json_object_put(object); + return -1; + } - if (hookflags & afb_hook_flag_evt_broadcast_after) - afb_hook_evt_broadcast_after(event, id, obj, result); + rc = jobs_queue(BROADCAST_JOB_GROUP, 0, broadcast_job_string, js); + if (rc) { + ERROR("cant't queue broadcast string job item for %s(%s)", + event, json_object_to_json_string(object)); + destroy_job_string(js); + } + return rc; +} - json_object_put(obj); +/* + * Broadcasts the 'evtid' with its 'object' + */ +static int broadcast_evtid(struct afb_evtid *evtid, struct json_object *object) +{ + struct job_evtid *je; + int rc; - return result; + je = make_job_evtid(evtid, object); + if (je == NULL) { + ERROR("Cant't create broadcast evtid job item for %s(%s)", + evtid->fullname, json_object_to_json_string(object)); + json_object_put(object); + return -1; + } + + rc = jobs_queue(BROADCAST_JOB_GROUP, 0, broadcast_job_evtid, je); + if (rc) { + ERROR("cant't queue broadcast evtid job item for %s(%s)", + evtid->fullname, json_object_to_json_string(object)); + destroy_job_evtid(je); + } + return rc; } /* @@ -194,7 +314,7 @@ static int hooked_broadcast(const char *event, struct json_object *obj, int id, */ int afb_evt_evtid_broadcast(struct afb_evtid *evtid, struct json_object *object) { - return broadcast(evtid->fullname, object, evtid->id); + return broadcast_evtid(evtid, object); } /* @@ -204,7 +324,21 @@ int afb_evt_evtid_broadcast(struct afb_evtid *evtid, struct json_object *object) */ int afb_evt_evtid_hooked_broadcast(struct afb_evtid *evtid, struct json_object *object) { - return hooked_broadcast(evtid->fullname, object, evtid->id, evtid->hookflags); + int result; + + json_object_get(object); + + if (evtid->hookflags & afb_hook_flag_evt_broadcast_before) + afb_hook_evt_broadcast_before(evtid->fullname, evtid->id, object); + + result = broadcast_evtid(evtid, object); + + if (evtid->hookflags & afb_hook_flag_evt_broadcast_after) + afb_hook_evt_broadcast_after(evtid->fullname, evtid->id, object, result); + + json_object_put(object); + + return result; } /* @@ -214,35 +348,86 @@ int afb_evt_evtid_hooked_broadcast(struct afb_evtid *evtid, struct json_object * */ int afb_evt_broadcast(const char *event, struct json_object *object) { - return hooked_broadcast(event, object, 0, -1); + int result; + + json_object_get(object); + + afb_hook_evt_broadcast_before(event, 0, object); + result = broadcast_string(event, object); + afb_hook_evt_broadcast_after(event, 0, object, result); + + json_object_put(object); + + return result; } /* * Pushes the event 'evtid' with 'obj' to its listeners - * 'obj' is released (like json_object_put) * Returns the count of listener that received the event. */ -int afb_evt_evtid_push(struct afb_evtid *evtid, struct json_object *obj) +static void push_evtid(struct afb_evtid *evtid, struct json_object *object) { - int result; + int has_client; struct afb_evt_watch *watch; struct afb_evt_listener *listener; - result = 0; + has_client = 0; pthread_rwlock_rdlock(&evtid->rwlock); watch = evtid->watchs; while(watch) { listener = watch->listener; assert(listener->itf->push != NULL); if (watch->activity != 0) { - listener->itf->push(listener->closure, evtid->fullname, evtid->id, json_object_get(obj)); - result++; + listener->itf->push(listener->closure, evtid->fullname, evtid->id, json_object_get(object)); + has_client = 1; } watch = watch->next_by_evtid; } + evtid->has_client = has_client; pthread_rwlock_unlock(&evtid->rwlock); - json_object_put(obj); - return result; +} + +/* + * Jobs callback for pushing evtid asynchronously + */ +static void push_job_evtid(int signum, void *closure) +{ + struct job_evtid *je = closure; + + if (signum == 0) + push_evtid(je->evtid, je->object); + destroy_job_evtid(je); +} + +/* + * Pushes the event 'evtid' with 'obj' to its listeners + * 'obj' is released (like json_object_put) + * Returns 1 if at least one listener exists or 0 if no listener exists or + * -1 in case of error and the event can't be delivered + */ +int afb_evt_evtid_push(struct afb_evtid *evtid, struct json_object *object) +{ + struct job_evtid *je; + int rc; + + je = make_job_evtid(evtid, object); + if (je == NULL) { + ERROR("Cant't create push evtid job item for %s(%s)", + evtid->fullname, json_object_to_json_string(object)); + json_object_put(object); + return -1; + } + + rc = jobs_queue(PUSH_JOB_GROUP, 0, push_job_evtid, je); + if (rc == 0) + rc = evtid->has_client; + else { + ERROR("cant't queue push evtid job item for %s(%s)", + evtid->fullname, json_object_to_json_string(object)); + destroy_job_evtid(je); + } + + return rc; } /* @@ -340,6 +525,7 @@ struct afb_evtid *afb_evt_evtid_create(const char *fullname) evtid->refcount = 1; evtid->watchs = NULL; evtid->id = event_id_counter; + evtid->has_client = 0; pthread_rwlock_init(&evtid->rwlock, NULL); evtids = evtid; evtid->hookflags = afb_hook_flags_evt(evtid->fullname); @@ -609,6 +795,7 @@ found: if (watch->activity == 0 && listener->itf->add != NULL) listener->itf->add(listener->closure, evtid->fullname, evtid->id); watch->activity++; + evtid->has_client = 1; pthread_rwlock_unlock(&listener->rwlock); return 0; diff --git a/src/main-afb-daemon.c b/src/main-afb-daemon.c index bb2f0a56..b25cf86e 100644 --- a/src/main-afb-daemon.c +++ b/src/main-afb-daemon.c @@ -903,7 +903,7 @@ int main(int argc, char *argv[]) afb_debug("main-start"); /* enter job processing */ - jobs_start(3, 0, 50, start, NULL); + jobs_start(3, 0, 100, start, NULL); WARNING("hoops returned from jobs_enter! [report bug]"); return 1; } |