diff options
-rw-r--r-- | src/afb-api-ws.c | 314 | ||||
-rwxr-xr-x | stress-server.sh | 18 |
2 files changed, 273 insertions, 59 deletions
diff --git a/src/afb-api-ws.c b/src/afb-api-ws.c index 9b1d8cc7..ea0781df 100644 --- a/src/afb-api-ws.c +++ b/src/afb-api-ws.c @@ -29,6 +29,7 @@ #include <sys/types.h> #include <sys/socket.h> #include <sys/un.h> +#include <pthread.h> #include <json-c/json.h> #include <systemd/sd-event.h> @@ -62,6 +63,8 @@ struct api_ws_client; #define CHAR_FOR_EVT_PUSH '!' #define CHAR_FOR_EVT_SUBSCRIBE 'S' #define CHAR_FOR_EVT_UNSUBSCRIBE 'U' +#define CHAR_FOR_SUBCALL_CALL 'B' +#define CHAR_FOR_SUBCALL_REPLY 'R' /* */ @@ -70,6 +73,7 @@ struct api_ws char *path; /* path of the object for the API */ char *api; /* api name of the interface */ int fd; /* file descriptor */ + pthread_mutex_t mutex; /**< resource control */ union { struct { uint32_t id; @@ -78,8 +82,7 @@ struct api_ws struct api_ws_memo *memos; } client; struct { - sd_event_source *listensrc; - struct afb_evt_listener *listener; /* listener for broadcasted events */ + sd_event_source *listensrc; /**< systemd source for server socket */ } server; }; }; @@ -116,6 +119,29 @@ static const struct afb_evt_itf api_ws_server_evt_itf = { .remove = api_ws_server_event_remove }; +/******************* handling subcalls *****************************/ + +/** + * Structure on server side for recording pending + * subcalls. + */ +struct api_ws_subcall +{ + struct api_ws_subcall *next; /**< next subcall for the client */ + uint32_t subcallid; /**< the subcallid */ + void (*callback)(void*, int, struct json_object*); /**< callback on completion */ + void *closure; /**< closure of the callback */ +}; + +/** + * Structure for sending back replies on client side + */ +struct api_ws_reply +{ + struct api_ws *apiws; /**< api descriptor */ + uint32_t subcallid; /**< subcallid for the reply */ +}; + /******************* client description part for server *****************************/ struct api_ws_client @@ -126,17 +152,23 @@ struct api_ws_client /* count of references */ int refcount; - /* listener for events */ - struct afb_evt_listener *listener; - /* file descriptor */ int fd; + /* resource control */ + pthread_mutex_t mutex; + + /* listener for events */ + struct afb_evt_listener *listener; + /* websocket */ struct afb_ws *ws; /* credentials */ struct afb_cred *cred; + + /* pending subcalls */ + struct api_ws_subcall *subcalls; }; /******************* websocket interface for client part **********************************/ @@ -161,8 +193,6 @@ static const struct afb_ws_itf api_ws_server_ws_itf = struct api_ws_server_req { struct afb_xreq xreq; /* the xreq */ struct api_ws_client *client; /* the client of the request */ - const char *request; /* the readen request as string */ - size_t lenreq; /* the length of the request */ uint32_t msgid; /* the incoming request msgid */ }; @@ -171,11 +201,13 @@ static void api_ws_server_req_fail_cb(struct afb_xreq *xreq, const char *status, static void api_ws_server_req_destroy_cb(struct afb_xreq *xreq); static int api_ws_server_req_subscribe_cb(struct afb_xreq *xreq, struct afb_event event); static int api_ws_server_req_unsubscribe_cb(struct afb_xreq *xreq, struct afb_event event); +static void api_ws_server_req_subcall_cb(struct afb_xreq *xreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure); const struct afb_xreq_query_itf afb_api_ws_xreq_itf = { .success = api_ws_server_req_success_cb, .fail = api_ws_server_req_fail_cb, .unref = api_ws_server_req_destroy_cb, + .subcall = api_ws_server_req_subcall_cb, .subscribe = api_ws_server_req_subscribe_cb, .unsubscribe = api_ws_server_req_unsubscribe_cb }; @@ -210,6 +242,7 @@ static struct api_ws *api_ws_make(const char *path) errno = EINVAL; goto error2; } + pthread_mutex_init(&api->mutex, NULL); api->fd = -1; return api; @@ -358,6 +391,14 @@ static char *api_ws_read_get(struct readbuf *rb, uint32_t length) return before; } +static int api_ws_read_char(struct readbuf *rb, char *value) +{ + if (rb->head >= rb->end) + return 0; + *value = *rb->head++; + return 1; +} + static int api_ws_read_uint32(struct readbuf *rb, uint32_t *value) { char *after = rb->head + sizeof *value; @@ -381,9 +422,16 @@ static int api_ws_read_string(struct readbuf *rb, const char **value, size_t *le static int api_ws_read_object(struct readbuf *rb, struct json_object **object) { - size_t length; const char *string; - return api_ws_read_string(rb, &string, &length) && ((*object = json_tokener_parse(string)) != NULL) == (strcmp(string, "null") != 0); + struct json_object *o; + int rc = api_ws_read_string(rb, &string, NULL); + if (rc) { + o = json_tokener_parse(string); + if (o == NULL && strcmp(string, "null")) + o = json_object_new_string(string); + *object = o; + } + return rc; } static int api_ws_write_put(struct writebuf *wb, const void *value, size_t length) @@ -438,22 +486,15 @@ static int api_ws_write_object(struct writebuf *wb, struct json_object *object) return string != NULL && api_ws_write_string(wb, string); } - - - /******************* client part **********************************/ /* * structure for recording query data */ struct api_ws_memo { - struct api_ws_memo *next; /* the next memo */ + struct api_ws_memo *next; /* the next memo */ struct api_ws *api; /* the ws api */ struct afb_xreq *xreq; /* the request handle */ -#if 0 - struct afb_req req; /* the request handle */ - struct afb_context *context; /* the context of the query */ -#endif uint32_t msgid; /* the message identifier */ }; @@ -575,7 +616,7 @@ static int api_ws_client_msg_memo_get(struct api_ws *api, struct readbuf *rb, st } /* read a subscrition message */ -static int api_ws_client_msg_subscription_get(struct api_ws *api, struct readbuf *rb, struct api_ws_event **ev, struct api_ws_memo **memo) +static int api_ws_client_msg_subscription_get(struct api_ws *api, struct readbuf *rb, struct api_ws_memo **memo, struct api_ws_event **ev) { return api_ws_client_msg_memo_get(api, rb, memo) && api_ws_client_msg_event_get(api, rb, ev); } @@ -649,7 +690,7 @@ static void api_ws_client_event_subscribe(struct api_ws *api, struct readbuf *rb struct api_ws_event *ev; struct api_ws_memo *memo; - if (api_ws_client_msg_subscription_get(api, rb, &ev, &memo)) { + if (api_ws_client_msg_subscription_get(api, rb, &memo, &ev)) { /* subscribe the request from the event */ if (afb_xreq_subscribe(memo->xreq, ev->event) < 0) ERROR("can't subscribe: %m"); @@ -662,7 +703,7 @@ static void api_ws_client_event_unsubscribe(struct api_ws *api, struct readbuf * struct api_ws_event *ev; struct api_ws_memo *memo; - if (api_ws_client_msg_subscription_get(api, rb, &ev, &memo)) { + if (api_ws_client_msg_subscription_get(api, rb, &memo, &ev)) { /* unsubscribe the request from the event */ if (afb_xreq_unsubscribe(memo->xreq, ev->event) < 0) ERROR("can't unsubscribe: %m"); @@ -738,39 +779,102 @@ static void api_ws_client_reply_fail(struct api_ws *api, struct readbuf *rb) api_ws_client_memo_destroy(memo); } +/* send a subcall reply */ +static void api_ws_client_send_subcall_reply(struct api_ws_reply *reply, int iserror, json_object *object) +{ + int rc; + struct writebuf wb = { .count = 0 }; + char ie = (char)!!iserror; + + if (!api_ws_write_char(&wb, CHAR_FOR_SUBCALL_REPLY) + || !api_ws_write_uint32(&wb, reply->subcallid) + || !api_ws_write_char(&wb, ie) + || !api_ws_write_object(&wb, object)) { + /* write error ? */ + return; + } + + rc = afb_ws_binary_v(reply->apiws->client.ws, wb.iovec, wb.count); + if (rc >= 0) + return; + ERROR("error while sending subcall reply"); +} + +/* callback for subcall reply */ +static void api_ws_client_subcall_reply_cb(void *closure, int iserror, json_object *object) +{ + api_ws_client_send_subcall_reply(closure, iserror, object); + free(closure); +} + +/* received a subcall request */ +static void api_ws_client_subcall(struct api_ws *apiws, struct readbuf *rb) +{ + struct api_ws_reply *reply; + struct api_ws_memo *memo; + const char *api, *verb; + uint32_t subcallid; + struct json_object *object; + + reply = malloc(sizeof *reply); + if (!reply) + return; + + /* retrieve the message data */ + if (!api_ws_client_msg_memo_get(apiws, rb, &memo)) + return; + + if (api_ws_read_uint32(rb, &subcallid) + && api_ws_read_string(rb, &api, NULL) + && api_ws_read_string(rb, &verb, NULL) + && api_ws_read_object(rb, &object)) { + reply->apiws = apiws; + reply->subcallid = subcallid; + afb_xreq_subcall(memo->xreq, api, verb, object, api_ws_client_subcall_reply_cb, reply); + } +} + /* callback when receiving binary data */ static void api_ws_client_on_binary(void *closure, char *data, size_t size) { if (size > 0) { + struct api_ws *apiws = closure; struct readbuf rb = { .head = data, .end = data + size }; + + pthread_mutex_lock(&apiws->mutex); switch (*rb.head++) { case CHAR_FOR_ANSWER_SUCCESS: /* success */ - api_ws_client_reply_success(closure, &rb); + api_ws_client_reply_success(apiws, &rb); break; case CHAR_FOR_ANSWER_FAIL: /* fail */ - api_ws_client_reply_fail(closure, &rb); + api_ws_client_reply_fail(apiws, &rb); break; case CHAR_FOR_EVT_BROADCAST: /* broadcast */ - api_ws_client_event_broadcast(closure, &rb); + api_ws_client_event_broadcast(apiws, &rb); break; case CHAR_FOR_EVT_ADD: /* creates the event */ - api_ws_client_event_create(closure, &rb); + api_ws_client_event_create(apiws, &rb); break; case CHAR_FOR_EVT_DEL: /* drops the event */ - api_ws_client_event_drop(closure, &rb); + api_ws_client_event_drop(apiws, &rb); break; case CHAR_FOR_EVT_PUSH: /* pushs the event */ - api_ws_client_event_push(closure, &rb); + api_ws_client_event_push(apiws, &rb); break; case CHAR_FOR_EVT_SUBSCRIBE: /* subscribe event for a request */ - api_ws_client_event_subscribe(closure, &rb); + api_ws_client_event_subscribe(apiws, &rb); break; case CHAR_FOR_EVT_UNSUBSCRIBE: /* unsubscribe event for a request */ - api_ws_client_event_unsubscribe(closure, &rb); + api_ws_client_event_unsubscribe(apiws, &rb); + break; + case CHAR_FOR_SUBCALL_CALL: /* subcall */ + api_ws_client_subcall(apiws, &rb); break; default: /* unexpected message */ + /* TODO: close the connection */ break; } + pthread_mutex_unlock(&apiws->mutex); } free(data); } @@ -783,13 +887,15 @@ static void api_ws_client_call_cb(void * closure, struct afb_xreq *xreq) struct writebuf wb = { .count = 0 }; const char *raw; size_t szraw; - struct api_ws *api = closure; + struct api_ws *apiws = closure; + + pthread_mutex_lock(&apiws->mutex); /* create the recording data */ - memo = api_ws_client_memo_make(api, xreq); + memo = api_ws_client_memo_make(apiws, xreq); if (memo == NULL) { afb_xreq_fail_f(xreq, "error", "out of memory"); - return; + goto end; } /* creates the call message */ @@ -805,12 +911,10 @@ static void api_ws_client_call_cb(void * closure, struct afb_xreq *xreq) goto overflow; /* send */ - rc = afb_ws_binary_v(api->client.ws, wb.iovec, wb.count); - if (rc < 0) - goto ws_send_error; - return; + rc = afb_ws_binary_v(apiws->client.ws, wb.iovec, wb.count); + if (rc >= 0) + goto end; -ws_send_error: afb_xreq_fail(xreq, "error", "websocket sending error"); goto clean_memo; @@ -823,6 +927,8 @@ overflow: clean_memo: api_ws_client_memo_destroy(memo); +end: + pthread_mutex_unlock(&apiws->mutex); } static int api_ws_service_start_cb(void *closure, int share_session, int onneed) @@ -912,63 +1018,114 @@ error: static void api_ws_server_client_unref(struct api_ws_client *client) { - if (!--client->refcount) { + struct api_ws_subcall *sc, *nsc; + + if (!__atomic_sub_fetch(&client->refcount, 1, __ATOMIC_RELAXED)) { afb_evt_listener_unref(client->listener); afb_ws_destroy(client->ws); + nsc = client->subcalls; + while (nsc) { + sc= nsc; + nsc = sc->next; + sc->callback(sc->closure, 1, NULL); + free(sc); + } afb_cred_unref(client->cred); free(client); } } +static void api_ws_server_client_addref(struct api_ws_client *client) +{ + __atomic_add_fetch(&client->refcount, 1, __ATOMIC_RELAXED); +} + /* on call, propagate it to the ws service */ static void api_ws_server_on_call(struct api_ws_client *client, struct readbuf *rb) { struct api_ws_server_req *wreq; + char *cverb; const char *uuid, *verb; - uint32_t flags; - - client->refcount++; - - /* create the request */ - wreq = calloc(1 , sizeof *wreq); - if (wreq == NULL) - goto out_of_memory; + uint32_t flags, msgid; + size_t lenverb; + struct json_object *object; - wreq->client = client; + api_ws_server_client_addref(client); /* reads the call message data */ - if (!api_ws_read_uint32(rb, &wreq->msgid) + if (!api_ws_read_uint32(rb, &msgid) || !api_ws_read_uint32(rb, &flags) - || !api_ws_read_string(rb, &verb, NULL) + || !api_ws_read_string(rb, &verb, &lenverb) || !api_ws_read_string(rb, &uuid, NULL) - || !api_ws_read_string(rb, &wreq->request, &wreq->lenreq)) + || !api_ws_read_object(rb, &object)) goto overflow; + /* create the request */ + wreq = malloc(++lenverb + sizeof *wreq); + if (wreq == NULL) + goto out_of_memory; + afb_xreq_init(&wreq->xreq, &afb_api_ws_xreq_itf); - wreq->xreq.json = json_tokener_parse(wreq->request); - if (wreq->xreq.json == NULL && strcmp(wreq->request, "null")) { - wreq->xreq.json = json_object_new_string(wreq->request); - } + wreq->client = client; + wreq->msgid = msgid; + cverb = (char*)&wreq[1]; + memcpy(cverb, verb, lenverb); /* init the context */ if (afb_context_connect(&wreq->xreq.context, uuid, NULL) < 0) - goto out_of_memory; + goto unconnected; wreq->xreq.context.flags = flags; /* makes the call */ wreq->xreq.cred = afb_cred_addref(client->cred); wreq->xreq.api = client->api; - wreq->xreq.verb = verb; + wreq->xreq.verb = cverb; + wreq->xreq.json = object; afb_apis_call(&wreq->xreq); afb_xreq_unref(&wreq->xreq); return; +unconnected: + free(wreq); out_of_memory: + json_object_put(object); overflow: - free(wreq); api_ws_server_client_unref(client); } +/* on subcall reply */ +static void api_ws_server_on_subcall_reply(struct api_ws_client *client, struct readbuf *rb) +{ + char iserror; + uint32_t subcallid; + struct json_object *object; + struct api_ws_subcall *sc, **psc; + + /* reads the call message data */ + if (!api_ws_read_uint32(rb, &subcallid) + || !api_ws_read_char(rb, &iserror) + || !api_ws_read_object(rb, &object)) { + /* TODO bad protocol */ + return; + } + + /* search the subcall and unlink it */ + pthread_mutex_lock(&client->mutex); + psc = &client->subcalls; + while ((sc = *psc) && sc->subcallid != subcallid) + psc = &sc->next; + if (!sc) { + pthread_mutex_unlock(&client->mutex); + /* TODO subcall not found */ + } else { + *psc = sc->next; + pthread_mutex_unlock(&client->mutex); + sc->callback(sc->closure, (int)iserror, object); + free(sc); + } + json_object_put(object); +} + /* callback when receiving binary data */ static void api_ws_server_on_binary(void *closure, char *data, size_t size) { @@ -978,6 +1135,9 @@ static void api_ws_server_on_binary(void *closure, char *data, size_t size) case CHAR_FOR_CALL: api_ws_server_on_call(closure, &rb); break; + case CHAR_FOR_SUBCALL_REPLY: + api_ws_server_on_subcall_reply(closure, &rb); + break; default: /* unexpected message */ /* TODO: close the connection */ break; @@ -1021,6 +1181,7 @@ static void api_ws_server_accept(struct api_ws *api) if (client->ws != NULL) { client->api = api->api; client->refcount = 1; + client->subcalls = NULL; return; } afb_cred_unref(client->cred); @@ -1135,6 +1296,47 @@ static void api_ws_server_req_fail_cb(struct afb_xreq *xreq, const char *status, ERROR("error while sending fail"); } +static void api_ws_server_req_subcall_cb(struct afb_xreq *xreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure) +{ + int rc; + struct writebuf wb = { .count = 0 }; + struct api_ws_subcall *sc, *osc; + struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq); + struct api_ws_client *client = wreq->client; + + sc = malloc(sizeof *sc); + if (!sc) { + + } else { + sc->callback = callback; + sc->closure = cb_closure; + + pthread_mutex_unlock(&client->mutex); + sc->subcallid = (uint32_t)(((intptr_t)sc) >> 6); + do { + sc->subcallid++; + osc = client->subcalls; + while(osc && osc->subcallid != sc->subcallid) + osc = osc->next; + } while (osc); + sc->next = client->subcalls; + client->subcalls = sc; + pthread_mutex_unlock(&client->mutex); + + if (api_ws_write_char(&wb, CHAR_FOR_SUBCALL_CALL) + && api_ws_write_uint32(&wb, wreq->msgid) + && api_ws_write_uint32(&wb, sc->subcallid) + && api_ws_write_string(&wb, api) + && api_ws_write_string(&wb, verb) + && api_ws_write_object(&wb, args)) { + rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count); + if (rc >= 0) + return; + } + ERROR("error while sending fail"); + } +} + static int api_ws_server_req_subscribe_cb(struct afb_xreq *xreq, struct afb_event event) { int rc, rc2; diff --git a/stress-server.sh b/stress-server.sh index a0d83fd9..31fce9f6 100755 --- a/stress-server.sh +++ b/stress-server.sh @@ -3,16 +3,28 @@ ROOT=$(dirname $0) echo ROOT=$ROOT -AFB=$ROOT/build/src/afb-daemon +cd $ROOT + +AFB=build/src/afb-daemon HELLO=build/bindings/samples/helloWorld.so PORT=12345 TEST=test TOKEN=knock-knock-knoc -OUT=$ROOT/stress-out-server +OUT=stress-out-server rm $OUT* -ARGS="-q --session-max=100 --port=$PORT --workdir=$ROOT --roothttp=$TEST --token=$TOKEN --ldpaths=/tmp --binding=$HELLO" +case "$1" in + --ws) + shift + ARGS="-q --ldpaths=/tmp --binding=$HELLO --session-max=100 --ws-server=unix:hello --no-httpd --exec $AFB --session-max=100 --port=$PORT --ldpaths=/tmp --roothttp=$TEST --token=$TOKEN --ws-client=unix:hello " +# ARGS="-vv --tracereq=all --ldpaths=/tmp --binding=$HELLO --session-max=100 --ws-server=unix:hello --no-httpd --exec $AFB --session-max=100 --port=$PORT --ldpaths=/tmp --roothttp=$TEST --token=$TOKEN --ws-client=unix:hello " + ;; + *) + ARGS="-q --session-max=100 --port=$PORT --workdir=$ROOT --roothttp=$TEST --token=$TOKEN --ldpaths=/tmp --binding=$HELLO" + ;; +esac + echo -n launch afb... case "$1" in |