aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJose Bollo <jose.bollo@iot.bzh>2019-12-18 18:26:00 +0100
committerJose Bollo <jose.bollo@iot.bzh>2020-01-03 16:53:38 +0100
commit36ed9d2eb93f135ff293df5716f8232c664d32f4 (patch)
treebe2418a3856ab73b6e892399bf977c7f7f5efe0f
parentc6fcbec33ab346ee8a658531afb130647c656df7 (diff)
afb-evt: Ensure unsubscribe works
Fix the logic of unsubscribing to events. It was not possible before to implment it without tracking every session and context. It was not done because of the required complexity. This implementation ensures that unexpected events lead to a removal of the listener from the list of watchers of the events. The management of the list of watchers is reworked to free unused memory. Bug-AGL: SPEC-3069 Change-Id: Ie67372adbde9dcb9dc6c5c2738111d22609e7256 Signed-off-by: Jose Bollo <jose.bollo@iot.bzh>
-rw-r--r--src/afb-evt.c234
-rw-r--r--src/afb-evt.h1
-rw-r--r--src/afb-stub-ws.c5
3 files changed, 134 insertions, 106 deletions
diff --git a/src/afb-evt.c b/src/afb-evt.c
index 400f8fbe..7122a33d 100644
--- a/src/afb-evt.c
+++ b/src/afb-evt.c
@@ -87,9 +87,6 @@ struct afb_evtid {
/* id of the event */
uint16_t id;
- /* has client? */
- int has_client;
-
/* fullname of the event */
char fullname[];
};
@@ -110,9 +107,6 @@ struct afb_evt_watch {
/* link to the next watcher for the same listener */
struct afb_evt_watch *next_by_listener;
-
- /* activity */
- unsigned activity;
};
/*
@@ -412,23 +406,17 @@ int afb_evt_broadcast(const char *event, struct json_object *object)
*/
static void push_evtid(struct afb_evtid *evtid, struct json_object *object)
{
- int has_client;
struct afb_evt_watch *watch;
struct afb_evt_listener *listener;
- has_client = 0;
pthread_rwlock_rdlock(&evtid->rwlock);
watch = evtid->watchs;
while(watch) {
listener = watch->listener;
assert(listener->itf->push != NULL);
- if (watch->activity != 0) {
- listener->itf->push(listener->closure, evtid->fullname, evtid->id, json_object_get(object));
- has_client = 1;
- }
+ listener->itf->push(listener->closure, evtid->fullname, evtid->id, json_object_get(object));
watch = watch->next_by_evtid;
}
- evtid->has_client = has_client;
pthread_rwlock_unlock(&evtid->rwlock);
}
@@ -455,6 +443,9 @@ int afb_evt_evtid_push(struct afb_evtid *evtid, struct json_object *object)
struct job_evtid *je;
int rc;
+ if (!evtid->watchs)
+ return 0;
+
je = make_job_evtid(evtid, object);
if (je == NULL) {
ERROR("Cant't create push evtid job item for %s(%s)",
@@ -465,7 +456,7 @@ int afb_evt_evtid_push(struct afb_evtid *evtid, struct json_object *object)
rc = jobs_queue(PUSH_JOB_GROUP, 0, push_job_evtid, je);
if (rc == 0)
- rc = evtid->has_client;
+ rc = 1;
else {
ERROR("cant't queue push evtid job item for %s(%s)",
evtid->fullname, json_object_to_json_string(object));
@@ -507,32 +498,54 @@ int afb_evt_evtid_hooked_push(struct afb_evtid *evtid, struct json_object *obj)
}
#endif
-/*
- * remove the 'watch'
- */
-static void remove_watch(struct afb_evt_watch *watch)
+static void unwatch(struct afb_evt_listener *listener, struct afb_evtid *evtid, int remove)
+{
+ /* notify listener if needed */
+ if (remove && listener->itf->remove != NULL)
+ listener->itf->remove(listener->closure, evtid->fullname, evtid->id);
+}
+
+static void evtid_unwatch(struct afb_evtid *evtid, struct afb_evt_listener *listener, struct afb_evt_watch *watch, int remove)
{
struct afb_evt_watch **prv;
- struct afb_evtid *evtid;
- struct afb_evt_listener *listener;
/* notify listener if needed */
- evtid = watch->evtid;
- listener = watch->listener;
- if (watch->activity != 0 && listener->itf->remove != NULL)
- listener->itf->remove(listener->closure, evtid->fullname, evtid->id);
+ unwatch(listener, evtid, remove);
/* unlink the watch for its event */
- prv = &evtid->watchs;
- while(*prv != watch)
- prv = &(*prv)->next_by_evtid;
- *prv = watch->next_by_evtid;
-
- /* unlink the watch for its listener */
+ pthread_rwlock_wrlock(&listener->rwlock);
prv = &listener->watchs;
- while(*prv != watch)
+ while(*prv) {
+ if (*prv == watch) {
+ *prv = watch->next_by_listener;
+ break;
+ }
prv = &(*prv)->next_by_listener;
- *prv = watch->next_by_listener;
+ }
+ pthread_rwlock_unlock(&listener->rwlock);
+
+ /* recycle memory */
+ free(watch);
+}
+
+static void listener_unwatch(struct afb_evt_listener *listener, struct afb_evtid *evtid, struct afb_evt_watch *watch, int remove)
+{
+ struct afb_evt_watch **prv;
+
+ /* notify listener if needed */
+ unwatch(listener, evtid, remove);
+
+ /* unlink the watch for its event */
+ pthread_rwlock_wrlock(&evtid->rwlock);
+ prv = &evtid->watchs;
+ while(*prv) {
+ if (*prv == watch) {
+ *prv = watch->next_by_evtid;
+ break;
+ }
+ prv = &(*prv)->next_by_evtid;
+ }
+ pthread_rwlock_unlock(&evtid->rwlock);
/* recycle memory */
free(watch);
@@ -578,7 +591,6 @@ struct afb_evtid *afb_evt_evtid_create(const char *fullname)
evtid->refcount = 1;
evtid->watchs = NULL;
evtid->id = id;
- evtid->has_client = 0;
pthread_rwlock_init(&evtid->rwlock, NULL);
evtids = evtid;
#if WITH_AFB_HOOK
@@ -644,41 +656,42 @@ struct afb_evtid *afb_evt_evtid_hooked_addref(struct afb_evtid *evtid)
*/
void afb_evt_evtid_unref(struct afb_evtid *evtid)
{
- int found;
- struct afb_evtid **prv;
- struct afb_evt_listener *listener;
+ struct afb_evtid **prv, *oev;
+ struct afb_evt_watch *watch, *nwatch;
if (!__atomic_sub_fetch(&evtid->refcount, 1, __ATOMIC_RELAXED)) {
/* unlinks the event if valid! */
pthread_rwlock_wrlock(&events_rwlock);
- found = 0;
prv = &evtids;
- while (*prv && !(found = (*prv == evtid)))
- prv = &(*prv)->next;
- if (found) {
- *prv = evtid->next;
- event_count--;
+ for(;;) {
+ oev = *prv;
+ if (oev == evtid)
+ break;
+ if (!oev) {
+ ERROR("unexpected event");
+ pthread_rwlock_unlock(&events_rwlock);
+ return;
+ }
+ prv = &oev->next;
}
+ event_count--;
+ *prv = evtid->next;
pthread_rwlock_unlock(&events_rwlock);
- /* destroys the event */
- if (!found)
- ERROR("event not found");
- else {
- /* removes all watchers */
- while(evtid->watchs != NULL) {
- listener = evtid->watchs->listener;
- pthread_rwlock_wrlock(&listener->rwlock);
- pthread_rwlock_wrlock(&evtid->rwlock);
- remove_watch(evtid->watchs);
- pthread_rwlock_unlock(&evtid->rwlock);
- pthread_rwlock_unlock(&listener->rwlock);
- }
-
- /* free */
- pthread_rwlock_destroy(&evtid->rwlock);
- free(evtid);
+ /* removes all watchers */
+ pthread_rwlock_wrlock(&evtid->rwlock);
+ watch = evtid->watchs;
+ evtid->watchs = NULL;
+ pthread_rwlock_unlock(&evtid->rwlock);
+ while(watch) {
+ nwatch = watch->next_by_evtid;
+ evtid_unwatch(evtid, watch->listener, watch, 1);
+ watch = nwatch;
}
+
+ /* free */
+ pthread_rwlock_destroy(&evtid->rwlock);
+ free(evtid);
}
}
@@ -785,28 +798,29 @@ struct afb_evt_listener *afb_evt_listener_addref(struct afb_evt_listener *listen
*/
void afb_evt_listener_unref(struct afb_evt_listener *listener)
{
- struct afb_evt_listener **prv;
- struct afb_evtid *evtid;
+ struct afb_evt_listener **prv, *olis;
if (listener && !__atomic_sub_fetch(&listener->refcount, 1, __ATOMIC_RELAXED)) {
/* unlink the listener */
pthread_rwlock_wrlock(&listeners_rwlock);
prv = &listeners;
- while (*prv != listener)
- prv = &(*prv)->next;
+ for(;;) {
+ olis = *prv;
+ if (olis == listener)
+ break;
+ if (!olis) {
+ ERROR("unexpected listener");
+ pthread_rwlock_unlock(&listeners_rwlock);
+ return;
+ }
+ prv = &olis->next;
+ }
*prv = listener->next;
pthread_rwlock_unlock(&listeners_rwlock);
/* remove the watchers */
- pthread_rwlock_wrlock(&listener->rwlock);
- while (listener->watchs != NULL) {
- evtid = listener->watchs->evtid;
- pthread_rwlock_wrlock(&evtid->rwlock);
- remove_watch(listener->watchs);
- pthread_rwlock_unlock(&evtid->rwlock);
- }
- pthread_rwlock_unlock(&listener->rwlock);
+ afb_evt_listener_unwatch_all(listener, 0);
/* free the listener */
pthread_rwlock_destroy(&listener->rwlock);
@@ -833,7 +847,7 @@ int afb_evt_listener_watch_evt(struct afb_evt_listener *listener, struct afb_evt
watch = listener->watchs;
while(watch != NULL) {
if (watch->evtid == evtid)
- goto found;
+ goto end;
watch = watch->next_by_listener;
}
@@ -847,7 +861,6 @@ int afb_evt_listener_watch_evt(struct afb_evt_listener *listener, struct afb_evt
/* initialise and link */
watch->evtid = evtid;
- watch->activity = 0;
watch->listener = listener;
watch->next_by_listener = listener->watchs;
listener->watchs = watch;
@@ -856,13 +869,10 @@ int afb_evt_listener_watch_evt(struct afb_evt_listener *listener, struct afb_evt
evtid->watchs = watch;
pthread_rwlock_unlock(&evtid->rwlock);
-found:
- if (watch->activity == 0 && listener->itf->add != NULL)
+ if (listener->itf->add != NULL)
listener->itf->add(listener->closure, evtid->fullname, evtid->id);
- watch->activity++;
- evtid->has_client = 1;
+end:
pthread_rwlock_unlock(&listener->rwlock);
-
return 0;
}
@@ -872,26 +882,26 @@ found:
*/
int afb_evt_listener_unwatch_evt(struct afb_evt_listener *listener, struct afb_evtid *evtid)
{
- struct afb_evt_watch *watch;
+ struct afb_evt_watch *watch, **pwatch;
/* search the existing watch */
pthread_rwlock_wrlock(&listener->rwlock);
- watch = listener->watchs;
- while(watch != NULL) {
- if (watch->evtid == evtid) {
- if (watch->activity != 0) {
- watch->activity--;
- if (watch->activity == 0 && listener->itf->remove != NULL)
- listener->itf->remove(listener->closure, evtid->fullname, evtid->id);
- }
+ pwatch = &listener->watchs;
+ for (;;) {
+ watch = *pwatch;
+ if (!watch) {
+ pthread_rwlock_unlock(&listener->rwlock);
+ errno = ENOENT;
+ return -1;
+ }
+ if (evtid == watch->evtid) {
+ *pwatch = watch->next_by_listener;
pthread_rwlock_unlock(&listener->rwlock);
+ listener_unwatch(listener, evtid, watch, 1);
return 0;
}
- watch = watch->next_by_listener;
+ pwatch = &watch->next_by_listener;
}
- pthread_rwlock_unlock(&listener->rwlock);
- errno = ENOENT;
- return -1;
}
/*
@@ -900,28 +910,48 @@ int afb_evt_listener_unwatch_evt(struct afb_evt_listener *listener, struct afb_e
*/
int afb_evt_listener_unwatch_id(struct afb_evt_listener *listener, uint16_t eventid)
{
- struct afb_evt_watch *watch;
+ struct afb_evt_watch *watch, **pwatch;
struct afb_evtid *evtid;
/* search the existing watch */
pthread_rwlock_wrlock(&listener->rwlock);
- watch = listener->watchs;
- while(watch != NULL) {
+ pwatch = &listener->watchs;
+ for (;;) {
+ watch = *pwatch;
+ if (!watch) {
+ pthread_rwlock_unlock(&listener->rwlock);
+ errno = ENOENT;
+ return -1;
+ }
evtid = watch->evtid;
if (evtid->id == eventid) {
- if (watch->activity != 0) {
- watch->activity--;
- if (watch->activity == 0 && listener->itf->remove != NULL)
- listener->itf->remove(listener->closure, evtid->fullname, evtid->id);
- }
+ *pwatch = watch->next_by_listener;
pthread_rwlock_unlock(&listener->rwlock);
+ listener_unwatch(listener, evtid, watch, 1);
return 0;
}
- watch = watch->next_by_listener;
+ pwatch = &watch->next_by_listener;
}
+}
+
+/*
+ * Avoids the 'listener' to watch any event, calling the callback
+ * 'remove' of the interface if 'remoe' is not zero.
+ */
+void afb_evt_listener_unwatch_all(struct afb_evt_listener *listener, int remove)
+{
+ struct afb_evt_watch *watch, *nwatch;
+
+ /* search the existing watch */
+ pthread_rwlock_wrlock(&listener->rwlock);
+ watch = listener->watchs;
+ listener->watchs = NULL;
pthread_rwlock_unlock(&listener->rwlock);
- errno = ENOENT;
- return -1;
+ while(watch) {
+ nwatch = watch->next_by_listener;
+ listener_unwatch(listener, watch->evtid, watch, remove);
+ watch = nwatch;
+ }
}
#if WITH_AFB_HOOK
diff --git a/src/afb-evt.h b/src/afb-evt.h
index 88308aaa..e4c54eed 100644
--- a/src/afb-evt.h
+++ b/src/afb-evt.h
@@ -61,6 +61,7 @@ extern int afb_evt_evtid_broadcast(struct afb_evtid *evtid, struct json_object *
extern int afb_evt_listener_watch_evt(struct afb_evt_listener *listener, struct afb_evtid *evtid);
extern int afb_evt_listener_unwatch_evt(struct afb_evt_listener *listener, struct afb_evtid *evtid);
extern int afb_evt_listener_unwatch_id(struct afb_evt_listener *listener, uint16_t eventid);
+extern void afb_evt_listener_unwatch_all(struct afb_evt_listener *listener, int remove);
extern struct afb_event_x2 *afb_evt_event_x2_create(const char *fullname);
extern struct afb_event_x2 *afb_evt_event_x2_create2(const char *prefix, const char *name);
diff --git a/src/afb-stub-ws.c b/src/afb-stub-ws.c
index 197d4cdc..806c5e47 100644
--- a/src/afb-stub-ws.c
+++ b/src/afb-stub-ws.c
@@ -177,13 +177,10 @@ static int server_req_subscribe_cb(struct afb_xreq *xreq, struct afb_event_x2 *e
static int server_req_unsubscribe_cb(struct afb_xreq *xreq, struct afb_event_x2 *event)
{
- int rc, rc2;
+ int rc;
struct server_req *wreq = CONTAINER_OF_XREQ(struct server_req, xreq);
rc = afb_proto_ws_call_unsubscribe(wreq->call, afb_evt_event_x2_id(event));
- rc2 = afb_evt_listener_unwatch_x2(wreq->stubws->listener, event);
- if (rc >= 0 && rc2 < 0)
- rc = rc2;
if (rc < 0)
ERROR("error while unsubscribing event");
return rc;