aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/afb-stub-ws.c183
1 files changed, 180 insertions, 3 deletions
diff --git a/src/afb-stub-ws.c b/src/afb-stub-ws.c
index c7b4b2ec..43837e58 100644
--- a/src/afb-stub-ws.c
+++ b/src/afb-stub-ws.c
@@ -50,9 +50,8 @@
#include "afb-evt.h"
#include "afb-xreq.h"
#include "verbose.h"
+#include "jobs.h"
-struct client_call;
-struct client_event;
struct afb_stub_ws;
/************** constants for protocol definition *************************/
@@ -68,6 +67,8 @@ struct afb_stub_ws;
#define CHAR_FOR_EVT_UNSUBSCRIBE 'U'
#define CHAR_FOR_SUBCALL_CALL 'B'
#define CHAR_FOR_SUBCALL_REPLY 'R'
+#define CHAR_FOR_DESCRIBE 'D'
+#define CHAR_FOR_DESCRIPTION 'd'
/******************* handling subcalls *****************************/
@@ -122,6 +123,27 @@ struct client_event
int refcount;
};
+/*
+ * structure for recording describe requests
+ */
+struct client_describe
+{
+ struct client_describe *next;
+ struct afb_stub_ws *stubws;
+ struct jobloop *jobloop;
+ struct json_object *result;
+ uint32_t descid;
+};
+
+/*
+ * structure for jobs of describing
+ */
+struct server_describe
+{
+ struct afb_stub_ws *stubws;
+ uint32_t descid;
+};
+
/******************* client description part for server *****************************/
struct afb_stub_ws
@@ -153,6 +175,9 @@ struct afb_stub_ws
/* pending subcalls (server side) */
struct server_subcall *subcalls;
+ /* pending description (client side) */
+ struct client_describe *describes;
+
/* apiset */
struct afb_apiset *apiset;
@@ -769,6 +794,31 @@ static void client_subcall(struct afb_stub_ws *stubws, struct readbuf *rb)
}
}
+/* pushs an event */
+static void client_on_description(struct afb_stub_ws *stubws, struct readbuf *rb)
+{
+ uint32_t descid;
+ struct client_describe *desc;
+ struct json_object *object;
+
+ if (!readbuf_uint32(rb, &descid))
+ ERROR("unreadable description");
+ else {
+ desc = stubws->describes;
+ while (desc && desc->descid != descid)
+ desc = desc->next;
+ if (desc == NULL)
+ ERROR("unexpected description");
+ else {
+ if (readbuf_object(rb, &object))
+ desc->result = object;
+ else
+ ERROR("bad description");
+ jobs_leave(desc->jobloop);
+ }
+ }
+}
+
/* callback when receiving binary data */
static void client_on_binary(void *closure, char *data, size_t size)
{
@@ -805,6 +855,9 @@ static void client_on_binary(void *closure, char *data, size_t size)
case CHAR_FOR_SUBCALL_CALL: /* subcall */
client_subcall(stubws, &rb);
break;
+ case CHAR_FOR_DESCRIPTION: /* description */
+ client_on_description(stubws, &rb);
+ break;
default: /* unexpected message */
/* TODO: close the connection */
break;
@@ -866,6 +919,70 @@ end:
pthread_mutex_unlock(&stubws->mutex);
}
+static void client_send_describe_cb(int signum, void *closure, struct jobloop *jobloop)
+{
+ struct client_describe *desc = closure;
+ struct writebuf wb = { .count = 0 };
+
+ if (!signum) {
+ /* record the jobloop */
+ desc->jobloop = jobloop;
+
+ /* send */
+ if (writebuf_char(&wb, CHAR_FOR_DESCRIBE)
+ && writebuf_uint32(&wb, desc->descid)
+ && afb_ws_binary_v(desc->stubws->ws, wb.iovec, wb.count) >= 0)
+ return;
+ }
+ jobs_leave(jobloop);
+}
+
+/* get the description */
+static struct json_object *client_describe_cb(void * closure)
+{
+ struct client_describe desc, *d;
+ struct afb_stub_ws *stubws = closure;
+
+ /* fill in stack the description of the task */
+ pthread_mutex_lock(&stubws->mutex);
+ desc.result = NULL;
+ desc.descid = ptr2id(&desc);
+ d = stubws->describes;
+ while (d) {
+ if (d->descid != desc.descid)
+ d = d->next;
+ else {
+ desc.descid++;
+ d = stubws->describes;
+ }
+ }
+ desc.stubws = stubws;
+ desc.next = stubws->describes;
+ stubws->describes = &desc;
+ pthread_mutex_unlock(&stubws->mutex);
+
+ /* synchronous job: send the request and wait its result */
+ jobs_enter(NULL, 0, client_send_describe_cb, &desc);
+
+ /* unlink and send the result */
+ pthread_mutex_lock(&stubws->mutex);
+ d = stubws->describes;
+ if (d == &desc)
+ stubws->describes = desc.next;
+ else {
+ while (d) {
+ if (d->next != &desc)
+ d = d->next;
+ else {
+ d->next = desc.next;
+ d = NULL;
+ }
+ }
+ }
+ pthread_mutex_unlock(&stubws->mutex);
+ return desc.result;
+}
+
/******************* client description part for server *****************************/
/* on call, propagate it to the ws service */
@@ -953,6 +1070,62 @@ static void server_on_subcall_reply(struct afb_stub_ws *stubws, struct readbuf *
json_object_put(object);
}
+static void server_send_description(struct afb_stub_ws *stubws, uint32_t descid, struct json_object *descobj)
+{
+ struct writebuf wb = { .count = 0 };
+
+ if (!writebuf_char(&wb, CHAR_FOR_DESCRIPTION)
+ || !writebuf_uint32(&wb, descid)
+ || !writebuf_object(&wb, descobj)
+ || afb_ws_binary_v(stubws->ws, wb.iovec, wb.count) < 0)
+ ERROR("can't send description");
+}
+
+static void server_describe_job(int signum, void *closure)
+{
+ struct afb_api api;
+ struct json_object *obj;
+ struct server_describe *desc = closure;
+
+ /* get the description if possible */
+ obj = NULL;
+ if (!signum
+ && !afb_apiset_get(desc->stubws->apiset, desc->stubws->apiname, &api)
+ && api.itf->describe) {
+ obj = api.itf->describe(api.closure);
+ }
+
+ /* send it */
+ server_send_description(desc->stubws, desc->descid, obj);
+ json_object_put(obj);
+ afb_stub_ws_unref(desc->stubws);
+ free(desc);
+}
+
+/* on describe, propagate it to the ws service */
+static void server_on_describe(struct afb_stub_ws *stubws, struct readbuf *rb)
+{
+
+ uint32_t descid;
+ struct server_describe *desc;
+
+ /* reads the descid */
+ if (readbuf_uint32(rb, &descid)) {
+ /* create asynchronous job */
+ desc = malloc(sizeof *desc);
+ if (desc) {
+ desc->descid = descid;
+ desc->stubws = stubws;
+ afb_stub_ws_addref(stubws);
+ if (jobs_queue(NULL, 0, server_describe_job, desc) < 0)
+ server_describe_job(0, desc);
+ return;
+ }
+ server_send_description(stubws, descid, NULL);
+ }
+ ERROR("can't provide description");
+}
+
/* callback when receiving binary data */
static void server_on_binary(void *closure, char *data, size_t size)
{
@@ -965,6 +1138,9 @@ static void server_on_binary(void *closure, char *data, size_t size)
case CHAR_FOR_SUBCALL_REPLY:
server_on_subcall_reply(closure, &rb);
break;
+ case CHAR_FOR_DESCRIBE:
+ server_on_describe(closure, &rb);
+ break;
default: /* unexpected message */
/* TODO: close the connection */
break;
@@ -1070,7 +1246,8 @@ static const struct afb_ws_itf server_ws_itf =
};
static struct afb_api_itf ws_api_itf = {
- .call = client_call_cb
+ .call = client_call_cb,
+ .describe = client_describe_cb
};
/*****************************************************/