diff options
-rw-r--r-- | src/afb-evt.c | 30 | ||||
-rw-r--r-- | src/afb-evt.h | 3 | ||||
-rw-r--r-- | src/afb-proto-ws.c | 19 | ||||
-rw-r--r-- | src/afb-proto-ws.h | 2 | ||||
-rw-r--r-- | src/afb-stub-ws.c | 14 |
5 files changed, 65 insertions, 3 deletions
diff --git a/src/afb-evt.c b/src/afb-evt.c index c0e43200..1c8798fd 100644 --- a/src/afb-evt.c +++ b/src/afb-evt.c @@ -894,6 +894,36 @@ int afb_evt_watch_sub_evtid(struct afb_evt_listener *listener, struct afb_evtid return -1; } +/* + * Avoids the 'listener' to watch 'eventid' + * Returns 0 in case of success or else -1. + */ +int afb_evt_watch_sub_eventid(struct afb_evt_listener *listener, uint16_t eventid) +{ + struct afb_evt_watch *watch; + struct afb_evtid *evtid; + + /* search the existing watch */ + pthread_rwlock_wrlock(&listener->rwlock); + watch = listener->watchs; + while(watch != NULL) { + evtid = watch->evtid; + if (evtid->id == eventid) { + if (watch->activity != 0) { + watch->activity--; + if (watch->activity == 0 && listener->itf->remove != NULL) + listener->itf->remove(listener->closure, evtid->fullname, evtid->id); + } + pthread_rwlock_unlock(&listener->rwlock); + return 0; + } + watch = watch->next_by_listener; + } + pthread_rwlock_unlock(&listener->rwlock); + errno = ENOENT; + return -1; +} + #if WITH_AFB_HOOK /* * update the hooks for events diff --git a/src/afb-evt.h b/src/afb-evt.h index 6e0297f6..3392ee3d 100644 --- a/src/afb-evt.h +++ b/src/afb-evt.h @@ -60,6 +60,7 @@ extern int afb_evt_evtid_broadcast(struct afb_evtid *evtid, struct json_object * extern int afb_evt_watch_add_evtid(struct afb_evt_listener *listener, struct afb_evtid *evtid); extern int afb_evt_watch_sub_evtid(struct afb_evt_listener *listener, struct afb_evtid *evtid); +extern int afb_evt_watch_sub_eventid(struct afb_evt_listener *listener, uint16_t eventid); extern struct afb_event_x2 *afb_evt_event_x2_create(const char *fullname); @@ -89,4 +90,4 @@ extern const char *afb_evt_evtid_hooked_name(struct afb_evtid *evtid); extern int afb_evt_evtid_hooked_push(struct afb_evtid *evtid, struct json_object *obj); extern int afb_evt_evtid_hooked_broadcast(struct afb_evtid *evtid, struct json_object *object); extern void afb_evt_update_hooks(); -#endif
\ No newline at end of file +#endif diff --git a/src/afb-proto-ws.c b/src/afb-proto-ws.c index f19924fa..10c06fcd 100644 --- a/src/afb-proto-ws.c +++ b/src/afb-proto-ws.c @@ -67,6 +67,8 @@ For the purpose of handling events the server can: - push or broadcast data as an event + - signal unexpected event + */ /************** constants for protocol definition *************************/ @@ -78,6 +80,7 @@ For the purpose of handling events the server can: #define CHAR_FOR_EVT_PUSH 'P' /* server -> client */ #define CHAR_FOR_EVT_SUBSCRIBE 'X' /* server -> client */ #define CHAR_FOR_EVT_UNSUBSCRIBE 'x' /* server -> client */ +#define CHAR_FOR_EVT_UNEXPECTED 'U' /* client -> server */ #define CHAR_FOR_DESCRIBE 'D' /* client -> server */ #define CHAR_FOR_DESCRIPTION 'd' /* server -> client */ #define CHAR_FOR_TOKEN_ADD 'T' /* client -> server */ @@ -790,6 +793,11 @@ int afb_proto_ws_client_token_remove(struct afb_proto_ws *protows, uint16_t toke return client_send_cmd_id16_optstr(protows, CHAR_FOR_TOKEN_DROP, tokenid, NULL); } +int afb_proto_ws_client_event_unexpected(struct afb_proto_ws *protows, uint16_t eventid) +{ + return client_send_cmd_id16_optstr(protows, CHAR_FOR_EVT_UNEXPECTED, eventid, NULL); +} + int afb_proto_ws_client_call( struct afb_proto_ws *protows, const char *verb, @@ -1039,6 +1047,14 @@ static void server_on_token_drop(struct afb_proto_ws *protows, struct readbuf *r protows->server_itf->on_token_remove(protows->closure, tokenid); } +static void server_on_event_unexpected(struct afb_proto_ws *protows, struct readbuf *rb) +{ + uint16_t eventid; + + if (readbuf_uint16(rb, &eventid)) + protows->server_itf->on_event_unexpected(protows->closure, eventid); +} + /* on version offer */ static void server_on_version_offer(struct afb_proto_ws *protows, struct readbuf *rb) { @@ -1097,6 +1113,9 @@ static void server_on_binary_job(int sig, void *closure) case CHAR_FOR_TOKEN_DROP: server_on_token_drop(binary->protows, &binary->rb); break; + case CHAR_FOR_EVT_UNEXPECTED: + server_on_event_unexpected(binary->protows, &binary->rb); + break; case CHAR_FOR_VERSION_OFFER: server_on_version_offer(binary->protows, &binary->rb); break; diff --git a/src/afb-proto-ws.h b/src/afb-proto-ws.h index 2dcb142b..204bdb83 100644 --- a/src/afb-proto-ws.h +++ b/src/afb-proto-ws.h @@ -57,6 +57,7 @@ struct afb_proto_ws_server_itf void (*on_token_remove)(void *closure, uint16_t tokenid); void (*on_call)(void *closure, struct afb_proto_ws_call *call, const char *verb, struct json_object *args, uint16_t sessionid, uint16_t tokenid, const char *user_creds); void (*on_describe)(void *closure, struct afb_proto_ws_describe *describe); + void (*on_event_unexpected)(void *closure, uint16_t eventid); }; extern struct afb_proto_ws *afb_proto_ws_create_client(struct fdev *fdev, const struct afb_proto_ws_client_itf *itf, void *closure); @@ -80,6 +81,7 @@ extern int afb_proto_ws_client_token_create(struct afb_proto_ws *protows, uint16 extern int afb_proto_ws_client_token_remove(struct afb_proto_ws *protows, uint16_t tokenid); extern int afb_proto_ws_client_call(struct afb_proto_ws *protows, const char *verb, struct json_object *args, uint16_t sessionid, uint16_t tokenid, void *request, const char *user_creds); extern int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)(void*, struct json_object*), void *closure); +extern int afb_proto_ws_client_event_unexpected(struct afb_proto_ws *protows, uint16_t eventid); extern int afb_proto_ws_server_event_create(struct afb_proto_ws *protows, uint16_t event_id, const char *event_name); extern int afb_proto_ws_server_event_remove(struct afb_proto_ws *protows, uint16_t event_id); diff --git a/src/afb-stub-ws.c b/src/afb-stub-ws.c index b3b2e58a..b7e3c946 100644 --- a/src/afb-stub-ws.c +++ b/src/afb-stub-ws.c @@ -414,9 +414,11 @@ static void client_on_event_push_cb(void *closure, uint16_t event_id, struct jso rc = u16id2ptr_get(stubws->event_proxies, event_id, (void**)&event); if (rc >= 0 && event) - afb_evt_event_x2_push(event, data); + rc = afb_evt_event_x2_push(event, data); else ERROR("unreadable push event"); + if (rc <= 0) + afb_proto_ws_client_event_unexpected(stubws->proto, event_id); } static void client_on_event_broadcast_cb(void *closure, const char *event_name, struct json_object *data, const uuid_binary_t uuid, uint8_t hop) @@ -493,6 +495,13 @@ static void server_on_token_remove_cb(void *closure, uint16_t tokenid) afb_token_unref(token); } +static void server_on_event_unexpected_cb(void *closure, uint16_t eventid) +{ + struct afb_stub_ws *stubws = closure; + + afb_evt_watch_sub_eventid(stubws->listener, eventid); +} + static void server_on_call_cb(void *closure, struct afb_proto_ws_call *call, const char *verb, struct json_object *args, uint16_t sessionid, uint16_t tokenid, const char *user_creds) { struct afb_stub_ws *stubws = closure; @@ -583,7 +592,8 @@ static const struct afb_proto_ws_server_itf server_itf = .on_token_create = server_on_token_create_cb, .on_token_remove = server_on_token_remove_cb, .on_call = server_on_call_cb, - .on_describe = server_on_describe_cb + .on_describe = server_on_describe_cb, + .on_event_unexpected = server_on_event_unexpected_cb }; /* the interface for events pushing */ |