aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/afb-evt.c30
-rw-r--r--src/afb-evt.h3
-rw-r--r--src/afb-proto-ws.c19
-rw-r--r--src/afb-proto-ws.h2
-rw-r--r--src/afb-stub-ws.c14
5 files changed, 65 insertions, 3 deletions
diff --git a/src/afb-evt.c b/src/afb-evt.c
index c0e43200..1c8798fd 100644
--- a/src/afb-evt.c
+++ b/src/afb-evt.c
@@ -894,6 +894,36 @@ int afb_evt_watch_sub_evtid(struct afb_evt_listener *listener, struct afb_evtid
return -1;
}
+/*
+ * Avoids the 'listener' to watch 'eventid'
+ * Returns 0 in case of success or else -1.
+ */
+int afb_evt_watch_sub_eventid(struct afb_evt_listener *listener, uint16_t eventid)
+{
+ struct afb_evt_watch *watch;
+ struct afb_evtid *evtid;
+
+ /* search the existing watch */
+ pthread_rwlock_wrlock(&listener->rwlock);
+ watch = listener->watchs;
+ while(watch != NULL) {
+ evtid = watch->evtid;
+ if (evtid->id == eventid) {
+ if (watch->activity != 0) {
+ watch->activity--;
+ if (watch->activity == 0 && listener->itf->remove != NULL)
+ listener->itf->remove(listener->closure, evtid->fullname, evtid->id);
+ }
+ pthread_rwlock_unlock(&listener->rwlock);
+ return 0;
+ }
+ watch = watch->next_by_listener;
+ }
+ pthread_rwlock_unlock(&listener->rwlock);
+ errno = ENOENT;
+ return -1;
+}
+
#if WITH_AFB_HOOK
/*
* update the hooks for events
diff --git a/src/afb-evt.h b/src/afb-evt.h
index 6e0297f6..3392ee3d 100644
--- a/src/afb-evt.h
+++ b/src/afb-evt.h
@@ -60,6 +60,7 @@ extern int afb_evt_evtid_broadcast(struct afb_evtid *evtid, struct json_object *
extern int afb_evt_watch_add_evtid(struct afb_evt_listener *listener, struct afb_evtid *evtid);
extern int afb_evt_watch_sub_evtid(struct afb_evt_listener *listener, struct afb_evtid *evtid);
+extern int afb_evt_watch_sub_eventid(struct afb_evt_listener *listener, uint16_t eventid);
extern struct afb_event_x2 *afb_evt_event_x2_create(const char *fullname);
@@ -89,4 +90,4 @@ extern const char *afb_evt_evtid_hooked_name(struct afb_evtid *evtid);
extern int afb_evt_evtid_hooked_push(struct afb_evtid *evtid, struct json_object *obj);
extern int afb_evt_evtid_hooked_broadcast(struct afb_evtid *evtid, struct json_object *object);
extern void afb_evt_update_hooks();
-#endif \ No newline at end of file
+#endif
diff --git a/src/afb-proto-ws.c b/src/afb-proto-ws.c
index f19924fa..10c06fcd 100644
--- a/src/afb-proto-ws.c
+++ b/src/afb-proto-ws.c
@@ -67,6 +67,8 @@ For the purpose of handling events the server can:
- push or broadcast data as an event
+ - signal unexpected event
+
*/
/************** constants for protocol definition *************************/
@@ -78,6 +80,7 @@ For the purpose of handling events the server can:
#define CHAR_FOR_EVT_PUSH 'P' /* server -> client */
#define CHAR_FOR_EVT_SUBSCRIBE 'X' /* server -> client */
#define CHAR_FOR_EVT_UNSUBSCRIBE 'x' /* server -> client */
+#define CHAR_FOR_EVT_UNEXPECTED 'U' /* client -> server */
#define CHAR_FOR_DESCRIBE 'D' /* client -> server */
#define CHAR_FOR_DESCRIPTION 'd' /* server -> client */
#define CHAR_FOR_TOKEN_ADD 'T' /* client -> server */
@@ -790,6 +793,11 @@ int afb_proto_ws_client_token_remove(struct afb_proto_ws *protows, uint16_t toke
return client_send_cmd_id16_optstr(protows, CHAR_FOR_TOKEN_DROP, tokenid, NULL);
}
+int afb_proto_ws_client_event_unexpected(struct afb_proto_ws *protows, uint16_t eventid)
+{
+ return client_send_cmd_id16_optstr(protows, CHAR_FOR_EVT_UNEXPECTED, eventid, NULL);
+}
+
int afb_proto_ws_client_call(
struct afb_proto_ws *protows,
const char *verb,
@@ -1039,6 +1047,14 @@ static void server_on_token_drop(struct afb_proto_ws *protows, struct readbuf *r
protows->server_itf->on_token_remove(protows->closure, tokenid);
}
+static void server_on_event_unexpected(struct afb_proto_ws *protows, struct readbuf *rb)
+{
+ uint16_t eventid;
+
+ if (readbuf_uint16(rb, &eventid))
+ protows->server_itf->on_event_unexpected(protows->closure, eventid);
+}
+
/* on version offer */
static void server_on_version_offer(struct afb_proto_ws *protows, struct readbuf *rb)
{
@@ -1097,6 +1113,9 @@ static void server_on_binary_job(int sig, void *closure)
case CHAR_FOR_TOKEN_DROP:
server_on_token_drop(binary->protows, &binary->rb);
break;
+ case CHAR_FOR_EVT_UNEXPECTED:
+ server_on_event_unexpected(binary->protows, &binary->rb);
+ break;
case CHAR_FOR_VERSION_OFFER:
server_on_version_offer(binary->protows, &binary->rb);
break;
diff --git a/src/afb-proto-ws.h b/src/afb-proto-ws.h
index 2dcb142b..204bdb83 100644
--- a/src/afb-proto-ws.h
+++ b/src/afb-proto-ws.h
@@ -57,6 +57,7 @@ struct afb_proto_ws_server_itf
void (*on_token_remove)(void *closure, uint16_t tokenid);
void (*on_call)(void *closure, struct afb_proto_ws_call *call, const char *verb, struct json_object *args, uint16_t sessionid, uint16_t tokenid, const char *user_creds);
void (*on_describe)(void *closure, struct afb_proto_ws_describe *describe);
+ void (*on_event_unexpected)(void *closure, uint16_t eventid);
};
extern struct afb_proto_ws *afb_proto_ws_create_client(struct fdev *fdev, const struct afb_proto_ws_client_itf *itf, void *closure);
@@ -80,6 +81,7 @@ extern int afb_proto_ws_client_token_create(struct afb_proto_ws *protows, uint16
extern int afb_proto_ws_client_token_remove(struct afb_proto_ws *protows, uint16_t tokenid);
extern int afb_proto_ws_client_call(struct afb_proto_ws *protows, const char *verb, struct json_object *args, uint16_t sessionid, uint16_t tokenid, void *request, const char *user_creds);
extern int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)(void*, struct json_object*), void *closure);
+extern int afb_proto_ws_client_event_unexpected(struct afb_proto_ws *protows, uint16_t eventid);
extern int afb_proto_ws_server_event_create(struct afb_proto_ws *protows, uint16_t event_id, const char *event_name);
extern int afb_proto_ws_server_event_remove(struct afb_proto_ws *protows, uint16_t event_id);
diff --git a/src/afb-stub-ws.c b/src/afb-stub-ws.c
index b3b2e58a..b7e3c946 100644
--- a/src/afb-stub-ws.c
+++ b/src/afb-stub-ws.c
@@ -414,9 +414,11 @@ static void client_on_event_push_cb(void *closure, uint16_t event_id, struct jso
rc = u16id2ptr_get(stubws->event_proxies, event_id, (void**)&event);
if (rc >= 0 && event)
- afb_evt_event_x2_push(event, data);
+ rc = afb_evt_event_x2_push(event, data);
else
ERROR("unreadable push event");
+ if (rc <= 0)
+ afb_proto_ws_client_event_unexpected(stubws->proto, event_id);
}
static void client_on_event_broadcast_cb(void *closure, const char *event_name, struct json_object *data, const uuid_binary_t uuid, uint8_t hop)
@@ -493,6 +495,13 @@ static void server_on_token_remove_cb(void *closure, uint16_t tokenid)
afb_token_unref(token);
}
+static void server_on_event_unexpected_cb(void *closure, uint16_t eventid)
+{
+ struct afb_stub_ws *stubws = closure;
+
+ afb_evt_watch_sub_eventid(stubws->listener, eventid);
+}
+
static void server_on_call_cb(void *closure, struct afb_proto_ws_call *call, const char *verb, struct json_object *args, uint16_t sessionid, uint16_t tokenid, const char *user_creds)
{
struct afb_stub_ws *stubws = closure;
@@ -583,7 +592,8 @@ static const struct afb_proto_ws_server_itf server_itf =
.on_token_create = server_on_token_create_cb,
.on_token_remove = server_on_token_remove_cb,
.on_call = server_on_call_cb,
- .on_describe = server_on_describe_cb
+ .on_describe = server_on_describe_cb,
+ .on_event_unexpected = server_on_event_unexpected_cb
};
/* the interface for events pushing */