summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJosé Bollo <jose.bollo@iot.bzh>2016-06-17 22:31:33 +0200
committerJosé Bollo <jose.bollo@iot.bzh>2016-06-23 11:04:23 +0200
commitd8ef25780bffa6f91f013ef71b1ede908325e59d (patch)
treefb0d3aef9cdf0775dcadd2c062741519c4d4fcad
parentb0848149c1ef5236791c0ba5196540d05d9f15fd (diff)
evt: handles broadcasting and tracking
Signed-off-by: José Bollo <jose.bollo@iot.bzh>
-rw-r--r--src/afb-api-dbus.c10
-rw-r--r--src/afb-evt.c145
-rw-r--r--src/afb-evt.h11
-rw-r--r--src/afb-svc.c12
-rw-r--r--src/afb-ws-json1.c12
5 files changed, 151 insertions, 39 deletions
diff --git a/src/afb-api-dbus.c b/src/afb-api-dbus.c
index 3d9da07f..054755ac 100644
--- a/src/afb-api-dbus.c
+++ b/src/afb-api-dbus.c
@@ -537,7 +537,7 @@ static int api_dbus_server_on_object_called(sd_bus_message *message, void *userd
return 1;
}
-static void afb_api_dbus_server_send_event(struct api_dbus *api, const char *event, struct json_object *object)
+static void afb_api_dbus_server_send_event(struct api_dbus *api, const char *event, int eventid, struct json_object *object)
{
int rc;
@@ -548,6 +548,12 @@ static void afb_api_dbus_server_send_event(struct api_dbus *api, const char *eve
json_object_put(object);
}
+/* the interface for events */
+static const struct afb_evt_itf evt_itf = {
+ .broadcast = (void*)afb_api_dbus_server_send_event,
+ .push = (void*)afb_api_dbus_server_send_event
+};
+
/* create the service */
int afb_api_dbus_add_server(const char *path)
{
@@ -576,7 +582,7 @@ int afb_api_dbus_add_server(const char *path)
}
INFO("afb service over dbus installed, name %s, path %s", api->name, api->path);
- api->listener = afb_evt_listener_create((void*)afb_api_dbus_server_send_event, api);
+ api->listener = afb_evt_listener_create(&evt_itf, api);
return 0;
error3:
diff --git a/src/afb-evt.c b/src/afb-evt.c
index 53ab0e0b..4924b8cb 100644
--- a/src/afb-evt.c
+++ b/src/afb-evt.c
@@ -38,8 +38,8 @@ struct afb_evt_listener {
/* chaining listeners */
struct afb_evt_listener *next;
- /* callback on event */
- void (*send)(void *closure, const char *event, struct json_object *object);
+ /* interface for callbacks */
+ const struct afb_evt_itf *itf;
/* closure for the callback */
void *closure;
@@ -56,9 +56,15 @@ struct afb_evt_listener {
*/
struct afb_evt_event {
+ /* next event */
+ struct afb_evt_event *next;
+
/* head of the list of listeners watching the event */
struct afb_evt_watch *watchs;
+ /* id of the event */
+ int id;
+
/* name of the event */
char name[1];
};
@@ -79,6 +85,9 @@ struct afb_evt_watch {
/* link to the next event for the same listener */
struct afb_evt_watch *next_by_listener;
+
+ /* activity */
+ unsigned activity;
};
/* declare functions */
@@ -96,6 +105,11 @@ static struct afb_event_itf afb_evt_event_itf = {
/* head of the list of listeners */
static struct afb_evt_listener *listeners = NULL;
+/* handling id of events */
+static struct afb_evt_event *events = NULL;
+static int event_id_counter = 0;
+static int event_id_wrapped = 0;
+
/*
* Broadcasts the event 'evt' with its 'object'
* 'object' is released (like json_object_put)
@@ -119,9 +133,11 @@ int afb_evt_broadcast(const char *event, struct json_object *object)
result = 0;
listener = listeners;
while(listener) {
- listener->send(listener->closure, event, json_object_get(object));
+ if (listener->itf->broadcast != NULL) {
+ listener->itf->broadcast(listener->closure, event, 0, json_object_get(object));
+ result++;
+ }
listener = listener->next;
- result++;
}
json_object_put(object);
return result;
@@ -140,9 +156,11 @@ static int evt_push(struct afb_evt_event *evt, struct json_object *obj)
result = 0;
watch = evt->watchs;
- while(listener) {
+ while(watch) {
listener = watch->listener;
- listener->send(listener->closure, evt->name, json_object_get(obj));
+ assert(listener->itf->push != NULL);
+ if (watch->activity != 0)
+ listener->itf->push(listener->closure, evt->name, evt->id, json_object_get(obj));
watch = watch->next_by_event;
result++;
}
@@ -156,15 +174,23 @@ static int evt_push(struct afb_evt_event *evt, struct json_object *obj)
static void remove_watch(struct afb_evt_watch *watch)
{
struct afb_evt_watch **prv;
+ struct afb_evt_event *evt;
+ struct afb_evt_listener *listener;
+
+ /* notify listener if needed */
+ evt = watch->event;
+ listener = watch->listener;
+ if (watch->activity != 0 && listener->itf->remove != NULL)
+ listener->itf->remove(listener->closure, evt->name, evt->id);
/* unlink the watch for its event */
- prv = &watch->event->watchs;
+ prv = &evt->watchs;
while(*prv != watch)
prv = &(*prv)->next_by_event;
*prv = watch->next_by_event;
/* unlink the watch for its listener */
- prv = &watch->listener->watchs;
+ prv = &listener->watchs;
while(*prv != watch)
prv = &(*prv)->next_by_listener;
*prv = watch->next_by_listener;
@@ -178,11 +204,26 @@ static void remove_watch(struct afb_evt_watch *watch)
*/
static void evt_destroy(struct afb_evt_event *evt)
{
+ struct afb_evt_event **prv;
if (evt != NULL) {
- /* removes all watchers */
- while(evt->watchs != NULL)
- remove_watch(evt->watchs);
- free(evt);
+ /* removes the event if valid! */
+ prv = &events;
+ while (*prv != NULL) {
+ if (*prv != evt)
+ prv = &(*prv)->next;
+ else {
+ /* valid, unlink */
+ *prv = evt->next;
+
+ /* removes all watchers */
+ while(evt->watchs != NULL)
+ remove_watch(evt->watchs);
+
+ /* free */
+ free(evt);
+ break;
+ }
+ }
}
}
@@ -195,13 +236,37 @@ struct afb_event afb_evt_create_event(const char *name)
size_t len;
struct afb_evt_event *evt;
+ /* allocates the id */
+ do {
+ if (++event_id_counter < 0) {
+ event_id_wrapped = 1;
+ event_id_counter = 1024; /* heuristic: small numbers are not destroyed */
+ }
+ if (!event_id_wrapped)
+ break;
+ evt = events;
+ while(evt != NULL && evt->id != event_id_counter)
+ evt = evt->next;
+ } while (evt != NULL);
+
+ /* allocates the event */
len = strlen(name);
evt = malloc(len + sizeof * evt);
- if (evt != NULL) {
- evt->watchs = NULL;
- memcpy(evt->name, name, len + 1);
- }
+ if (evt == NULL)
+ goto error;
+
+ /* initialize the event */
+ evt->next = events;
+ evt->watchs = NULL;
+ evt->id = event_id_counter;
+ assert(evt->id > 0);
+ memcpy(evt->name, name, len + 1);
+ events = evt;
+
+ /* returns the event */
return (struct afb_event){ .itf = &afb_evt_event_itf, .closure = evt };
+error:
+ return (struct afb_event){ .itf = NULL, .closure = NULL };
}
/*
@@ -213,18 +278,26 @@ const char *afb_evt_event_name(struct afb_event event)
}
/*
+ * Returns the id of the 'event'
+ */
+int afb_evt_event_id(struct afb_event event)
+{
+ return (event.itf != &afb_evt_event_itf) ? 0 : ((struct afb_evt_event *)event.closure)->id;
+}
+
+/*
* Returns an instance of the listener defined by the 'send' callback
* and the 'closure'.
* Returns NULL in case of memory depletion.
*/
-struct afb_evt_listener *afb_evt_listener_create(void (*send)(void *closure, const char *event, struct json_object *object), void *closure)
+struct afb_evt_listener *afb_evt_listener_create(const struct afb_evt_itf *itf, void *closure)
{
struct afb_evt_listener *listener;
/* search if an instance already exists */
listener = listeners;
while (listener != NULL) {
- if (listener->send == send && listener->closure == closure)
+ if (listener->itf == itf && listener->closure == closure)
return afb_evt_listener_addref(listener);
listener = listener->next;
}
@@ -234,7 +307,7 @@ struct afb_evt_listener *afb_evt_listener_create(void (*send)(void *closure, con
if (listener != NULL) {
/* init */
listener->next = listeners;
- listener->send = send;
+ listener->itf = itf;
listener->closure = closure;
listener->watchs = NULL;
listener->refcount = 1;
@@ -286,16 +359,17 @@ int afb_evt_add_watch(struct afb_evt_listener *listener, struct afb_event event)
struct afb_evt_event *evt;
/* check parameter */
- if (event.itf != &afb_evt_event_itf) {
+ if (event.itf != &afb_evt_event_itf || listener->itf->push == NULL) {
errno = EINVAL;
return -1;
}
- /* search the existing watch */
+ /* search the existing watch for the listener */
+ evt = event.closure;
watch = listener->watchs;
while(watch != NULL) {
- if (watch->event == event.closure)
- return 0;
+ if (watch->event == evt)
+ goto found;
watch = watch->next_by_listener;
}
@@ -307,14 +381,19 @@ int afb_evt_add_watch(struct afb_evt_listener *listener, struct afb_event event)
}
/* initialise and link */
- evt = event.closure;
watch->event = evt;
watch->next_by_event = evt->watchs;
watch->listener = listener;
watch->next_by_listener = listener->watchs;
+ watch->activity = 0;
evt->watchs = watch;
listener->watchs = watch;
-
+
+found:
+ if (watch->activity == 0 && listener->itf->add != NULL)
+ listener->itf->add(listener->closure, evt->name, evt->id);
+ watch->activity++;
+
return 0;
}
@@ -325,6 +404,7 @@ int afb_evt_add_watch(struct afb_evt_listener *listener, struct afb_event event)
int afb_evt_remove_watch(struct afb_evt_listener *listener, struct afb_event event)
{
struct afb_evt_watch *watch;
+ struct afb_evt_event *evt;
/* check parameter */
if (event.itf != &afb_evt_event_itf) {
@@ -333,16 +413,21 @@ int afb_evt_remove_watch(struct afb_evt_listener *listener, struct afb_event eve
}
/* search the existing watch */
+ evt = event.closure;
watch = listener->watchs;
while(watch != NULL) {
- if (watch->event == event.closure) {
+ if (watch->event == evt) {
/* found: remove it */
- remove_watch(watch);
- break;
+ if (watch->activity != 0) {
+ watch->activity--;
+ if (watch->activity == 0 && listener->itf->remove != NULL)
+ listener->itf->remove(listener->closure, evt->name, evt->id);
+ }
+ return 0;
}
watch = watch->next_by_listener;
}
- return 0;
+ errno = ENOENT;
+ return -1;
}
-
diff --git a/src/afb-evt.h b/src/afb-evt.h
index 157a7776..8ebb2ec0 100644
--- a/src/afb-evt.h
+++ b/src/afb-evt.h
@@ -22,7 +22,15 @@ struct AFB_clientCtx;
struct afb_evt_listener;
-extern struct afb_evt_listener *afb_evt_listener_create(void (*send)(void *closure, const char *event, struct json_object *object), void *closure);
+struct afb_evt_itf
+{
+ void (*push)(void *closure, const char *event, int eventid, struct json_object *object);
+ void (*broadcast)(void *closure, const char *event, int eventid, struct json_object *object);
+ void (*add)(void *closure, const char *event, int eventid);
+ void (*remove)(void *closure, const char *event, int eventid);
+};
+
+extern struct afb_evt_listener *afb_evt_listener_create(const struct afb_evt_itf *itf, void *closure);
extern int afb_evt_broadcast(const char *event, struct json_object *object);
@@ -31,6 +39,7 @@ extern void afb_evt_listener_unref(struct afb_evt_listener *listener);
extern struct afb_event afb_evt_create_event(const char *name);
extern const char *afb_evt_event_name(struct afb_event event);
+extern int afb_evt_event_id(struct afb_event event);
extern int afb_evt_add_watch(struct afb_evt_listener *listener, struct afb_event event);
extern int afb_evt_remove_watch(struct afb_evt_listener *listener, struct afb_event event);
diff --git a/src/afb-svc.c b/src/afb-svc.c
index 03ff4b84..95617d5d 100644
--- a/src/afb-svc.c
+++ b/src/afb-svc.c
@@ -64,7 +64,7 @@ struct svc_req
};
/* functions for services */
-static void svc_on_event(struct afb_svc *svc, const char *event, struct json_object *object);
+static void svc_on_event(struct afb_svc *svc, const char *event, int eventid, struct json_object *object);
static void svc_call(struct afb_svc *svc, const char *api, const char *verb, struct json_object *args,
void (*callback)(void*, int, struct json_object*), void *closure);
@@ -73,6 +73,12 @@ static const struct afb_service_itf service_itf = {
.call = (void*)svc_call
};
+/* the interface for events */
+static const struct afb_evt_itf evt_itf = {
+ .broadcast = (void*)svc_on_event,
+ .push = (void*)svc_on_event
+};
+
/* functions for requests of services */
static void svcreq_addref(struct svc_req *svcreq);
static void svcreq_unref(struct svc_req *svcreq);
@@ -130,7 +136,7 @@ struct afb_svc *afb_svc_create(int share_session, int (*init)(struct afb_service
if (on_event == NULL)
svc->listener = NULL;
else {
- svc->listener = afb_evt_listener_create((void*)svc_on_event, svc);
+ svc->listener = afb_evt_listener_create(&evt_itf, svc);
if (svc->listener == NULL)
goto error3;
}
@@ -156,7 +162,7 @@ error:
/*
* Propagates the event to the service
*/
-static void svc_on_event(struct afb_svc *svc, const char *event, struct json_object *object)
+static void svc_on_event(struct afb_svc *svc, const char *event, int eventid, struct json_object *object)
{
svc->on_event(event, object);
}
diff --git a/src/afb-ws-json1.c b/src/afb-ws-json1.c
index 4cfc9181..9d295e78 100644
--- a/src/afb-ws-json1.c
+++ b/src/afb-ws-json1.c
@@ -44,7 +44,7 @@ struct afb_wsreq;
/* predeclaration of websocket callbacks */
static void aws_on_hangup(struct afb_ws_json1 *ws, struct afb_wsj1 *wsj1);
static void aws_on_call(struct afb_ws_json1 *ws, const char *api, const char *verb, struct afb_wsj1_msg *msg);
-static void aws_on_event(struct afb_ws_json1 *ws, const char *event, struct json_object *object);
+static void aws_on_event(struct afb_ws_json1 *ws, const char *event, int eventid, struct json_object *object);
/* predeclaration of wsreq callbacks */
static void wsreq_addref(struct afb_wsreq *wsreq);
@@ -110,6 +110,12 @@ const struct afb_req_itf afb_ws_json1_req_itf = {
.subcall = (void*)wsreq_subcall
};
+/* the interface for events */
+static const struct afb_evt_itf evt_itf = {
+ .broadcast = (void*)aws_on_event,
+ .push = (void*)aws_on_event
+};
+
/***************************************************************
****************************************************************
**
@@ -141,7 +147,7 @@ struct afb_ws_json1 *afb_ws_json1_create(int fd, struct afb_context *context, vo
if (result->wsj1 == NULL)
goto error3;
- result->listener = afb_evt_listener_create((void*)aws_on_event, result);
+ result->listener = afb_evt_listener_create(&evt_itf, result);
if (result->listener == NULL)
goto error4;
@@ -217,7 +223,7 @@ static void aws_on_call(struct afb_ws_json1 *ws, const char *api, const char *ve
wsreq_unref(wsreq);
}
-static void aws_on_event(struct afb_ws_json1 *aws, const char *event, struct json_object *object)
+static void aws_on_event(struct afb_ws_json1 *aws, const char *event, int eventid, struct json_object *object)
{
afb_wsj1_send_event_j(aws->wsj1, event, afb_msg_json_event(event, object));
}