aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/afb-api-ws.c314
-rwxr-xr-xstress-server.sh18
2 files changed, 273 insertions, 59 deletions
diff --git a/src/afb-api-ws.c b/src/afb-api-ws.c
index 9b1d8cc7..ea0781df 100644
--- a/src/afb-api-ws.c
+++ b/src/afb-api-ws.c
@@ -29,6 +29,7 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
+#include <pthread.h>
#include <json-c/json.h>
#include <systemd/sd-event.h>
@@ -62,6 +63,8 @@ struct api_ws_client;
#define CHAR_FOR_EVT_PUSH '!'
#define CHAR_FOR_EVT_SUBSCRIBE 'S'
#define CHAR_FOR_EVT_UNSUBSCRIBE 'U'
+#define CHAR_FOR_SUBCALL_CALL 'B'
+#define CHAR_FOR_SUBCALL_REPLY 'R'
/*
*/
@@ -70,6 +73,7 @@ struct api_ws
char *path; /* path of the object for the API */
char *api; /* api name of the interface */
int fd; /* file descriptor */
+ pthread_mutex_t mutex; /**< resource control */
union {
struct {
uint32_t id;
@@ -78,8 +82,7 @@ struct api_ws
struct api_ws_memo *memos;
} client;
struct {
- sd_event_source *listensrc;
- struct afb_evt_listener *listener; /* listener for broadcasted events */
+ sd_event_source *listensrc; /**< systemd source for server socket */
} server;
};
};
@@ -116,6 +119,29 @@ static const struct afb_evt_itf api_ws_server_evt_itf = {
.remove = api_ws_server_event_remove
};
+/******************* handling subcalls *****************************/
+
+/**
+ * Structure on server side for recording pending
+ * subcalls.
+ */
+struct api_ws_subcall
+{
+ struct api_ws_subcall *next; /**< next subcall for the client */
+ uint32_t subcallid; /**< the subcallid */
+ void (*callback)(void*, int, struct json_object*); /**< callback on completion */
+ void *closure; /**< closure of the callback */
+};
+
+/**
+ * Structure for sending back replies on client side
+ */
+struct api_ws_reply
+{
+ struct api_ws *apiws; /**< api descriptor */
+ uint32_t subcallid; /**< subcallid for the reply */
+};
+
/******************* client description part for server *****************************/
struct api_ws_client
@@ -126,17 +152,23 @@ struct api_ws_client
/* count of references */
int refcount;
- /* listener for events */
- struct afb_evt_listener *listener;
-
/* file descriptor */
int fd;
+ /* resource control */
+ pthread_mutex_t mutex;
+
+ /* listener for events */
+ struct afb_evt_listener *listener;
+
/* websocket */
struct afb_ws *ws;
/* credentials */
struct afb_cred *cred;
+
+ /* pending subcalls */
+ struct api_ws_subcall *subcalls;
};
/******************* websocket interface for client part **********************************/
@@ -161,8 +193,6 @@ static const struct afb_ws_itf api_ws_server_ws_itf =
struct api_ws_server_req {
struct afb_xreq xreq; /* the xreq */
struct api_ws_client *client; /* the client of the request */
- const char *request; /* the readen request as string */
- size_t lenreq; /* the length of the request */
uint32_t msgid; /* the incoming request msgid */
};
@@ -171,11 +201,13 @@ static void api_ws_server_req_fail_cb(struct afb_xreq *xreq, const char *status,
static void api_ws_server_req_destroy_cb(struct afb_xreq *xreq);
static int api_ws_server_req_subscribe_cb(struct afb_xreq *xreq, struct afb_event event);
static int api_ws_server_req_unsubscribe_cb(struct afb_xreq *xreq, struct afb_event event);
+static void api_ws_server_req_subcall_cb(struct afb_xreq *xreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure);
const struct afb_xreq_query_itf afb_api_ws_xreq_itf = {
.success = api_ws_server_req_success_cb,
.fail = api_ws_server_req_fail_cb,
.unref = api_ws_server_req_destroy_cb,
+ .subcall = api_ws_server_req_subcall_cb,
.subscribe = api_ws_server_req_subscribe_cb,
.unsubscribe = api_ws_server_req_unsubscribe_cb
};
@@ -210,6 +242,7 @@ static struct api_ws *api_ws_make(const char *path)
errno = EINVAL;
goto error2;
}
+ pthread_mutex_init(&api->mutex, NULL);
api->fd = -1;
return api;
@@ -358,6 +391,14 @@ static char *api_ws_read_get(struct readbuf *rb, uint32_t length)
return before;
}
+static int api_ws_read_char(struct readbuf *rb, char *value)
+{
+ if (rb->head >= rb->end)
+ return 0;
+ *value = *rb->head++;
+ return 1;
+}
+
static int api_ws_read_uint32(struct readbuf *rb, uint32_t *value)
{
char *after = rb->head + sizeof *value;
@@ -381,9 +422,16 @@ static int api_ws_read_string(struct readbuf *rb, const char **value, size_t *le
static int api_ws_read_object(struct readbuf *rb, struct json_object **object)
{
- size_t length;
const char *string;
- return api_ws_read_string(rb, &string, &length) && ((*object = json_tokener_parse(string)) != NULL) == (strcmp(string, "null") != 0);
+ struct json_object *o;
+ int rc = api_ws_read_string(rb, &string, NULL);
+ if (rc) {
+ o = json_tokener_parse(string);
+ if (o == NULL && strcmp(string, "null"))
+ o = json_object_new_string(string);
+ *object = o;
+ }
+ return rc;
}
static int api_ws_write_put(struct writebuf *wb, const void *value, size_t length)
@@ -438,22 +486,15 @@ static int api_ws_write_object(struct writebuf *wb, struct json_object *object)
return string != NULL && api_ws_write_string(wb, string);
}
-
-
-
/******************* client part **********************************/
/*
* structure for recording query data
*/
struct api_ws_memo {
- struct api_ws_memo *next; /* the next memo */
+ struct api_ws_memo *next; /* the next memo */
struct api_ws *api; /* the ws api */
struct afb_xreq *xreq; /* the request handle */
-#if 0
- struct afb_req req; /* the request handle */
- struct afb_context *context; /* the context of the query */
-#endif
uint32_t msgid; /* the message identifier */
};
@@ -575,7 +616,7 @@ static int api_ws_client_msg_memo_get(struct api_ws *api, struct readbuf *rb, st
}
/* read a subscrition message */
-static int api_ws_client_msg_subscription_get(struct api_ws *api, struct readbuf *rb, struct api_ws_event **ev, struct api_ws_memo **memo)
+static int api_ws_client_msg_subscription_get(struct api_ws *api, struct readbuf *rb, struct api_ws_memo **memo, struct api_ws_event **ev)
{
return api_ws_client_msg_memo_get(api, rb, memo) && api_ws_client_msg_event_get(api, rb, ev);
}
@@ -649,7 +690,7 @@ static void api_ws_client_event_subscribe(struct api_ws *api, struct readbuf *rb
struct api_ws_event *ev;
struct api_ws_memo *memo;
- if (api_ws_client_msg_subscription_get(api, rb, &ev, &memo)) {
+ if (api_ws_client_msg_subscription_get(api, rb, &memo, &ev)) {
/* subscribe the request from the event */
if (afb_xreq_subscribe(memo->xreq, ev->event) < 0)
ERROR("can't subscribe: %m");
@@ -662,7 +703,7 @@ static void api_ws_client_event_unsubscribe(struct api_ws *api, struct readbuf *
struct api_ws_event *ev;
struct api_ws_memo *memo;
- if (api_ws_client_msg_subscription_get(api, rb, &ev, &memo)) {
+ if (api_ws_client_msg_subscription_get(api, rb, &memo, &ev)) {
/* unsubscribe the request from the event */
if (afb_xreq_unsubscribe(memo->xreq, ev->event) < 0)
ERROR("can't unsubscribe: %m");
@@ -738,39 +779,102 @@ static void api_ws_client_reply_fail(struct api_ws *api, struct readbuf *rb)
api_ws_client_memo_destroy(memo);
}
+/* send a subcall reply */
+static void api_ws_client_send_subcall_reply(struct api_ws_reply *reply, int iserror, json_object *object)
+{
+ int rc;
+ struct writebuf wb = { .count = 0 };
+ char ie = (char)!!iserror;
+
+ if (!api_ws_write_char(&wb, CHAR_FOR_SUBCALL_REPLY)
+ || !api_ws_write_uint32(&wb, reply->subcallid)
+ || !api_ws_write_char(&wb, ie)
+ || !api_ws_write_object(&wb, object)) {
+ /* write error ? */
+ return;
+ }
+
+ rc = afb_ws_binary_v(reply->apiws->client.ws, wb.iovec, wb.count);
+ if (rc >= 0)
+ return;
+ ERROR("error while sending subcall reply");
+}
+
+/* callback for subcall reply */
+static void api_ws_client_subcall_reply_cb(void *closure, int iserror, json_object *object)
+{
+ api_ws_client_send_subcall_reply(closure, iserror, object);
+ free(closure);
+}
+
+/* received a subcall request */
+static void api_ws_client_subcall(struct api_ws *apiws, struct readbuf *rb)
+{
+ struct api_ws_reply *reply;
+ struct api_ws_memo *memo;
+ const char *api, *verb;
+ uint32_t subcallid;
+ struct json_object *object;
+
+ reply = malloc(sizeof *reply);
+ if (!reply)
+ return;
+
+ /* retrieve the message data */
+ if (!api_ws_client_msg_memo_get(apiws, rb, &memo))
+ return;
+
+ if (api_ws_read_uint32(rb, &subcallid)
+ && api_ws_read_string(rb, &api, NULL)
+ && api_ws_read_string(rb, &verb, NULL)
+ && api_ws_read_object(rb, &object)) {
+ reply->apiws = apiws;
+ reply->subcallid = subcallid;
+ afb_xreq_subcall(memo->xreq, api, verb, object, api_ws_client_subcall_reply_cb, reply);
+ }
+}
+
/* callback when receiving binary data */
static void api_ws_client_on_binary(void *closure, char *data, size_t size)
{
if (size > 0) {
+ struct api_ws *apiws = closure;
struct readbuf rb = { .head = data, .end = data + size };
+
+ pthread_mutex_lock(&apiws->mutex);
switch (*rb.head++) {
case CHAR_FOR_ANSWER_SUCCESS: /* success */
- api_ws_client_reply_success(closure, &rb);
+ api_ws_client_reply_success(apiws, &rb);
break;
case CHAR_FOR_ANSWER_FAIL: /* fail */
- api_ws_client_reply_fail(closure, &rb);
+ api_ws_client_reply_fail(apiws, &rb);
break;
case CHAR_FOR_EVT_BROADCAST: /* broadcast */
- api_ws_client_event_broadcast(closure, &rb);
+ api_ws_client_event_broadcast(apiws, &rb);
break;
case CHAR_FOR_EVT_ADD: /* creates the event */
- api_ws_client_event_create(closure, &rb);
+ api_ws_client_event_create(apiws, &rb);
break;
case CHAR_FOR_EVT_DEL: /* drops the event */
- api_ws_client_event_drop(closure, &rb);
+ api_ws_client_event_drop(apiws, &rb);
break;
case CHAR_FOR_EVT_PUSH: /* pushs the event */
- api_ws_client_event_push(closure, &rb);
+ api_ws_client_event_push(apiws, &rb);
break;
case CHAR_FOR_EVT_SUBSCRIBE: /* subscribe event for a request */
- api_ws_client_event_subscribe(closure, &rb);
+ api_ws_client_event_subscribe(apiws, &rb);
break;
case CHAR_FOR_EVT_UNSUBSCRIBE: /* unsubscribe event for a request */
- api_ws_client_event_unsubscribe(closure, &rb);
+ api_ws_client_event_unsubscribe(apiws, &rb);
+ break;
+ case CHAR_FOR_SUBCALL_CALL: /* subcall */
+ api_ws_client_subcall(apiws, &rb);
break;
default: /* unexpected message */
+ /* TODO: close the connection */
break;
}
+ pthread_mutex_unlock(&apiws->mutex);
}
free(data);
}
@@ -783,13 +887,15 @@ static void api_ws_client_call_cb(void * closure, struct afb_xreq *xreq)
struct writebuf wb = { .count = 0 };
const char *raw;
size_t szraw;
- struct api_ws *api = closure;
+ struct api_ws *apiws = closure;
+
+ pthread_mutex_lock(&apiws->mutex);
/* create the recording data */
- memo = api_ws_client_memo_make(api, xreq);
+ memo = api_ws_client_memo_make(apiws, xreq);
if (memo == NULL) {
afb_xreq_fail_f(xreq, "error", "out of memory");
- return;
+ goto end;
}
/* creates the call message */
@@ -805,12 +911,10 @@ static void api_ws_client_call_cb(void * closure, struct afb_xreq *xreq)
goto overflow;
/* send */
- rc = afb_ws_binary_v(api->client.ws, wb.iovec, wb.count);
- if (rc < 0)
- goto ws_send_error;
- return;
+ rc = afb_ws_binary_v(apiws->client.ws, wb.iovec, wb.count);
+ if (rc >= 0)
+ goto end;
-ws_send_error:
afb_xreq_fail(xreq, "error", "websocket sending error");
goto clean_memo;
@@ -823,6 +927,8 @@ overflow:
clean_memo:
api_ws_client_memo_destroy(memo);
+end:
+ pthread_mutex_unlock(&apiws->mutex);
}
static int api_ws_service_start_cb(void *closure, int share_session, int onneed)
@@ -912,63 +1018,114 @@ error:
static void api_ws_server_client_unref(struct api_ws_client *client)
{
- if (!--client->refcount) {
+ struct api_ws_subcall *sc, *nsc;
+
+ if (!__atomic_sub_fetch(&client->refcount, 1, __ATOMIC_RELAXED)) {
afb_evt_listener_unref(client->listener);
afb_ws_destroy(client->ws);
+ nsc = client->subcalls;
+ while (nsc) {
+ sc= nsc;
+ nsc = sc->next;
+ sc->callback(sc->closure, 1, NULL);
+ free(sc);
+ }
afb_cred_unref(client->cred);
free(client);
}
}
+static void api_ws_server_client_addref(struct api_ws_client *client)
+{
+ __atomic_add_fetch(&client->refcount, 1, __ATOMIC_RELAXED);
+}
+
/* on call, propagate it to the ws service */
static void api_ws_server_on_call(struct api_ws_client *client, struct readbuf *rb)
{
struct api_ws_server_req *wreq;
+ char *cverb;
const char *uuid, *verb;
- uint32_t flags;
-
- client->refcount++;
-
- /* create the request */
- wreq = calloc(1 , sizeof *wreq);
- if (wreq == NULL)
- goto out_of_memory;
+ uint32_t flags, msgid;
+ size_t lenverb;
+ struct json_object *object;
- wreq->client = client;
+ api_ws_server_client_addref(client);
/* reads the call message data */
- if (!api_ws_read_uint32(rb, &wreq->msgid)
+ if (!api_ws_read_uint32(rb, &msgid)
|| !api_ws_read_uint32(rb, &flags)
- || !api_ws_read_string(rb, &verb, NULL)
+ || !api_ws_read_string(rb, &verb, &lenverb)
|| !api_ws_read_string(rb, &uuid, NULL)
- || !api_ws_read_string(rb, &wreq->request, &wreq->lenreq))
+ || !api_ws_read_object(rb, &object))
goto overflow;
+ /* create the request */
+ wreq = malloc(++lenverb + sizeof *wreq);
+ if (wreq == NULL)
+ goto out_of_memory;
+
afb_xreq_init(&wreq->xreq, &afb_api_ws_xreq_itf);
- wreq->xreq.json = json_tokener_parse(wreq->request);
- if (wreq->xreq.json == NULL && strcmp(wreq->request, "null")) {
- wreq->xreq.json = json_object_new_string(wreq->request);
- }
+ wreq->client = client;
+ wreq->msgid = msgid;
+ cverb = (char*)&wreq[1];
+ memcpy(cverb, verb, lenverb);
/* init the context */
if (afb_context_connect(&wreq->xreq.context, uuid, NULL) < 0)
- goto out_of_memory;
+ goto unconnected;
wreq->xreq.context.flags = flags;
/* makes the call */
wreq->xreq.cred = afb_cred_addref(client->cred);
wreq->xreq.api = client->api;
- wreq->xreq.verb = verb;
+ wreq->xreq.verb = cverb;
+ wreq->xreq.json = object;
afb_apis_call(&wreq->xreq);
afb_xreq_unref(&wreq->xreq);
return;
+unconnected:
+ free(wreq);
out_of_memory:
+ json_object_put(object);
overflow:
- free(wreq);
api_ws_server_client_unref(client);
}
+/* on subcall reply */
+static void api_ws_server_on_subcall_reply(struct api_ws_client *client, struct readbuf *rb)
+{
+ char iserror;
+ uint32_t subcallid;
+ struct json_object *object;
+ struct api_ws_subcall *sc, **psc;
+
+ /* reads the call message data */
+ if (!api_ws_read_uint32(rb, &subcallid)
+ || !api_ws_read_char(rb, &iserror)
+ || !api_ws_read_object(rb, &object)) {
+ /* TODO bad protocol */
+ return;
+ }
+
+ /* search the subcall and unlink it */
+ pthread_mutex_lock(&client->mutex);
+ psc = &client->subcalls;
+ while ((sc = *psc) && sc->subcallid != subcallid)
+ psc = &sc->next;
+ if (!sc) {
+ pthread_mutex_unlock(&client->mutex);
+ /* TODO subcall not found */
+ } else {
+ *psc = sc->next;
+ pthread_mutex_unlock(&client->mutex);
+ sc->callback(sc->closure, (int)iserror, object);
+ free(sc);
+ }
+ json_object_put(object);
+}
+
/* callback when receiving binary data */
static void api_ws_server_on_binary(void *closure, char *data, size_t size)
{
@@ -978,6 +1135,9 @@ static void api_ws_server_on_binary(void *closure, char *data, size_t size)
case CHAR_FOR_CALL:
api_ws_server_on_call(closure, &rb);
break;
+ case CHAR_FOR_SUBCALL_REPLY:
+ api_ws_server_on_subcall_reply(closure, &rb);
+ break;
default: /* unexpected message */
/* TODO: close the connection */
break;
@@ -1021,6 +1181,7 @@ static void api_ws_server_accept(struct api_ws *api)
if (client->ws != NULL) {
client->api = api->api;
client->refcount = 1;
+ client->subcalls = NULL;
return;
}
afb_cred_unref(client->cred);
@@ -1135,6 +1296,47 @@ static void api_ws_server_req_fail_cb(struct afb_xreq *xreq, const char *status,
ERROR("error while sending fail");
}
+static void api_ws_server_req_subcall_cb(struct afb_xreq *xreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure)
+{
+ int rc;
+ struct writebuf wb = { .count = 0 };
+ struct api_ws_subcall *sc, *osc;
+ struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq);
+ struct api_ws_client *client = wreq->client;
+
+ sc = malloc(sizeof *sc);
+ if (!sc) {
+
+ } else {
+ sc->callback = callback;
+ sc->closure = cb_closure;
+
+ pthread_mutex_unlock(&client->mutex);
+ sc->subcallid = (uint32_t)(((intptr_t)sc) >> 6);
+ do {
+ sc->subcallid++;
+ osc = client->subcalls;
+ while(osc && osc->subcallid != sc->subcallid)
+ osc = osc->next;
+ } while (osc);
+ sc->next = client->subcalls;
+ client->subcalls = sc;
+ pthread_mutex_unlock(&client->mutex);
+
+ if (api_ws_write_char(&wb, CHAR_FOR_SUBCALL_CALL)
+ && api_ws_write_uint32(&wb, wreq->msgid)
+ && api_ws_write_uint32(&wb, sc->subcallid)
+ && api_ws_write_string(&wb, api)
+ && api_ws_write_string(&wb, verb)
+ && api_ws_write_object(&wb, args)) {
+ rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
+ if (rc >= 0)
+ return;
+ }
+ ERROR("error while sending fail");
+ }
+}
+
static int api_ws_server_req_subscribe_cb(struct afb_xreq *xreq, struct afb_event event)
{
int rc, rc2;
diff --git a/stress-server.sh b/stress-server.sh
index a0d83fd9..31fce9f6 100755
--- a/stress-server.sh
+++ b/stress-server.sh
@@ -3,16 +3,28 @@
ROOT=$(dirname $0)
echo ROOT=$ROOT
-AFB=$ROOT/build/src/afb-daemon
+cd $ROOT
+
+AFB=build/src/afb-daemon
HELLO=build/bindings/samples/helloWorld.so
PORT=12345
TEST=test
TOKEN=knock-knock-knoc
-OUT=$ROOT/stress-out-server
+OUT=stress-out-server
rm $OUT*
-ARGS="-q --session-max=100 --port=$PORT --workdir=$ROOT --roothttp=$TEST --token=$TOKEN --ldpaths=/tmp --binding=$HELLO"
+case "$1" in
+ --ws)
+ shift
+ ARGS="-q --ldpaths=/tmp --binding=$HELLO --session-max=100 --ws-server=unix:hello --no-httpd --exec $AFB --session-max=100 --port=$PORT --ldpaths=/tmp --roothttp=$TEST --token=$TOKEN --ws-client=unix:hello "
+# ARGS="-vv --tracereq=all --ldpaths=/tmp --binding=$HELLO --session-max=100 --ws-server=unix:hello --no-httpd --exec $AFB --session-max=100 --port=$PORT --ldpaths=/tmp --roothttp=$TEST --token=$TOKEN --ws-client=unix:hello "
+ ;;
+ *)
+ ARGS="-q --session-max=100 --port=$PORT --workdir=$ROOT --roothttp=$TEST --token=$TOKEN --ldpaths=/tmp --binding=$HELLO"
+ ;;
+esac
+
echo -n launch afb...
case "$1" in