diff options
author | José Bollo <jose.bollo@iot.bzh> | 2018-08-21 21:15:31 +0200 |
---|---|---|
committer | José Bollo <jose.bollo@iot.bzh> | 2018-08-23 10:18:18 +0200 |
commit | 2991a2564bc5e21b04dcb3157ce38804080c0056 (patch) | |
tree | 529563afe088e7a9d91383e2cc256d97bd4c0022 | |
parent | 7b6940f1524cac6172e71529a989424ff18fb850 (diff) |
afb-stub-ws: Add reconnection of ws-clients on need
This implementation detects deconnections and try to reconnect
lazily on need.
Bug-AGL: SPEC-1668
Change-Id: Ib2a20a4578f2da80afe1564c42de96c4aa250e64
Signed-off-by: José Bollo <jose.bollo@iot.bzh>
-rw-r--r-- | src/afb-api-ws.c | 16 | ||||
-rw-r--r-- | src/afb-stub-ws.c | 55 | ||||
-rw-r--r-- | src/afb-stub-ws.h | 2 |
3 files changed, 67 insertions, 6 deletions
diff --git a/src/afb-api-ws.c b/src/afb-api-ws.c index 6d8506c1..8069da22 100644 --- a/src/afb-api-ws.c +++ b/src/afb-api-ws.c @@ -50,6 +50,12 @@ struct api_ws_server /*** C L I E N T ***/ /******************************************************************************/ +static struct fdev *reopen_client(void *closure) +{ + const char *uri = closure; + return afb_socket_open_fdev(uri, 0); +} + int afb_api_ws_add_client(const char *uri, struct afb_apiset *declare_set, struct afb_apiset *call_set, int strong) { struct afb_stub_ws *stubws; @@ -73,8 +79,16 @@ int afb_api_ws_add_client(const char *uri, struct afb_apiset *declare_set, struc ERROR("can't setup client ws service to %s", uri); fdev_unref(fdev); } else { - if (afb_stub_ws_client_add(stubws, declare_set) >= 0) + if (afb_stub_ws_client_add(stubws, declare_set) >= 0) { +#if 1 + /* it is asserted here that uri is never released */ + afb_stub_ws_client_robustify(stubws, reopen_client, (void*)uri, NULL); +#else + /* it is asserted here that uri is released, so use a copy */ + afb_stub_ws_client_robustify(stubws, reopen_client, strdup(uri), free); +#endif return 0; + } ERROR("can't add the client to the apiset for service %s", uri); afb_stub_ws_unref(stubws); } diff --git a/src/afb-stub-ws.c b/src/afb-stub-ws.c index 19b9b618..a41a9b02 100644 --- a/src/afb-stub-ws.c +++ b/src/afb-stub-ws.c @@ -128,6 +128,13 @@ struct afb_stub_ws struct { /* event replica */ struct client_event *events; + + /* robustify */ + struct { + struct fdev *(*reopen)(void*); + void *closure; + void (*release)(void*); + } robust; }; }; @@ -141,6 +148,8 @@ struct afb_stub_ws char apiname[1]; }; +static struct afb_proto_ws *afb_stub_ws_create_proto(struct afb_stub_ws *stubws, struct fdev *fdev, uint8_t server); + /******************* ws request part for server *****************/ /* decrement the reference count of the request and free/release it on falling to null */ @@ -229,19 +238,35 @@ static struct client_event *client_event_search(struct afb_stub_ws *stubws, uint return ev; } +static struct afb_proto_ws *client_get_proto(struct afb_stub_ws *stubws) +{ + struct fdev *fdev; + struct afb_proto_ws *proto; + + proto = stubws->proto; + if (proto == NULL && stubws->robust.reopen) { + fdev = stubws->robust.reopen(stubws->robust.closure); + if (fdev != NULL) + proto = afb_stub_ws_create_proto(stubws, fdev, 0); + } + return proto; +} + /* on call, propagate it to the ws service */ static void client_api_call_cb(void * closure, struct afb_xreq *xreq) { int rc; struct afb_stub_ws *stubws = closure; + struct afb_proto_ws *proto; - if (stubws->proto == NULL) { + proto = client_get_proto(stubws); + if (proto == NULL) { afb_xreq_reply(xreq, NULL, "disconnected", "server hung up"); return; } rc = afb_proto_ws_client_call( - stubws->proto, + proto, xreq->request.called_verb, afb_xreq_json(xreq), afb_session_uuid(xreq->context.session), @@ -264,12 +289,14 @@ static void client_on_description_cb(void *closure, struct json_object *data) static void client_send_describe_cb(int signum, void *closure, struct jobloop *jobloop) { struct client_describe *desc = closure; + struct afb_proto_ws *proto; - if (signum || desc->stubws->proto == NULL) + proto = client_get_proto(desc->stubws); + if (signum || proto == NULL) jobs_leave(jobloop); else { desc->jobloop = jobloop; - afb_proto_ws_client_describe(desc->stubws->proto, client_on_description_cb, desc); + afb_proto_ws_client_describe(proto, client_on_description_cb, desc); } } @@ -685,6 +712,12 @@ void afb_stub_ws_unref(struct afb_stub_ws *stubws) { if (stubws && !__atomic_sub_fetch(&stubws->refcount, 1, __ATOMIC_RELAXED)) { + if (stubws->is_client) { + stubws->robust.reopen = NULL; + if (stubws->robust.release) + stubws->robust.release(stubws->robust.closure); + } + disconnect(stubws); afb_apiset_unref(stubws->apiset); free(stubws); @@ -713,7 +746,7 @@ struct afb_api_item afb_stub_ws_client_api(struct afb_stub_ws *stubws) assert(stubws->is_client); /* check client */ api.closure = stubws; api.itf = &client_api_itf; - api.group = NULL; + api.group = stubws; /* serialize for reconnections */ return api; } @@ -721,3 +754,15 @@ int afb_stub_ws_client_add(struct afb_stub_ws *stubws, struct afb_apiset *apiset { return afb_apiset_add(apiset, stubws->apiname, afb_stub_ws_client_api(stubws)); } + +void afb_stub_ws_client_robustify(struct afb_stub_ws *stubws, struct fdev *(*reopen)(void*), void *closure, void (*release)(void*)) +{ + assert(stubws->is_client); /* check client */ + + if (stubws->robust.release) + stubws->robust.release(stubws->robust.closure); + + stubws->robust.reopen = reopen; + stubws->robust.closure = closure; + stubws->robust.release = release; +} diff --git a/src/afb-stub-ws.h b/src/afb-stub-ws.h index f236937d..c0877268 100644 --- a/src/afb-stub-ws.h +++ b/src/afb-stub-ws.h @@ -39,3 +39,5 @@ extern struct afb_api_item afb_stub_ws_client_api(struct afb_stub_ws *stubws); extern int afb_stub_ws_client_add(struct afb_stub_ws *stubws, struct afb_apiset *apiset); +extern void afb_stub_ws_client_robustify(struct afb_stub_ws *stubws, struct fdev *(*reopen)(void*), void *closure, void (*release)(void*)); + |