diff options
author | Jose Bollo <jose.bollo@iot.bzh> | 2019-12-18 18:26:00 +0100 |
---|---|---|
committer | Jose Bollo <jose.bollo@iot.bzh> | 2020-01-03 16:53:38 +0100 |
commit | 36ed9d2eb93f135ff293df5716f8232c664d32f4 (patch) | |
tree | be2418a3856ab73b6e892399bf977c7f7f5efe0f | |
parent | c6fcbec33ab346ee8a658531afb130647c656df7 (diff) |
afb-evt: Ensure unsubscribe works
Fix the logic of unsubscribing to events. It
was not possible before to implment it without
tracking every session and context. It was not
done because of the required complexity.
This implementation ensures that unexpected
events lead to a removal of the listener from the
list of watchers of the events.
The management of the list of watchers is reworked
to free unused memory.
Bug-AGL: SPEC-3069
Change-Id: Ie67372adbde9dcb9dc6c5c2738111d22609e7256
Signed-off-by: Jose Bollo <jose.bollo@iot.bzh>
-rw-r--r-- | src/afb-evt.c | 234 | ||||
-rw-r--r-- | src/afb-evt.h | 1 | ||||
-rw-r--r-- | src/afb-stub-ws.c | 5 |
3 files changed, 134 insertions, 106 deletions
diff --git a/src/afb-evt.c b/src/afb-evt.c index 400f8fbe..7122a33d 100644 --- a/src/afb-evt.c +++ b/src/afb-evt.c @@ -87,9 +87,6 @@ struct afb_evtid { /* id of the event */ uint16_t id; - /* has client? */ - int has_client; - /* fullname of the event */ char fullname[]; }; @@ -110,9 +107,6 @@ struct afb_evt_watch { /* link to the next watcher for the same listener */ struct afb_evt_watch *next_by_listener; - - /* activity */ - unsigned activity; }; /* @@ -412,23 +406,17 @@ int afb_evt_broadcast(const char *event, struct json_object *object) */ static void push_evtid(struct afb_evtid *evtid, struct json_object *object) { - int has_client; struct afb_evt_watch *watch; struct afb_evt_listener *listener; - has_client = 0; pthread_rwlock_rdlock(&evtid->rwlock); watch = evtid->watchs; while(watch) { listener = watch->listener; assert(listener->itf->push != NULL); - if (watch->activity != 0) { - listener->itf->push(listener->closure, evtid->fullname, evtid->id, json_object_get(object)); - has_client = 1; - } + listener->itf->push(listener->closure, evtid->fullname, evtid->id, json_object_get(object)); watch = watch->next_by_evtid; } - evtid->has_client = has_client; pthread_rwlock_unlock(&evtid->rwlock); } @@ -455,6 +443,9 @@ int afb_evt_evtid_push(struct afb_evtid *evtid, struct json_object *object) struct job_evtid *je; int rc; + if (!evtid->watchs) + return 0; + je = make_job_evtid(evtid, object); if (je == NULL) { ERROR("Cant't create push evtid job item for %s(%s)", @@ -465,7 +456,7 @@ int afb_evt_evtid_push(struct afb_evtid *evtid, struct json_object *object) rc = jobs_queue(PUSH_JOB_GROUP, 0, push_job_evtid, je); if (rc == 0) - rc = evtid->has_client; + rc = 1; else { ERROR("cant't queue push evtid job item for %s(%s)", evtid->fullname, json_object_to_json_string(object)); @@ -507,32 +498,54 @@ int afb_evt_evtid_hooked_push(struct afb_evtid *evtid, struct json_object *obj) } #endif -/* - * remove the 'watch' - */ -static void remove_watch(struct afb_evt_watch *watch) +static void unwatch(struct afb_evt_listener *listener, struct afb_evtid *evtid, int remove) +{ + /* notify listener if needed */ + if (remove && listener->itf->remove != NULL) + listener->itf->remove(listener->closure, evtid->fullname, evtid->id); +} + +static void evtid_unwatch(struct afb_evtid *evtid, struct afb_evt_listener *listener, struct afb_evt_watch *watch, int remove) { struct afb_evt_watch **prv; - struct afb_evtid *evtid; - struct afb_evt_listener *listener; /* notify listener if needed */ - evtid = watch->evtid; - listener = watch->listener; - if (watch->activity != 0 && listener->itf->remove != NULL) - listener->itf->remove(listener->closure, evtid->fullname, evtid->id); + unwatch(listener, evtid, remove); /* unlink the watch for its event */ - prv = &evtid->watchs; - while(*prv != watch) - prv = &(*prv)->next_by_evtid; - *prv = watch->next_by_evtid; - - /* unlink the watch for its listener */ + pthread_rwlock_wrlock(&listener->rwlock); prv = &listener->watchs; - while(*prv != watch) + while(*prv) { + if (*prv == watch) { + *prv = watch->next_by_listener; + break; + } prv = &(*prv)->next_by_listener; - *prv = watch->next_by_listener; + } + pthread_rwlock_unlock(&listener->rwlock); + + /* recycle memory */ + free(watch); +} + +static void listener_unwatch(struct afb_evt_listener *listener, struct afb_evtid *evtid, struct afb_evt_watch *watch, int remove) +{ + struct afb_evt_watch **prv; + + /* notify listener if needed */ + unwatch(listener, evtid, remove); + + /* unlink the watch for its event */ + pthread_rwlock_wrlock(&evtid->rwlock); + prv = &evtid->watchs; + while(*prv) { + if (*prv == watch) { + *prv = watch->next_by_evtid; + break; + } + prv = &(*prv)->next_by_evtid; + } + pthread_rwlock_unlock(&evtid->rwlock); /* recycle memory */ free(watch); @@ -578,7 +591,6 @@ struct afb_evtid *afb_evt_evtid_create(const char *fullname) evtid->refcount = 1; evtid->watchs = NULL; evtid->id = id; - evtid->has_client = 0; pthread_rwlock_init(&evtid->rwlock, NULL); evtids = evtid; #if WITH_AFB_HOOK @@ -644,41 +656,42 @@ struct afb_evtid *afb_evt_evtid_hooked_addref(struct afb_evtid *evtid) */ void afb_evt_evtid_unref(struct afb_evtid *evtid) { - int found; - struct afb_evtid **prv; - struct afb_evt_listener *listener; + struct afb_evtid **prv, *oev; + struct afb_evt_watch *watch, *nwatch; if (!__atomic_sub_fetch(&evtid->refcount, 1, __ATOMIC_RELAXED)) { /* unlinks the event if valid! */ pthread_rwlock_wrlock(&events_rwlock); - found = 0; prv = &evtids; - while (*prv && !(found = (*prv == evtid))) - prv = &(*prv)->next; - if (found) { - *prv = evtid->next; - event_count--; + for(;;) { + oev = *prv; + if (oev == evtid) + break; + if (!oev) { + ERROR("unexpected event"); + pthread_rwlock_unlock(&events_rwlock); + return; + } + prv = &oev->next; } + event_count--; + *prv = evtid->next; pthread_rwlock_unlock(&events_rwlock); - /* destroys the event */ - if (!found) - ERROR("event not found"); - else { - /* removes all watchers */ - while(evtid->watchs != NULL) { - listener = evtid->watchs->listener; - pthread_rwlock_wrlock(&listener->rwlock); - pthread_rwlock_wrlock(&evtid->rwlock); - remove_watch(evtid->watchs); - pthread_rwlock_unlock(&evtid->rwlock); - pthread_rwlock_unlock(&listener->rwlock); - } - - /* free */ - pthread_rwlock_destroy(&evtid->rwlock); - free(evtid); + /* removes all watchers */ + pthread_rwlock_wrlock(&evtid->rwlock); + watch = evtid->watchs; + evtid->watchs = NULL; + pthread_rwlock_unlock(&evtid->rwlock); + while(watch) { + nwatch = watch->next_by_evtid; + evtid_unwatch(evtid, watch->listener, watch, 1); + watch = nwatch; } + + /* free */ + pthread_rwlock_destroy(&evtid->rwlock); + free(evtid); } } @@ -785,28 +798,29 @@ struct afb_evt_listener *afb_evt_listener_addref(struct afb_evt_listener *listen */ void afb_evt_listener_unref(struct afb_evt_listener *listener) { - struct afb_evt_listener **prv; - struct afb_evtid *evtid; + struct afb_evt_listener **prv, *olis; if (listener && !__atomic_sub_fetch(&listener->refcount, 1, __ATOMIC_RELAXED)) { /* unlink the listener */ pthread_rwlock_wrlock(&listeners_rwlock); prv = &listeners; - while (*prv != listener) - prv = &(*prv)->next; + for(;;) { + olis = *prv; + if (olis == listener) + break; + if (!olis) { + ERROR("unexpected listener"); + pthread_rwlock_unlock(&listeners_rwlock); + return; + } + prv = &olis->next; + } *prv = listener->next; pthread_rwlock_unlock(&listeners_rwlock); /* remove the watchers */ - pthread_rwlock_wrlock(&listener->rwlock); - while (listener->watchs != NULL) { - evtid = listener->watchs->evtid; - pthread_rwlock_wrlock(&evtid->rwlock); - remove_watch(listener->watchs); - pthread_rwlock_unlock(&evtid->rwlock); - } - pthread_rwlock_unlock(&listener->rwlock); + afb_evt_listener_unwatch_all(listener, 0); /* free the listener */ pthread_rwlock_destroy(&listener->rwlock); @@ -833,7 +847,7 @@ int afb_evt_listener_watch_evt(struct afb_evt_listener *listener, struct afb_evt watch = listener->watchs; while(watch != NULL) { if (watch->evtid == evtid) - goto found; + goto end; watch = watch->next_by_listener; } @@ -847,7 +861,6 @@ int afb_evt_listener_watch_evt(struct afb_evt_listener *listener, struct afb_evt /* initialise and link */ watch->evtid = evtid; - watch->activity = 0; watch->listener = listener; watch->next_by_listener = listener->watchs; listener->watchs = watch; @@ -856,13 +869,10 @@ int afb_evt_listener_watch_evt(struct afb_evt_listener *listener, struct afb_evt evtid->watchs = watch; pthread_rwlock_unlock(&evtid->rwlock); -found: - if (watch->activity == 0 && listener->itf->add != NULL) + if (listener->itf->add != NULL) listener->itf->add(listener->closure, evtid->fullname, evtid->id); - watch->activity++; - evtid->has_client = 1; +end: pthread_rwlock_unlock(&listener->rwlock); - return 0; } @@ -872,26 +882,26 @@ found: */ int afb_evt_listener_unwatch_evt(struct afb_evt_listener *listener, struct afb_evtid *evtid) { - struct afb_evt_watch *watch; + struct afb_evt_watch *watch, **pwatch; /* search the existing watch */ pthread_rwlock_wrlock(&listener->rwlock); - watch = listener->watchs; - while(watch != NULL) { - if (watch->evtid == evtid) { - if (watch->activity != 0) { - watch->activity--; - if (watch->activity == 0 && listener->itf->remove != NULL) - listener->itf->remove(listener->closure, evtid->fullname, evtid->id); - } + pwatch = &listener->watchs; + for (;;) { + watch = *pwatch; + if (!watch) { + pthread_rwlock_unlock(&listener->rwlock); + errno = ENOENT; + return -1; + } + if (evtid == watch->evtid) { + *pwatch = watch->next_by_listener; pthread_rwlock_unlock(&listener->rwlock); + listener_unwatch(listener, evtid, watch, 1); return 0; } - watch = watch->next_by_listener; + pwatch = &watch->next_by_listener; } - pthread_rwlock_unlock(&listener->rwlock); - errno = ENOENT; - return -1; } /* @@ -900,28 +910,48 @@ int afb_evt_listener_unwatch_evt(struct afb_evt_listener *listener, struct afb_e */ int afb_evt_listener_unwatch_id(struct afb_evt_listener *listener, uint16_t eventid) { - struct afb_evt_watch *watch; + struct afb_evt_watch *watch, **pwatch; struct afb_evtid *evtid; /* search the existing watch */ pthread_rwlock_wrlock(&listener->rwlock); - watch = listener->watchs; - while(watch != NULL) { + pwatch = &listener->watchs; + for (;;) { + watch = *pwatch; + if (!watch) { + pthread_rwlock_unlock(&listener->rwlock); + errno = ENOENT; + return -1; + } evtid = watch->evtid; if (evtid->id == eventid) { - if (watch->activity != 0) { - watch->activity--; - if (watch->activity == 0 && listener->itf->remove != NULL) - listener->itf->remove(listener->closure, evtid->fullname, evtid->id); - } + *pwatch = watch->next_by_listener; pthread_rwlock_unlock(&listener->rwlock); + listener_unwatch(listener, evtid, watch, 1); return 0; } - watch = watch->next_by_listener; + pwatch = &watch->next_by_listener; } +} + +/* + * Avoids the 'listener' to watch any event, calling the callback + * 'remove' of the interface if 'remoe' is not zero. + */ +void afb_evt_listener_unwatch_all(struct afb_evt_listener *listener, int remove) +{ + struct afb_evt_watch *watch, *nwatch; + + /* search the existing watch */ + pthread_rwlock_wrlock(&listener->rwlock); + watch = listener->watchs; + listener->watchs = NULL; pthread_rwlock_unlock(&listener->rwlock); - errno = ENOENT; - return -1; + while(watch) { + nwatch = watch->next_by_listener; + listener_unwatch(listener, watch->evtid, watch, remove); + watch = nwatch; + } } #if WITH_AFB_HOOK diff --git a/src/afb-evt.h b/src/afb-evt.h index 88308aaa..e4c54eed 100644 --- a/src/afb-evt.h +++ b/src/afb-evt.h @@ -61,6 +61,7 @@ extern int afb_evt_evtid_broadcast(struct afb_evtid *evtid, struct json_object * extern int afb_evt_listener_watch_evt(struct afb_evt_listener *listener, struct afb_evtid *evtid); extern int afb_evt_listener_unwatch_evt(struct afb_evt_listener *listener, struct afb_evtid *evtid); extern int afb_evt_listener_unwatch_id(struct afb_evt_listener *listener, uint16_t eventid); +extern void afb_evt_listener_unwatch_all(struct afb_evt_listener *listener, int remove); extern struct afb_event_x2 *afb_evt_event_x2_create(const char *fullname); extern struct afb_event_x2 *afb_evt_event_x2_create2(const char *prefix, const char *name); diff --git a/src/afb-stub-ws.c b/src/afb-stub-ws.c index 197d4cdc..806c5e47 100644 --- a/src/afb-stub-ws.c +++ b/src/afb-stub-ws.c @@ -177,13 +177,10 @@ static int server_req_subscribe_cb(struct afb_xreq *xreq, struct afb_event_x2 *e static int server_req_unsubscribe_cb(struct afb_xreq *xreq, struct afb_event_x2 *event) { - int rc, rc2; + int rc; struct server_req *wreq = CONTAINER_OF_XREQ(struct server_req, xreq); rc = afb_proto_ws_call_unsubscribe(wreq->call, afb_evt_event_x2_id(event)); - rc2 = afb_evt_listener_unwatch_x2(wreq->stubws->listener, event); - if (rc >= 0 && rc2 < 0) - rc = rc2; if (rc < 0) ERROR("error while unsubscribing event"); return rc; |