diff options
author | José Bollo <jose.bollo@iot.bzh> | 2017-04-13 16:05:30 +0200 |
---|---|---|
committer | José Bollo <jose.bollo@iot.bzh> | 2017-04-13 23:02:25 +0200 |
commit | 9991f9f55b6b77bf89a9e2cec84280d0c9c0b2cd (patch) | |
tree | 208d68b50737dbce695989bcab3011df24f9ba7f | |
parent | 062887f854dba260a2fc12bd4c388baea65f524a (diff) |
Implement subcall for services over websockets
This modification make subcalls pushed back to
the client that will issue it for itself. This
will at the end ensure the security context of
the client.
Change-Id: Ib4bb5125ffe1b942103b72d1a3d13892dda87baa
Signed-off-by: José Bollo <jose.bollo@iot.bzh>
-rw-r--r-- | src/afb-api-ws.c | 314 | ||||
-rwxr-xr-x | stress-server.sh | 18 |
2 files changed, 273 insertions, 59 deletions
diff --git a/src/afb-api-ws.c b/src/afb-api-ws.c index 9b1d8cc7..ea0781df 100644 --- a/src/afb-api-ws.c +++ b/src/afb-api-ws.c @@ -29,6 +29,7 @@ #include <sys/types.h> #include <sys/socket.h> #include <sys/un.h> +#include <pthread.h> #include <json-c/json.h> #include <systemd/sd-event.h> @@ -62,6 +63,8 @@ struct api_ws_client; #define CHAR_FOR_EVT_PUSH '!' #define CHAR_FOR_EVT_SUBSCRIBE 'S' #define CHAR_FOR_EVT_UNSUBSCRIBE 'U' +#define CHAR_FOR_SUBCALL_CALL 'B' +#define CHAR_FOR_SUBCALL_REPLY 'R' /* */ @@ -70,6 +73,7 @@ struct api_ws char *path; /* path of the object for the API */ char *api; /* api name of the interface */ int fd; /* file descriptor */ + pthread_mutex_t mutex; /**< resource control */ union { struct { uint32_t id; @@ -78,8 +82,7 @@ struct api_ws struct api_ws_memo *memos; } client; struct { - sd_event_source *listensrc; - struct afb_evt_listener *listener; /* listener for broadcasted events */ + sd_event_source *listensrc; /**< systemd source for server socket */ } server; }; }; @@ -116,6 +119,29 @@ static const struct afb_evt_itf api_ws_server_evt_itf = { .remove = api_ws_server_event_remove }; +/******************* handling subcalls *****************************/ + +/** + * Structure on server side for recording pending + * subcalls. + */ +struct api_ws_subcall +{ + struct api_ws_subcall *next; /**< next subcall for the client */ + uint32_t subcallid; /**< the subcallid */ + void (*callback)(void*, int, struct json_object*); /**< callback on completion */ + void *closure; /**< closure of the callback */ +}; + +/** + * Structure for sending back replies on client side + */ +struct api_ws_reply +{ + struct api_ws *apiws; /**< api descriptor */ + uint32_t subcallid; /**< subcallid for the reply */ +}; + /******************* client description part for server *****************************/ struct api_ws_client @@ -126,17 +152,23 @@ struct api_ws_client /* count of references */ int refcount; - /* listener for events */ - struct afb_evt_listener *listener; - /* file descriptor */ int fd; + /* resource control */ + pthread_mutex_t mutex; + + /* listener for events */ + struct afb_evt_listener *listener; + /* websocket */ struct afb_ws *ws; /* credentials */ struct afb_cred *cred; + + /* pending subcalls */ + struct api_ws_subcall *subcalls; }; /******************* websocket interface for client part **********************************/ @@ -161,8 +193,6 @@ static const struct afb_ws_itf api_ws_server_ws_itf = struct api_ws_server_req { struct afb_xreq xreq; /* the xreq */ struct api_ws_client *client; /* the client of the request */ - const char *request; /* the readen request as string */ - size_t lenreq; /* the length of the request */ uint32_t msgid; /* the incoming request msgid */ }; @@ -171,11 +201,13 @@ static void api_ws_server_req_fail_cb(struct afb_xreq *xreq, const char *status, static void api_ws_server_req_destroy_cb(struct afb_xreq *xreq); static int api_ws_server_req_subscribe_cb(struct afb_xreq *xreq, struct afb_event event); static int api_ws_server_req_unsubscribe_cb(struct afb_xreq *xreq, struct afb_event event); +static void api_ws_server_req_subcall_cb(struct afb_xreq *xreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure); const struct afb_xreq_query_itf afb_api_ws_xreq_itf = { .success = api_ws_server_req_success_cb, .fail = api_ws_server_req_fail_cb, .unref = api_ws_server_req_destroy_cb, + .subcall = api_ws_server_req_subcall_cb, .subscribe = api_ws_server_req_subscribe_cb, .unsubscribe = api_ws_server_req_unsubscribe_cb }; @@ -210,6 +242,7 @@ static struct api_ws *api_ws_make(const char *path) errno = EINVAL; goto error2; } + pthread_mutex_init(&api->mutex, NULL); api->fd = -1; return api; @@ -358,6 +391,14 @@ static char *api_ws_read_get(struct readbuf *rb, uint32_t length) return before; } +static int api_ws_read_char(struct readbuf *rb, char *value) +{ + if (rb->head >= rb->end) + return 0; + *value = *rb->head++; + return 1; +} + static int api_ws_read_uint32(struct readbuf *rb, uint32_t *value) { char *after = rb->head + sizeof *value; @@ -381,9 +422,16 @@ static int api_ws_read_string(struct readbuf *rb, const char **value, size_t *le static int api_ws_read_object(struct readbuf *rb, struct json_object **object) { - size_t length; const char *string; - return api_ws_read_string(rb, &string, &length) && ((*object = json_tokener_parse(string)) != NULL) == (strcmp(string, "null") != 0); + struct json_object *o; + int rc = api_ws_read_string(rb, &string, NULL); + if (rc) { + o = json_tokener_parse(string); + if (o == NULL && strcmp(string, "null")) + o = json_object_new_string(string); + *object = o; + } + return rc; } static int api_ws_write_put(struct writebuf *wb, const void *value, size_t length) @@ -438,22 +486,15 @@ static int api_ws_write_object(struct writebuf *wb, struct json_object *object) return string != NULL && api_ws_write_string(wb, string); } - - - /******************* client part **********************************/ /* * structure for recording query data */ struct api_ws_memo { - struct api_ws_memo *next; /* the next memo */ + struct api_ws_memo *next; /* the next memo */ struct api_ws *api; /* the ws api */ struct afb_xreq *xreq; /* the request handle */ -#if 0 - struct afb_req req; /* the request handle */ - struct afb_context *context; /* the context of the query */ -#endif uint32_t msgid; /* the message identifier */ }; @@ -575,7 +616,7 @@ static int api_ws_client_msg_memo_get(struct api_ws *api, struct readbuf *rb, st } /* read a subscrition message */ -static int api_ws_client_msg_subscription_get(struct api_ws *api, struct readbuf *rb, struct api_ws_event **ev, struct api_ws_memo **memo) +static int api_ws_client_msg_subscription_get(struct api_ws *api, struct readbuf *rb, struct api_ws_memo **memo, struct api_ws_event **ev) { return api_ws_client_msg_memo_get(api, rb, memo) && api_ws_client_msg_event_get(api, rb, ev); } @@ -649,7 +690,7 @@ static void api_ws_client_event_subscribe(struct api_ws *api, struct readbuf *rb struct api_ws_event *ev; struct api_ws_memo *memo; - if (api_ws_client_msg_subscription_get(api, rb, &ev, &memo)) { + if (api_ws_client_msg_subscription_get(api, rb, &memo, &ev)) { /* subscribe the request from the event */ if (afb_xreq_subscribe(memo->xreq, ev->event) < 0) ERROR("can't subscribe: %m"); @@ -662,7 +703,7 @@ static void api_ws_client_event_unsubscribe(struct api_ws *api, struct readbuf * struct api_ws_event *ev; struct api_ws_memo *memo; - if (api_ws_client_msg_subscription_get(api, rb, &ev, &memo)) { + if (api_ws_client_msg_subscription_get(api, rb, &memo, &ev)) { /* unsubscribe the request from the event */ if (afb_xreq_unsubscribe(memo->xreq, ev->event) < 0) ERROR("can't unsubscribe: %m"); @@ -738,39 +779,102 @@ static void api_ws_client_reply_fail(struct api_ws *api, struct readbuf *rb) api_ws_client_memo_destroy(memo); } +/* send a subcall reply */ +static void api_ws_client_send_subcall_reply(struct api_ws_reply *reply, int iserror, json_object *object) +{ + int rc; + struct writebuf wb = { .count = 0 }; + char ie = (char)!!iserror; + + if (!api_ws_write_char(&wb, CHAR_FOR_SUBCALL_REPLY) + || !api_ws_write_uint32(&wb, reply->subcallid) + || !api_ws_write_char(&wb, ie) + || !api_ws_write_object(&wb, object)) { + /* write error ? */ + return; + } + + rc = afb_ws_binary_v(reply->apiws->client.ws, wb.iovec, wb.count); + if (rc >= 0) + return; + ERROR("error while sending subcall reply"); +} + +/* callback for subcall reply */ +static void api_ws_client_subcall_reply_cb(void *closure, int iserror, json_object *object) +{ + api_ws_client_send_subcall_reply(closure, iserror, object); + free(closure); +} + +/* received a subcall request */ +static void api_ws_client_subcall(struct api_ws *apiws, struct readbuf *rb) +{ + struct api_ws_reply *reply; + struct api_ws_memo *memo; + const char *api, *verb; + uint32_t subcallid; + struct json_object *object; + + reply = malloc(sizeof *reply); + if (!reply) + return; + + /* retrieve the message data */ + if (!api_ws_client_msg_memo_get(apiws, rb, &memo)) + return; + + if (api_ws_read_uint32(rb, &subcallid) + && api_ws_read_string(rb, &api, NULL) + && api_ws_read_string(rb, &verb, NULL) + && api_ws_read_object(rb, &object)) { + reply->apiws = apiws; + reply->subcallid = subcallid; + afb_xreq_subcall(memo->xreq, api, verb, object, api_ws_client_subcall_reply_cb, reply); + } +} + /* callback when receiving binary data */ static void api_ws_client_on_binary(void *closure, char *data, size_t size) { if (size > 0) { + struct api_ws *apiws = closure; struct readbuf rb = { .head = data, .end = data + size }; + + pthread_mutex_lock(&apiws->mutex); switch (*rb.head++) { case CHAR_FOR_ANSWER_SUCCESS: /* success */ - api_ws_client_reply_success(closure, &rb); + api_ws_client_reply_success(apiws, &rb); break; case CHAR_FOR_ANSWER_FAIL: /* fail */ - api_ws_client_reply_fail(closure, &rb); + api_ws_client_reply_fail(apiws, &rb); break; case CHAR_FOR_EVT_BROADCAST: /* broadcast */ - api_ws_client_event_broadcast(closure, &rb); + api_ws_client_event_broadcast(apiws, &rb); break; case CHAR_FOR_EVT_ADD: /* creates the event */ - api_ws_client_event_create(closure, &rb); + api_ws_client_event_create(apiws, &rb); break; case CHAR_FOR_EVT_DEL: /* drops the event */ - api_ws_client_event_drop(closure, &rb); + api_ws_client_event_drop(apiws, &rb); break; case CHAR_FOR_EVT_PUSH: /* pushs the event */ - api_ws_client_event_push(closure, &rb); + api_ws_client_event_push(apiws, &rb); break; case CHAR_FOR_EVT_SUBSCRIBE: /* subscribe event for a request */ - api_ws_client_event_subscribe(closure, &rb); + api_ws_client_event_subscribe(apiws, &rb); break; case CHAR_FOR_EVT_UNSUBSCRIBE: /* unsubscribe event for a request */ - api_ws_client_event_unsubscribe(closure, &rb); + api_ws_client_event_unsubscribe(apiws, &rb); + break; + case CHAR_FOR_SUBCALL_CALL: /* subcall */ + api_ws_client_subcall(apiws, &rb); break; default: /* unexpected message */ + /* TODO: close the connection */ break; } + pthread_mutex_unlock(&apiws->mutex); } free(data); } @@ -783,13 +887,15 @@ static void api_ws_client_call_cb(void * closure, struct afb_xreq *xreq) struct writebuf wb = { .count = 0 }; const char *raw; size_t szraw; - struct api_ws *api = closure; + struct api_ws *apiws = closure; + + pthread_mutex_lock(&apiws->mutex); /* create the recording data */ - memo = api_ws_client_memo_make(api, xreq); + memo = api_ws_client_memo_make(apiws, xreq); if (memo == NULL) { afb_xreq_fail_f(xreq, "error", "out of memory"); - return; + goto end; } /* creates the call message */ @@ -805,12 +911,10 @@ static void api_ws_client_call_cb(void * closure, struct afb_xreq *xreq) goto overflow; /* send */ - rc = afb_ws_binary_v(api->client.ws, wb.iovec, wb.count); - if (rc < 0) - goto ws_send_error; - return; + rc = afb_ws_binary_v(apiws->client.ws, wb.iovec, wb.count); + if (rc >= 0) + goto end; -ws_send_error: afb_xreq_fail(xreq, "error", "websocket sending error"); goto clean_memo; @@ -823,6 +927,8 @@ overflow: clean_memo: api_ws_client_memo_destroy(memo); +end: + pthread_mutex_unlock(&apiws->mutex); } static int api_ws_service_start_cb(void *closure, int share_session, int onneed) @@ -912,63 +1018,114 @@ error: static void api_ws_server_client_unref(struct api_ws_client *client) { - if (!--client->refcount) { + struct api_ws_subcall *sc, *nsc; + + if (!__atomic_sub_fetch(&client->refcount, 1, __ATOMIC_RELAXED)) { afb_evt_listener_unref(client->listener); afb_ws_destroy(client->ws); + nsc = client->subcalls; + while (nsc) { + sc= nsc; + nsc = sc->next; + sc->callback(sc->closure, 1, NULL); + free(sc); + } afb_cred_unref(client->cred); free(client); } } +static void api_ws_server_client_addref(struct api_ws_client *client) +{ + __atomic_add_fetch(&client->refcount, 1, __ATOMIC_RELAXED); +} + /* on call, propagate it to the ws service */ static void api_ws_server_on_call(struct api_ws_client *client, struct readbuf *rb) { struct api_ws_server_req *wreq; + char *cverb; const char *uuid, *verb; - uint32_t flags; - - client->refcount++; - - /* create the request */ - wreq = calloc(1 , sizeof *wreq); - if (wreq == NULL) - goto out_of_memory; + uint32_t flags, msgid; + size_t lenverb; + struct json_object *object; - wreq->client = client; + api_ws_server_client_addref(client); /* reads the call message data */ - if (!api_ws_read_uint32(rb, &wreq->msgid) + if (!api_ws_read_uint32(rb, &msgid) || !api_ws_read_uint32(rb, &flags) - || !api_ws_read_string(rb, &verb, NULL) + || !api_ws_read_string(rb, &verb, &lenverb) || !api_ws_read_string(rb, &uuid, NULL) - || !api_ws_read_string(rb, &wreq->request, &wreq->lenreq)) + || !api_ws_read_object(rb, &object)) goto overflow; + /* create the request */ + wreq = malloc(++lenverb + sizeof *wreq); + if (wreq == NULL) + goto out_of_memory; + afb_xreq_init(&wreq->xreq, &afb_api_ws_xreq_itf); - wreq->xreq.json = json_tokener_parse(wreq->request); - if (wreq->xreq.json == NULL && strcmp(wreq->request, "null")) { - wreq->xreq.json = json_object_new_string(wreq->request); - } + wreq->client = client; + wreq->msgid = msgid; + cverb = (char*)&wreq[1]; + memcpy(cverb, verb, lenverb); /* init the context */ if (afb_context_connect(&wreq->xreq.context, uuid, NULL) < 0) - goto out_of_memory; + goto unconnected; wreq->xreq.context.flags = flags; /* makes the call */ wreq->xreq.cred = afb_cred_addref(client->cred); wreq->xreq.api = client->api; - wreq->xreq.verb = verb; + wreq->xreq.verb = cverb; + wreq->xreq.json = object; afb_apis_call(&wreq->xreq); afb_xreq_unref(&wreq->xreq); return; +unconnected: + free(wreq); out_of_memory: + json_object_put(object); overflow: - free(wreq); api_ws_server_client_unref(client); } +/* on subcall reply */ +static void api_ws_server_on_subcall_reply(struct api_ws_client *client, struct readbuf *rb) +{ + char iserror; + uint32_t subcallid; + struct json_object *object; + struct api_ws_subcall *sc, **psc; + + /* reads the call message data */ + if (!api_ws_read_uint32(rb, &subcallid) + || !api_ws_read_char(rb, &iserror) + || !api_ws_read_object(rb, &object)) { + /* TODO bad protocol */ + return; + } + + /* search the subcall and unlink it */ + pthread_mutex_lock(&client->mutex); + psc = &client->subcalls; + while ((sc = *psc) && sc->subcallid != subcallid) + psc = &sc->next; + if (!sc) { + pthread_mutex_unlock(&client->mutex); + /* TODO subcall not found */ + } else { + *psc = sc->next; + pthread_mutex_unlock(&client->mutex); + sc->callback(sc->closure, (int)iserror, object); + free(sc); + } + json_object_put(object); +} + /* callback when receiving binary data */ static void api_ws_server_on_binary(void *closure, char *data, size_t size) { @@ -978,6 +1135,9 @@ static void api_ws_server_on_binary(void *closure, char *data, size_t size) case CHAR_FOR_CALL: api_ws_server_on_call(closure, &rb); break; + case CHAR_FOR_SUBCALL_REPLY: + api_ws_server_on_subcall_reply(closure, &rb); + break; default: /* unexpected message */ /* TODO: close the connection */ break; @@ -1021,6 +1181,7 @@ static void api_ws_server_accept(struct api_ws *api) if (client->ws != NULL) { client->api = api->api; client->refcount = 1; + client->subcalls = NULL; return; } afb_cred_unref(client->cred); @@ -1135,6 +1296,47 @@ static void api_ws_server_req_fail_cb(struct afb_xreq *xreq, const char *status, ERROR("error while sending fail"); } +static void api_ws_server_req_subcall_cb(struct afb_xreq *xreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure) +{ + int rc; + struct writebuf wb = { .count = 0 }; + struct api_ws_subcall *sc, *osc; + struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq); + struct api_ws_client *client = wreq->client; + + sc = malloc(sizeof *sc); + if (!sc) { + + } else { + sc->callback = callback; + sc->closure = cb_closure; + + pthread_mutex_unlock(&client->mutex); + sc->subcallid = (uint32_t)(((intptr_t)sc) >> 6); + do { + sc->subcallid++; + osc = client->subcalls; + while(osc && osc->subcallid != sc->subcallid) + osc = osc->next; + } while (osc); + sc->next = client->subcalls; + client->subcalls = sc; + pthread_mutex_unlock(&client->mutex); + + if (api_ws_write_char(&wb, CHAR_FOR_SUBCALL_CALL) + && api_ws_write_uint32(&wb, wreq->msgid) + && api_ws_write_uint32(&wb, sc->subcallid) + && api_ws_write_string(&wb, api) + && api_ws_write_string(&wb, verb) + && api_ws_write_object(&wb, args)) { + rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count); + if (rc >= 0) + return; + } + ERROR("error while sending fail"); + } +} + static int api_ws_server_req_subscribe_cb(struct afb_xreq *xreq, struct afb_event event) { int rc, rc2; diff --git a/stress-server.sh b/stress-server.sh index a0d83fd9..31fce9f6 100755 --- a/stress-server.sh +++ b/stress-server.sh @@ -3,16 +3,28 @@ ROOT=$(dirname $0) echo ROOT=$ROOT -AFB=$ROOT/build/src/afb-daemon +cd $ROOT + +AFB=build/src/afb-daemon HELLO=build/bindings/samples/helloWorld.so PORT=12345 TEST=test TOKEN=knock-knock-knoc -OUT=$ROOT/stress-out-server +OUT=stress-out-server rm $OUT* -ARGS="-q --session-max=100 --port=$PORT --workdir=$ROOT --roothttp=$TEST --token=$TOKEN --ldpaths=/tmp --binding=$HELLO" +case "$1" in + --ws) + shift + ARGS="-q --ldpaths=/tmp --binding=$HELLO --session-max=100 --ws-server=unix:hello --no-httpd --exec $AFB --session-max=100 --port=$PORT --ldpaths=/tmp --roothttp=$TEST --token=$TOKEN --ws-client=unix:hello " +# ARGS="-vv --tracereq=all --ldpaths=/tmp --binding=$HELLO --session-max=100 --ws-server=unix:hello --no-httpd --exec $AFB --session-max=100 --port=$PORT --ldpaths=/tmp --roothttp=$TEST --token=$TOKEN --ws-client=unix:hello " + ;; + *) + ARGS="-q --session-max=100 --port=$PORT --workdir=$ROOT --roothttp=$TEST --token=$TOKEN --ldpaths=/tmp --binding=$HELLO" + ;; +esac + echo -n launch afb... case "$1" in |