diff options
-rw-r--r-- | src/afb-stub-ws.c | 183 |
1 files changed, 180 insertions, 3 deletions
diff --git a/src/afb-stub-ws.c b/src/afb-stub-ws.c index c7b4b2ec..43837e58 100644 --- a/src/afb-stub-ws.c +++ b/src/afb-stub-ws.c @@ -50,9 +50,8 @@ #include "afb-evt.h" #include "afb-xreq.h" #include "verbose.h" +#include "jobs.h" -struct client_call; -struct client_event; struct afb_stub_ws; /************** constants for protocol definition *************************/ @@ -68,6 +67,8 @@ struct afb_stub_ws; #define CHAR_FOR_EVT_UNSUBSCRIBE 'U' #define CHAR_FOR_SUBCALL_CALL 'B' #define CHAR_FOR_SUBCALL_REPLY 'R' +#define CHAR_FOR_DESCRIBE 'D' +#define CHAR_FOR_DESCRIPTION 'd' /******************* handling subcalls *****************************/ @@ -122,6 +123,27 @@ struct client_event int refcount; }; +/* + * structure for recording describe requests + */ +struct client_describe +{ + struct client_describe *next; + struct afb_stub_ws *stubws; + struct jobloop *jobloop; + struct json_object *result; + uint32_t descid; +}; + +/* + * structure for jobs of describing + */ +struct server_describe +{ + struct afb_stub_ws *stubws; + uint32_t descid; +}; + /******************* client description part for server *****************************/ struct afb_stub_ws @@ -153,6 +175,9 @@ struct afb_stub_ws /* pending subcalls (server side) */ struct server_subcall *subcalls; + /* pending description (client side) */ + struct client_describe *describes; + /* apiset */ struct afb_apiset *apiset; @@ -769,6 +794,31 @@ static void client_subcall(struct afb_stub_ws *stubws, struct readbuf *rb) } } +/* pushs an event */ +static void client_on_description(struct afb_stub_ws *stubws, struct readbuf *rb) +{ + uint32_t descid; + struct client_describe *desc; + struct json_object *object; + + if (!readbuf_uint32(rb, &descid)) + ERROR("unreadable description"); + else { + desc = stubws->describes; + while (desc && desc->descid != descid) + desc = desc->next; + if (desc == NULL) + ERROR("unexpected description"); + else { + if (readbuf_object(rb, &object)) + desc->result = object; + else + ERROR("bad description"); + jobs_leave(desc->jobloop); + } + } +} + /* callback when receiving binary data */ static void client_on_binary(void *closure, char *data, size_t size) { @@ -805,6 +855,9 @@ static void client_on_binary(void *closure, char *data, size_t size) case CHAR_FOR_SUBCALL_CALL: /* subcall */ client_subcall(stubws, &rb); break; + case CHAR_FOR_DESCRIPTION: /* description */ + client_on_description(stubws, &rb); + break; default: /* unexpected message */ /* TODO: close the connection */ break; @@ -866,6 +919,70 @@ end: pthread_mutex_unlock(&stubws->mutex); } +static void client_send_describe_cb(int signum, void *closure, struct jobloop *jobloop) +{ + struct client_describe *desc = closure; + struct writebuf wb = { .count = 0 }; + + if (!signum) { + /* record the jobloop */ + desc->jobloop = jobloop; + + /* send */ + if (writebuf_char(&wb, CHAR_FOR_DESCRIBE) + && writebuf_uint32(&wb, desc->descid) + && afb_ws_binary_v(desc->stubws->ws, wb.iovec, wb.count) >= 0) + return; + } + jobs_leave(jobloop); +} + +/* get the description */ +static struct json_object *client_describe_cb(void * closure) +{ + struct client_describe desc, *d; + struct afb_stub_ws *stubws = closure; + + /* fill in stack the description of the task */ + pthread_mutex_lock(&stubws->mutex); + desc.result = NULL; + desc.descid = ptr2id(&desc); + d = stubws->describes; + while (d) { + if (d->descid != desc.descid) + d = d->next; + else { + desc.descid++; + d = stubws->describes; + } + } + desc.stubws = stubws; + desc.next = stubws->describes; + stubws->describes = &desc; + pthread_mutex_unlock(&stubws->mutex); + + /* synchronous job: send the request and wait its result */ + jobs_enter(NULL, 0, client_send_describe_cb, &desc); + + /* unlink and send the result */ + pthread_mutex_lock(&stubws->mutex); + d = stubws->describes; + if (d == &desc) + stubws->describes = desc.next; + else { + while (d) { + if (d->next != &desc) + d = d->next; + else { + d->next = desc.next; + d = NULL; + } + } + } + pthread_mutex_unlock(&stubws->mutex); + return desc.result; +} + /******************* client description part for server *****************************/ /* on call, propagate it to the ws service */ @@ -953,6 +1070,62 @@ static void server_on_subcall_reply(struct afb_stub_ws *stubws, struct readbuf * json_object_put(object); } +static void server_send_description(struct afb_stub_ws *stubws, uint32_t descid, struct json_object *descobj) +{ + struct writebuf wb = { .count = 0 }; + + if (!writebuf_char(&wb, CHAR_FOR_DESCRIPTION) + || !writebuf_uint32(&wb, descid) + || !writebuf_object(&wb, descobj) + || afb_ws_binary_v(stubws->ws, wb.iovec, wb.count) < 0) + ERROR("can't send description"); +} + +static void server_describe_job(int signum, void *closure) +{ + struct afb_api api; + struct json_object *obj; + struct server_describe *desc = closure; + + /* get the description if possible */ + obj = NULL; + if (!signum + && !afb_apiset_get(desc->stubws->apiset, desc->stubws->apiname, &api) + && api.itf->describe) { + obj = api.itf->describe(api.closure); + } + + /* send it */ + server_send_description(desc->stubws, desc->descid, obj); + json_object_put(obj); + afb_stub_ws_unref(desc->stubws); + free(desc); +} + +/* on describe, propagate it to the ws service */ +static void server_on_describe(struct afb_stub_ws *stubws, struct readbuf *rb) +{ + + uint32_t descid; + struct server_describe *desc; + + /* reads the descid */ + if (readbuf_uint32(rb, &descid)) { + /* create asynchronous job */ + desc = malloc(sizeof *desc); + if (desc) { + desc->descid = descid; + desc->stubws = stubws; + afb_stub_ws_addref(stubws); + if (jobs_queue(NULL, 0, server_describe_job, desc) < 0) + server_describe_job(0, desc); + return; + } + server_send_description(stubws, descid, NULL); + } + ERROR("can't provide description"); +} + /* callback when receiving binary data */ static void server_on_binary(void *closure, char *data, size_t size) { @@ -965,6 +1138,9 @@ static void server_on_binary(void *closure, char *data, size_t size) case CHAR_FOR_SUBCALL_REPLY: server_on_subcall_reply(closure, &rb); break; + case CHAR_FOR_DESCRIBE: + server_on_describe(closure, &rb); + break; default: /* unexpected message */ /* TODO: close the connection */ break; @@ -1070,7 +1246,8 @@ static const struct afb_ws_itf server_ws_itf = }; static struct afb_api_itf ws_api_itf = { - .call = client_call_cb + .call = client_call_cb, + .describe = client_describe_cb }; /*****************************************************/ |