aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJosé Bollo <jose.bollo@iot.bzh>2019-07-16 23:39:27 +0200
committerJose Bollo <jose.bollo@iot.bzh>2019-07-18 15:39:52 +0200
commit037157919df0a7ee90837037748a6456431e6469 (patch)
treea33e3494a78052b73aea06b694b98298ea04de36
parenta445694bda8773cb80566808b5dcb56f033dee58 (diff)
Tag broadcasted events with UUID and hop
When API have mutual dependencies, leading to loops in dependecies, broadcasting an event never ends because of the loop (see SPEC-2625). To avoid that weird flood of events, a unic identifier (UUID) is attached to broadcasted event and a tiny memory records previously broadcasted events to avoid re-sending an already sent event. The size of the memory can be set using the macro variable EVENT_BROADCAST_MEMORY_COUNT whose default value is 8. It can be reduced to 0. An other mecanism is added to limit the count of hops that a broadcasted event can do. That count can be set using the macro variable EVENT_BROADCAST_HOP_MAX whose default value is 10. Bug-AGL: SPEC-2625 Signed-off-by: José Bollo <jose.bollo@iot.bzh> Change-Id: I45877583dbf478a79d405c3650880a5579ac1f9e
-rw-r--r--bindings/samples/hello3.c2
-rw-r--r--src/afb-api-dbus.c4
-rw-r--r--src/afb-evt.c101
-rw-r--r--src/afb-evt.h5
-rw-r--r--src/afb-export.c2
-rw-r--r--src/afb-proto-ws.c32
-rw-r--r--src/afb-proto-ws.h6
-rw-r--r--src/afb-stub-ws.c8
-rw-r--r--src/afb-ws-json1.c14
-rw-r--r--src/main-afb-client-demo.c4
10 files changed, 139 insertions, 39 deletions
diff --git a/bindings/samples/hello3.c b/bindings/samples/hello3.c
index 1d7c1954..37da6127 100644
--- a/bindings/samples/hello3.c
+++ b/bindings/samples/hello3.c
@@ -424,7 +424,7 @@ static void broadcast(afb_req_t request)
afb_req_success(request, NULL, NULL);
pthread_mutex_unlock(&mutex);
} else if (name != NULL) {
- if (0 > afb_daemon_broadcast_event(name, object))
+ if (0 > afb_daemon_broadcast_event(name, json_object_get(object)))
afb_req_fail(request, "failed", "broadcast error");
else
afb_req_success(request, NULL, NULL);
diff --git a/src/afb-api-dbus.c b/src/afb-api-dbus.c
index 966c9656..ec2ca760 100644
--- a/src/afb-api-dbus.c
+++ b/src/afb-api-dbus.c
@@ -634,7 +634,7 @@ error:
static void afb_api_dbus_server_event_add(void *closure, const char *event, int eventid);
static void afb_api_dbus_server_event_remove(void *closure, const char *event, int eventid);
static void afb_api_dbus_server_event_push(void *closure, const char *event, int eventid, struct json_object *object);
-static void afb_api_dbus_server_event_broadcast(void *closure, const char *event, struct json_object *object);
+static void afb_api_dbus_server_event_broadcast(void *closure, const char *event, struct json_object *object, const char *uuid);
/* the interface for events broadcasting */
static const struct afb_evt_itf evt_broadcast_itf = {
@@ -916,7 +916,7 @@ static void afb_api_dbus_server_event_push(void *closure, const char *event, int
json_object_put(object);
}
-static void afb_api_dbus_server_event_broadcast(void *closure, const char *event, struct json_object *object)
+static void afb_api_dbus_server_event_broadcast(void *closure, const char *event, struct json_object *object, const char *uuid)
{
int rc;
struct api_dbus *api;
diff --git a/src/afb-evt.c b/src/afb-evt.c
index ddad5759..24ac8bee 100644
--- a/src/afb-evt.c
+++ b/src/afb-evt.c
@@ -31,6 +31,7 @@
#include "afb-hook.h"
#include "verbose.h"
#include "jobs.h"
+#include "uuid.h"
struct afb_evt_watch;
@@ -120,6 +121,12 @@ struct job_broadcast
/** object atached to the event */
struct json_object *object;
+ /** the uuid of the event */
+ uuid_binary_t uuid;
+
+ /** remaining hop */
+ uint8_t hop;
+
/** name of the event to broadcast */
char event[];
};
@@ -168,16 +175,39 @@ static struct afb_evtid *evtids = NULL;
static int event_id_counter = 0;
static int event_id_wrapped = 0;
+/* head of uniqueness of events */
+#if !defined(EVENT_BROADCAST_HOP_MAX)
+# define EVENT_BROADCAST_HOP_MAX 10
+#endif
+#if !defined(EVENT_BROADCAST_MEMORY_COUNT)
+# define EVENT_BROADCAST_MEMORY_COUNT 8
+#endif
+
+#if EVENT_BROADCAST_MEMORY_COUNT
+static struct {
+ pthread_mutex_t mutex;
+ uint8_t base;
+ uint8_t count;
+ uuid_binary_t uuids[EVENT_BROADCAST_MEMORY_COUNT];
+} uniqueness = {
+ .mutex = PTHREAD_MUTEX_INITIALIZER,
+ .base = 0,
+ .count = 0
+};
+#endif
+
/*
* Create structure for job of broadcasting string 'event' with 'object'
* Returns the created structure or NULL if out of memory
*/
-static struct job_broadcast *make_job_broadcast(const char *event, struct json_object *object)
+static struct job_broadcast *make_job_broadcast(const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop)
{
size_t sz = 1 + strlen(event);
struct job_broadcast *jb = malloc(sz + sizeof *jb);
if (jb) {
jb->object = object;
+ memcpy(jb->uuid, uuid, sizeof jb->uuid);
+ jb->hop = hop;
memcpy(jb->event, event, sz);
}
return jb;
@@ -219,7 +249,7 @@ static void destroy_job_evtid(struct job_evtid *je)
/*
* Broadcasts the 'event' of 'id' with its 'object'
*/
-static void broadcast(const char *event, struct json_object *object)
+static void broadcast(struct job_broadcast *jb)
{
struct afb_evt_listener *listener;
@@ -227,7 +257,7 @@ static void broadcast(const char *event, struct json_object *object)
listener = listeners;
while(listener) {
if (listener->itf->broadcast != NULL)
- listener->itf->broadcast(listener->closure, event, json_object_get(object));
+ listener->itf->broadcast(listener->closure, jb->event, json_object_get(jb->object), jb->uuid, jb->hop);
listener = listener->next;
}
pthread_rwlock_unlock(&listeners_rwlock);
@@ -241,19 +271,56 @@ static void broadcast_job(int signum, void *closure)
struct job_broadcast *jb = closure;
if (signum == 0)
- broadcast(jb->event, jb->object);
+ broadcast(jb);
destroy_job_broadcast(jb);
}
/*
* Broadcasts the string 'event' with its 'object'
*/
-static int unhooked_broadcast(const char *event, struct json_object *object)
+static int unhooked_broadcast(const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop)
{
+ uuid_binary_t local_uuid;
struct job_broadcast *jb;
int rc;
+#if EVENT_BROADCAST_MEMORY_COUNT
+ int iter, count;
+#endif
+
+ /* check if lately sent */
+ if (!uuid) {
+ uuid_new_binary(local_uuid);
+ uuid = local_uuid;
+ hop = EVENT_BROADCAST_HOP_MAX;
+#if EVENT_BROADCAST_MEMORY_COUNT
+ pthread_mutex_lock(&uniqueness.mutex);
+ } else {
+ pthread_mutex_lock(&uniqueness.mutex);
+ iter = (int)uniqueness.base;
+ count = (int)uniqueness.count;
+ while (count) {
+ if (0 == memcmp(uuid, uniqueness.uuids[iter], sizeof(uuid_binary_t))) {
+ pthread_mutex_unlock(&uniqueness.mutex);
+ return 0;
+ }
+ if (++iter == EVENT_BROADCAST_MEMORY_COUNT)
+ iter = 0;
+ count--;
+ }
+ }
+ iter = (int)uniqueness.base;
+ if (uniqueness.count < EVENT_BROADCAST_MEMORY_COUNT)
+ iter += (int)(uniqueness.count++);
+ else if (++uniqueness.base == EVENT_BROADCAST_MEMORY_COUNT)
+ uniqueness.base = 0;
+ memcpy(uniqueness.uuids[iter], uuid, sizeof(uuid_binary_t));
+ pthread_mutex_unlock(&uniqueness.mutex);
+#else
+ }
+#endif
- jb = make_job_broadcast(event, object);
+ /* create the structure for the job */
+ jb = make_job_broadcast(event, object, uuid, hop);
if (jb == NULL) {
ERROR("Cant't create broadcast string job item for %s(%s)",
event, json_object_to_json_string(object));
@@ -261,6 +328,7 @@ static int unhooked_broadcast(const char *event, struct json_object *object)
return -1;
}
+ /* queue the job */
rc = jobs_queue(BROADCAST_JOB_GROUP, 0, broadcast_job, jb);
if (rc) {
ERROR("cant't queue broadcast string job item for %s(%s)",
@@ -277,7 +345,7 @@ static int unhooked_broadcast(const char *event, struct json_object *object)
*/
int afb_evt_evtid_broadcast(struct afb_evtid *evtid, struct json_object *object)
{
- return unhooked_broadcast(evtid->fullname, object);
+ return unhooked_broadcast(evtid->fullname, object, NULL, 0);
}
/*
@@ -304,12 +372,7 @@ int afb_evt_evtid_hooked_broadcast(struct afb_evtid *evtid, struct json_object *
return result;
}
-/*
- * Broadcasts the 'event' with its 'object'
- * 'object' is released (like json_object_put)
- * Returns the count of listener having receive the event.
- */
-int afb_evt_broadcast(const char *event, struct json_object *object)
+int afb_evt_rebroadcast(const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop)
{
int result;
@@ -318,7 +381,7 @@ int afb_evt_broadcast(const char *event, struct json_object *object)
afb_hook_evt_broadcast_before(event, 0, object);
#endif
- result = unhooked_broadcast(event, object);
+ result = unhooked_broadcast(event, object, uuid, hop);
#if WITH_AFB_HOOK
afb_hook_evt_broadcast_after(event, 0, object, result);
@@ -328,6 +391,16 @@ int afb_evt_broadcast(const char *event, struct json_object *object)
}
/*
+ * Broadcasts the 'event' with its 'object'
+ * 'object' is released (like json_object_put)
+ * Returns the count of listener having receive the event.
+ */
+int afb_evt_broadcast(const char *event, struct json_object *object)
+{
+ return afb_evt_rebroadcast(event, object, NULL, 0);
+}
+
+/*
* Pushes the event 'evtid' with 'obj' to its listeners
* Returns the count of listener that received the event.
*/
diff --git a/src/afb-evt.h b/src/afb-evt.h
index 423bd552..02693e9c 100644
--- a/src/afb-evt.h
+++ b/src/afb-evt.h
@@ -17,6 +17,8 @@
#pragma once
+#include "uuid.h"
+
struct afb_event_x1;
struct afb_event_x2;
struct afb_evtid;
@@ -27,7 +29,7 @@ struct afb_evt_listener;
struct afb_evt_itf
{
void (*push)(void *closure, const char *event, int evtid, struct json_object *object);
- void (*broadcast)(void *closure, const char *event, struct json_object *object);
+ void (*broadcast)(void *closure, const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop);
void (*add)(void *closure, const char *event, int evtid);
void (*remove)(void *closure, const char *event, int evtid);
};
@@ -35,6 +37,7 @@ struct afb_evt_itf
extern struct afb_evt_listener *afb_evt_listener_create(const struct afb_evt_itf *itf, void *closure);
extern int afb_evt_broadcast(const char *event, struct json_object *object);
+extern int afb_evt_rebroadcast(const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop);
extern struct afb_evt_listener *afb_evt_listener_addref(struct afb_evt_listener *listener);
extern void afb_evt_listener_unref(struct afb_evt_listener *listener);
diff --git a/src/afb-export.c b/src/afb-export.c
index de6134d3..c146f7e8 100644
--- a/src/afb-export.c
+++ b/src/afb-export.c
@@ -1183,7 +1183,7 @@ static void listener_of_pushed_events(void *closure, const char *event, int even
listener_of_events(closure, event, eventid, object);
}
-static void listener_of_broadcasted_events(void *closure, const char *event, struct json_object *object)
+static void listener_of_broadcasted_events(void *closure, const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop)
{
listener_of_events(closure, event, 0, object);
}
diff --git a/src/afb-proto-ws.c b/src/afb-proto-ws.c
index f4070860..ea95110a 100644
--- a/src/afb-proto-ws.c
+++ b/src/afb-proto-ws.c
@@ -560,11 +560,12 @@ static void client_on_event_unsubscribe(struct afb_proto_ws *protows, struct rea
/* receives broadcasted events */
static void client_on_event_broadcast(struct afb_proto_ws *protows, struct readbuf *rb)
{
- const char *event_name;
+ const char *event_name, *uuid;
+ char hop;
struct json_object *object;
- if (protows->client_itf->on_event_broadcast && readbuf_string(rb, &event_name, NULL) && readbuf_object(rb, &object))
- protows->client_itf->on_event_broadcast(protows->closure, event_name, object);
+ if (protows->client_itf->on_event_broadcast && readbuf_string(rb, &event_name, NULL) && readbuf_object(rb, &object) && (uuid = readbuf_get(rb, 16)) && readbuf_char(rb, &hop))
+ protows->client_itf->on_event_broadcast(protows->closure, event_name, object, (unsigned char*)uuid, (uint8_t)hop);
else
ERROR("Ignoring broadcast of event");
}
@@ -909,9 +910,9 @@ static int server_event_send(struct afb_proto_ws *protows, char order, const cha
int rc;
if (writebuf_char(&wb, order)
- && (order == CHAR_FOR_EVT_BROADCAST || writebuf_uint32(&wb, event_id))
+ && writebuf_uint32(&wb, event_id)
&& writebuf_string(&wb, event_name)
- && (order == CHAR_FOR_EVT_ADD || order == CHAR_FOR_EVT_DEL || writebuf_object(&wb, data))) {
+ && (order != CHAR_FOR_EVT_PUSH || writebuf_object(&wb, data))) {
pthread_mutex_lock(&protows->mutex);
rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
pthread_mutex_unlock(&protows->mutex);
@@ -936,9 +937,26 @@ int afb_proto_ws_server_event_push(struct afb_proto_ws *protows, const char *eve
return server_event_send(protows, CHAR_FOR_EVT_PUSH, event_name, event_id, data);
}
-int afb_proto_ws_server_event_broadcast(struct afb_proto_ws *protows, const char *event_name, struct json_object *data)
+int afb_proto_ws_server_event_broadcast(struct afb_proto_ws *protows, const char *event_name, struct json_object *data, const unsigned char uuid[16], uint8_t hop)
{
- return server_event_send(protows, CHAR_FOR_EVT_BROADCAST, event_name, 0, data);
+ struct writebuf wb = { .count = 0 };
+ int rc;
+
+ if (!hop--)
+ return 0;
+
+ if (writebuf_char(&wb, CHAR_FOR_EVT_BROADCAST)
+ && writebuf_string(&wb, event_name)
+ && writebuf_object(&wb, data)
+ && writebuf_put(&wb, uuid, 16)
+ && writebuf_char(&wb, (char)hop)) {
+ pthread_mutex_lock(&protows->mutex);
+ rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+ pthread_mutex_unlock(&protows->mutex);
+ if (rc >= 0)
+ return 0;
+ }
+ return -1;
}
/*****************************************************/
diff --git a/src/afb-proto-ws.h b/src/afb-proto-ws.h
index d797cb6f..df51e6a7 100644
--- a/src/afb-proto-ws.h
+++ b/src/afb-proto-ws.h
@@ -29,6 +29,8 @@ struct afb_proto_ws;
struct afb_proto_ws_call;
struct afb_proto_ws_describe;
+typedef unsigned char afb_proto_ws_uuid_t[16];
+
struct afb_proto_ws_client_itf
{
/* can't be NULL */
@@ -40,7 +42,7 @@ struct afb_proto_ws_client_itf
void (*on_event_subscribe)(void *closure, void *request, const char *event_name, int event_id);
void (*on_event_unsubscribe)(void *closure, void *request, const char *event_name, int event_id);
void (*on_event_push)(void *closure, const char *event_name, int event_id, struct json_object *data);
- void (*on_event_broadcast)(void *closure, const char *event_name, struct json_object *data);
+ void (*on_event_broadcast)(void *closure, const char *event_name, struct json_object *data, const afb_proto_ws_uuid_t uuid, uint8_t hop);
};
struct afb_proto_ws_server_itf
@@ -70,7 +72,7 @@ extern int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*cal
extern int afb_proto_ws_server_event_create(struct afb_proto_ws *protows, const char *event_name, int event_id);
extern int afb_proto_ws_server_event_remove(struct afb_proto_ws *protows, const char *event_name, int event_id);
extern int afb_proto_ws_server_event_push(struct afb_proto_ws *protows, const char *event_name, int event_id, struct json_object *data);
-extern int afb_proto_ws_server_event_broadcast(struct afb_proto_ws *protows, const char *event_name, struct json_object *data);
+extern int afb_proto_ws_server_event_broadcast(struct afb_proto_ws *protows, const char *event_name, struct json_object *data, const afb_proto_ws_uuid_t uuid, uint8_t hop);
extern void afb_proto_ws_call_addref(struct afb_proto_ws_call *call);
extern void afb_proto_ws_call_unref(struct afb_proto_ws_call *call);
diff --git a/src/afb-stub-ws.c b/src/afb-stub-ws.c
index 0c440bd5..e900ac76 100644
--- a/src/afb-stub-ws.c
+++ b/src/afb-stub-ws.c
@@ -341,12 +341,12 @@ static void server_event_push_cb(void *closure, const char *event, int eventid,
json_object_put(object);
}
-static void server_event_broadcast_cb(void *closure, const char *event, struct json_object *object)
+static void server_event_broadcast_cb(void *closure, const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop)
{
struct afb_stub_ws *stubws = closure;
if (stubws->proto != NULL)
- afb_proto_ws_server_event_broadcast(stubws->proto, event, object);
+ afb_proto_ws_server_event_broadcast(stubws->proto, event, object, uuid, hop);
json_object_put(object);
}
@@ -457,9 +457,9 @@ static void client_on_event_push_cb(void *closure, const char *event_name, int e
ERROR("unreadable push event");
}
-static void client_on_event_broadcast_cb(void *closure, const char *event_name, struct json_object *data)
+static void client_on_event_broadcast_cb(void *closure, const char *event_name, struct json_object *data, const uuid_binary_t uuid, uint8_t hop)
{
- afb_evt_broadcast(event_name, data);
+ afb_evt_rebroadcast(event_name, data, uuid, hop);
}
/*****************************************************/
diff --git a/src/afb-ws-json1.c b/src/afb-ws-json1.c
index 4f5cb868..5e71ff1f 100644
--- a/src/afb-ws-json1.c
+++ b/src/afb-ws-json1.c
@@ -46,7 +46,7 @@ struct afb_wsreq;
static void aws_on_hangup_cb(void *closure, struct afb_wsj1 *wsj1);
static void aws_on_call_cb(void *closure, const char *api, const char *verb, struct afb_wsj1_msg *msg);
static void aws_on_push_cb(void *closure, const char *event, int eventid, struct json_object *object);
-static void aws_on_broadcast_cb(void *closure, const char *event, struct json_object *object);
+static void aws_on_broadcast_cb(void *closure, const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop);
/* predeclaration of wsreq callbacks */
static void wsreq_destroy(struct afb_xreq *xreq);
@@ -207,15 +207,19 @@ static void aws_on_call_cb(void *closure, const char *api, const char *verb, str
afb_xreq_process(&wsreq->xreq, ws->apiset);
}
+static void aws_on_event(struct afb_ws_json1 *aws, const char *event, struct json_object *object)
+{
+ afb_wsj1_send_event_j(aws->wsj1, event, afb_msg_json_event(event, object));
+}
+
static void aws_on_push_cb(void *closure, const char *event, int eventid, struct json_object *object)
{
- aws_on_broadcast_cb(closure, event, object);
+ aws_on_event(closure, event, object);
}
-static void aws_on_broadcast_cb(void *closure, const char *event, struct json_object *object)
+static void aws_on_broadcast_cb(void *closure, const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop)
{
- struct afb_ws_json1 *aws = closure;
- afb_wsj1_send_event_j(aws->wsj1, event, afb_msg_json_event(event, object));
+ aws_on_event(closure, event, afb_msg_json_event(event, object));
}
/***************************************************************
diff --git a/src/main-afb-client-demo.c b/src/main-afb-client-demo.c
index 1d056365..b772c9d7 100644
--- a/src/main-afb-client-demo.c
+++ b/src/main-afb-client-demo.c
@@ -60,7 +60,7 @@ static void on_pws_event_remove(void *closure, const char *event_name, int event
static void on_pws_event_subscribe(void *closure, void *request, const char *event_name, int event_id);
static void on_pws_event_unsubscribe(void *closure, void *request, const char *event_name, int event_id);
static void on_pws_event_push(void *closure, const char *event_name, int event_id, struct json_object *data);
-static void on_pws_event_broadcast(void *closure, const char *event_name, struct json_object *data);
+static void on_pws_event_broadcast(void *closure, const char *event_name, struct json_object *data, const afb_proto_ws_uuid_t uuid, uint8_t hop);
static void idle();
static int process_stdin();
@@ -522,7 +522,7 @@ static void on_pws_event_push(void *closure, const char *event_name, int event_i
fflush(stdout);
}
-static void on_pws_event_broadcast(void *closure, const char *event_name, struct json_object *data)
+static void on_pws_event_broadcast(void *closure, const char *event_name, struct json_object *data, const afb_proto_ws_uuid_t uuid, uint8_t hop)
{
if (raw)
printf("ON-EVENT-BROADCAST: [%s]\n%s\n", event_name, json_object_to_json_string_ext(data, JSON_C_TO_STRING_NOSLASHESCAPE));