aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJosé Bollo <jose.bollo@iot.bzh>2017-04-03 17:35:03 +0200
committerJosé Bollo <jose.bollo@iot.bzh>2017-04-03 17:35:03 +0200
commitc63291ad456fd8584e0fed454baa5f0322beb27e (patch)
treee9d5218d86b012e330f38ab1fa985ffa8a52474f
parent8d322ebdd04d6de2d5649626bbc23aae0d0ed556 (diff)
Switch API websocket to xreq
Change-Id: I57600d8dc99bf37f207b126a0e3ab5731ad08ced Signed-off-by: José Bollo <jose.bollo@iot.bzh>
-rw-r--r--src/afb-api-ws.c214
1 files changed, 53 insertions, 161 deletions
diff --git a/src/afb-api-ws.c b/src/afb-api-ws.c
index 89b83bc5..82671875 100644
--- a/src/afb-api-ws.c
+++ b/src/afb-api-ws.c
@@ -44,7 +44,7 @@
#include "afb-api-so.h"
#include "afb-context.h"
#include "afb-evt.h"
-#include "afb-subcall.h"
+#include "afb-xreq.h"
#include "verbose.h"
#include "sd-fds.h"
@@ -147,47 +147,26 @@ static const struct afb_ws_itf api_ws_server_ws_itf =
* structure for a ws request
*/
struct api_ws_server_req {
- struct afb_context context; /* the context, should be THE FIRST */
+ struct afb_xreq xreq; /* the xreq */
struct api_ws_client *client; /* the client of the request */
char *rcvdata; /* the received data to free */
- struct json_object *json; /* the readen request as object */
const char *request; /* the readen request as string */
size_t lenreq; /* the length of the request */
- int refcount; /* reference count of the request */
uint32_t msgid; /* the incoming request msgid */
};
-static struct json_object *api_ws_server_req_json(struct api_ws_server_req *wreq);
-static void api_ws_server_req_unref(struct api_ws_server_req *wreq);
-
-static struct json_object *api_ws_server_req_json_cb(void *closure);
-static struct afb_arg api_ws_server_req_get_cb(void *closure, const char *name);
static void api_ws_server_req_success_cb(void *closure, struct json_object *obj, const char *info);
static void api_ws_server_req_fail_cb(void *closure, const char *status, const char *info);
-static const char *api_ws_server_req_raw_cb(void *closure, size_t *size);
-static void api_ws_server_req_send_cb(void *closure, const char *buffer, size_t size);
-static void api_ws_server_req_addref_cb(void *closure);
-static void api_ws_server_req_unref_cb(void *closure);
+static void api_ws_server_req_destroy_cb(void *closure);
static int api_ws_server_req_subscribe_cb(void *closure, struct afb_event event);
static int api_ws_server_req_unsubscribe_cb(void *closure, struct afb_event event);
-static void api_ws_server_req_subcall_cb(void *closure, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure);
-const struct afb_req_itf afb_api_ws_req_itf = {
- .json = api_ws_server_req_json_cb,
- .get = api_ws_server_req_get_cb,
+const struct afb_xreq_query_itf afb_api_ws_xreq_itf = {
.success = api_ws_server_req_success_cb,
.fail = api_ws_server_req_fail_cb,
- .raw = api_ws_server_req_raw_cb,
- .send = api_ws_server_req_send_cb,
- .context_get = (void*)afb_context_get,
- .context_set = (void*)afb_context_set,
- .addref = api_ws_server_req_addref_cb,
- .unref = api_ws_server_req_unref_cb,
- .session_close = (void*)afb_context_close,
- .session_set_LOA = (void*)afb_context_change_loa,
+ .unref = api_ws_server_req_destroy_cb,
.subscribe = api_ws_server_req_subscribe_cb,
- .unsubscribe = api_ws_server_req_unsubscribe_cb,
- .subcall = api_ws_server_req_subcall_cb
+ .unsubscribe = api_ws_server_req_unsubscribe_cb
};
/******************* common part **********************************/
@@ -216,7 +195,7 @@ static struct api_ws *api_ws_make(const char *path)
while (length && path[length - 1] != '/' && path[length - 1] != ':')
length = length - 1;
api->api = &api->path[length];
- if (api->api == NULL || !afb_apis_is_valid_api_name(++api->api)) {
+ if (api->api == NULL || !afb_apis_is_valid_api_name(api->api)) {
errno = EINVAL;
goto error2;
}
@@ -459,8 +438,11 @@ static int api_ws_write_object(struct writebuf *wb, struct json_object *object)
struct api_ws_memo {
struct api_ws_memo *next; /* the next memo */
struct api_ws *api; /* the ws api */
+ struct afb_xreq *xreq; /* the request handle */
+#if 0
struct afb_req req; /* the request handle */
struct afb_context *context; /* the context of the query */
+#endif
uint32_t msgid; /* the message identifier */
};
@@ -498,15 +480,14 @@ static struct api_ws_event *api_ws_client_event_search(struct api_ws *api, uint3
/* allocates and init the memorizing data */
-static struct api_ws_memo *api_ws_client_memo_make(struct api_ws *api, struct afb_req req, struct afb_context *context)
+static struct api_ws_memo *api_ws_client_memo_make(struct api_ws *api, struct afb_xreq *xreq)
{
struct api_ws_memo *memo;
memo = malloc(sizeof *memo);
if (memo != NULL) {
- afb_req_addref(req);
- memo->req = req;
- memo->context = context;
+ afb_xreq_addref(xreq);
+ memo->xreq = xreq;
do { memo->msgid = ++api->client.id; } while(api_ws_client_memo_search(api, memo->msgid) != NULL);
memo->api = api;
memo->next = api->client.memos;
@@ -529,7 +510,7 @@ static void api_ws_client_memo_destroy(struct api_ws_memo *memo)
prv = &(*prv)->next;
}
- afb_req_unref(memo->req);
+ afb_xreq_unref(memo->xreq);
free(memo);
}
@@ -659,7 +640,7 @@ static void api_ws_client_event_subscribe(struct api_ws *api, struct readbuf *rb
if (api_ws_client_msg_subscription_get(api, rb, &ev, &memo)) {
/* subscribe the request from the event */
- if (afb_req_subscribe(memo->req, ev->event) < 0)
+ if (afb_xreq_subscribe(memo->xreq, ev->event) < 0)
ERROR("can't subscribe: %m");
}
}
@@ -672,7 +653,7 @@ static void api_ws_client_event_unsubscribe(struct api_ws *api, struct readbuf *
if (api_ws_client_msg_subscription_get(api, rb, &ev, &memo)) {
/* unsubscribe the request from the event */
- if (afb_req_unsubscribe(memo->req, ev->event) < 0)
+ if (afb_xreq_unsubscribe(memo->xreq, ev->event) < 0)
ERROR("can't unsubscribe: %m");
}
}
@@ -715,11 +696,11 @@ static void api_ws_client_reply_success(struct api_ws *api, struct readbuf *rb)
if (api_ws_read_uint32(rb, &flags)
&& api_ws_read_string(rb, &info, NULL)
&& api_ws_read_object(rb, &object)) {
- memo->context->flags = (unsigned)flags;
- afb_req_success(memo->req, object, *info ? info : NULL);
+ memo->xreq->context.flags = (unsigned)flags;
+ afb_xreq_success(memo->xreq, object, *info ? info : NULL);
} else {
/* failing to have the answer */
- afb_req_fail(memo->req, "error", "ws error");
+ afb_xreq_fail(memo->xreq, "error", "ws error");
}
api_ws_client_memo_destroy(memo);
}
@@ -737,33 +718,11 @@ static void api_ws_client_reply_fail(struct api_ws *api, struct readbuf *rb)
if (api_ws_read_uint32(rb, &flags)
&& api_ws_read_string(rb, &status, NULL)
&& api_ws_read_string(rb, &info, NULL)) {
- memo->context->flags = (unsigned)flags;
- afb_req_fail(memo->req, status, *info ? info : NULL);
+ memo->xreq->context.flags = (unsigned)flags;
+ afb_xreq_fail(memo->xreq, status, *info ? info : NULL);
} else {
/* failing to have the answer */
- afb_req_fail(memo->req, "error", "ws error");
- }
- api_ws_client_memo_destroy(memo);
-}
-
-static void api_ws_client_reply_send(struct api_ws *api, struct readbuf *rb)
-{
- struct api_ws_memo *memo;
- const char *data;
- size_t length;
- uint32_t flags;
-
- /* retrieve the message data */
- if (!api_ws_client_msg_memo_get(api, rb, &memo))
- return;
-
- if (api_ws_read_uint32(rb, &flags)
- && api_ws_read_string(rb, &data, &length)) {
- memo->context->flags = (unsigned)flags;
- afb_req_send(memo->req, data, length);
- } else {
- /* failing to have the answer */
- afb_req_fail(memo->req, "error", "ws error");
+ afb_xreq_fail(memo->xreq, "error", "ws error");
}
api_ws_client_memo_destroy(memo);
}
@@ -780,9 +739,6 @@ static void api_ws_client_on_binary(void *closure, char *data, size_t size)
case 'F': /* fail */
api_ws_client_reply_fail(closure, &rb);
break;
- case 'X': /* send */
- api_ws_client_reply_send(closure, &rb);
- break;
case '*': /* broadcast */
api_ws_client_event_broadcast(closure, &rb);
break;
@@ -809,7 +765,7 @@ static void api_ws_client_on_binary(void *closure, char *data, size_t size)
}
/* on call, propagate it to the ws service */
-static void api_ws_client_call_cb(void * closure, struct afb_req req, struct afb_context *context, const char *verb)
+static void api_ws_client_xcall_cb(void * closure, struct afb_xreq *xreq)
{
int rc;
struct api_ws_memo *memo;
@@ -819,20 +775,20 @@ static void api_ws_client_call_cb(void * closure, struct afb_req req, struct afb
struct api_ws *api = closure;
/* create the recording data */
- memo = api_ws_client_memo_make(api, req, context);
+ memo = api_ws_client_memo_make(api, xreq);
if (memo == NULL) {
- afb_req_fail(req, "error", "out of memory");
+ afb_xreq_fail_f(xreq, "error", "out of memory");
return;
}
/* creates the call message */
- raw = afb_req_raw(req, &szraw);
+ raw = afb_xreq_raw(xreq, &szraw);
if (raw == NULL)
goto internal_error;
if (!api_ws_write_uint32(&wb, memo->msgid)
- || !api_ws_write_uint32(&wb, (uint32_t)context->flags)
- || !api_ws_write_string(&wb, verb)
- || !api_ws_write_string(&wb, afb_session_uuid(context->session))
+ || !api_ws_write_uint32(&wb, (uint32_t)xreq->context.flags)
+ || !api_ws_write_string(&wb, xreq->verb)
+ || !api_ws_write_string(&wb, afb_session_uuid(xreq->context.session))
|| !api_ws_write_string_length(&wb, raw, szraw))
goto overflow;
@@ -843,15 +799,15 @@ static void api_ws_client_call_cb(void * closure, struct afb_req req, struct afb
return;
ws_send_error:
- afb_req_fail(req, "error", "websocket sending error");
+ afb_xreq_fail(xreq, "error", "websocket sending error");
goto clean_memo;
internal_error:
- afb_req_fail(req, "error", "internal: raw is NULL!");
+ afb_xreq_fail(xreq, "error", "internal: raw is NULL!");
goto clean_memo;
overflow:
- afb_req_fail(req, "error", "overflow: size doesn't match 32 bits!");
+ afb_xreq_fail(xreq, "error", "overflow: size doesn't match 32 bits!");
clean_memo:
api_ws_client_memo_destroy(memo);
@@ -921,7 +877,7 @@ int afb_api_ws_add_client(const char *path)
/* record it as an API */
afb_api.closure = api;
- afb_api.call = api_ws_client_call_cb;
+ afb_api.xcall = api_ws_client_xcall_cb;
afb_api.service_start = api_ws_service_start_cb;
if (afb_apis_add(api->api, afb_api) < 0)
goto error3;
@@ -951,7 +907,6 @@ static void api_ws_server_client_unref(struct api_ws_client *client)
static void api_ws_server_called(struct api_ws_client *client, struct readbuf *rb, char *data, size_t size)
{
struct api_ws_server_req *wreq;
- struct afb_req areq;
const char *uuid, *verb;
uint32_t flags;
@@ -964,7 +919,6 @@ static void api_ws_server_called(struct api_ws_client *client, struct readbuf *r
wreq->client = client;
wreq->rcvdata = data;
- wreq->refcount = 1;
/* reads the call message data */
if (!api_ws_read_uint32(rb, &wreq->msgid)
@@ -974,16 +928,24 @@ static void api_ws_server_called(struct api_ws_client *client, struct readbuf *r
|| !api_ws_read_string(rb, &wreq->request, &wreq->lenreq))
goto overflow;
+ wreq->xreq.json = json_tokener_parse(wreq->request);
+ if (wreq->xreq.json == NULL && strcmp(wreq->request, "null")) {
+ wreq->xreq.json = json_object_new_string(wreq->request);
+ }
+
/* init the context */
- if (afb_context_connect(&wreq->context, uuid, NULL) < 0)
+ if (afb_context_connect(&wreq->xreq.context, uuid, NULL) < 0)
goto out_of_memory;
- wreq->context.flags = flags;
+ wreq->xreq.context.flags = flags;
/* makes the call */
- areq.itf = &afb_api_ws_req_itf;
- areq.closure = wreq;
- afb_apis_call(areq, &wreq->context, client->api, verb);
- api_ws_server_req_unref(wreq);
+ wreq->xreq.refcount = 1;
+ wreq->xreq.api = client->api;
+ wreq->xreq.verb = verb;
+ wreq->xreq.query = wreq;
+ wreq->xreq.queryitf = &afb_api_ws_xreq_itf;
+ afb_apis_xcall(&wreq->xreq);
+ afb_xreq_unref(&wreq->xreq);
return;
out_of_memory:
@@ -1097,56 +1059,18 @@ static void api_ws_server_event_broadcast(void *closure, const char *event, int
/******************* ws request part for server *****************/
-/* increment the reference count of the request */
-static void api_ws_server_req_addref_cb(void *closure)
-{
- struct api_ws_server_req *wreq = closure;
- wreq->refcount++;
-}
-
/* decrement the reference count of the request and free/release it on falling to null */
-static void api_ws_server_req_unref_cb(void *closure)
+static void api_ws_server_req_destroy_cb(void *closure)
{
- api_ws_server_req_unref(closure);
-}
-
-static void api_ws_server_req_unref(struct api_ws_server_req *wreq)
-{
- if (wreq == NULL || --wreq->refcount)
- return;
+ struct api_ws_server_req *wreq = closure;
- afb_context_disconnect(&wreq->context);
- json_object_put(wreq->json);
+ afb_context_disconnect(&wreq->xreq.context);
+ json_object_put(wreq->xreq.json);
free(wreq->rcvdata);
api_ws_server_client_unref(wreq->client);
free(wreq);
}
-/* get the object of the request */
-static struct json_object *api_ws_server_req_json_cb(void *closure)
-{
- return api_ws_server_req_json(closure);
-}
-
-static struct json_object *api_ws_server_req_json(struct api_ws_server_req *wreq)
-{
- if (wreq->json == NULL) {
- wreq->json = json_tokener_parse(wreq->request);
- if (wreq->json == NULL && strcmp(wreq->request, "null")) {
- /* lazy error detection of json request. Is it to improve? */
- wreq->json = json_object_new_string(wreq->request);
- }
- }
- return wreq->json;
-}
-
-/* get the argument of the request of 'name' */
-static struct afb_arg api_ws_server_req_get_cb(void *closure, const char *name)
-{
- struct api_ws_server_req *wreq = closure;
- return afb_msg_json_get_arg(api_ws_server_req_json(wreq), name);
-}
-
static void api_ws_server_req_success_cb(void *closure, struct json_object *obj, const char *info)
{
int rc;
@@ -1155,7 +1079,7 @@ static void api_ws_server_req_success_cb(void *closure, struct json_object *obj,
if (api_ws_write_char(&wb, 'T')
&& api_ws_write_uint32(&wb, wreq->msgid)
- && api_ws_write_uint32(&wb, (uint32_t)wreq->context.flags)
+ && api_ws_write_uint32(&wb, (uint32_t)wreq->xreq.context.flags)
&& api_ws_write_string(&wb, info ? : "")
&& api_ws_write_object(&wb, obj)) {
rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
@@ -1175,7 +1099,7 @@ static void api_ws_server_req_fail_cb(void *closure, const char *status, const c
if (api_ws_write_char(&wb, 'F')
&& api_ws_write_uint32(&wb, wreq->msgid)
- && api_ws_write_uint32(&wb, (uint32_t)wreq->context.flags)
+ && api_ws_write_uint32(&wb, (uint32_t)wreq->xreq.context.flags)
&& api_ws_write_string(&wb, status)
&& api_ws_write_string(&wb, info ? : "")) {
rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
@@ -1185,32 +1109,6 @@ static void api_ws_server_req_fail_cb(void *closure, const char *status, const c
ERROR("error while sending fail");
}
-static const char *api_ws_server_req_raw_cb(void *closure, size_t *size)
-{
- struct api_ws_server_req *wreq = closure;
- if (size != NULL)
- *size = wreq->lenreq;
- return wreq->request;
-}
-
-static void api_ws_server_req_send_cb(void *closure, const char *buffer, size_t size)
-{
- /* TODO: how to put sized buffer as strings? things aren't clear here!!! */
- int rc;
- struct writebuf wb = { .count = 0 };
- struct api_ws_server_req *wreq = closure;
-
- if (api_ws_write_char(&wb, 'X')
- && api_ws_write_uint32(&wb, wreq->msgid)
- && api_ws_write_uint32(&wb, (uint32_t)wreq->context.flags)
- && api_ws_write_string_length(&wb, buffer, size)) {
- rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
- if (rc >= 0)
- return;
- }
- ERROR("error while sending raw");
-}
-
static int api_ws_server_req_subscribe_cb(void *closure, struct afb_event event)
{
int rc, rc2;
@@ -1254,12 +1152,6 @@ success:
return rc;
}
-static void api_ws_server_req_subcall_cb(void *closure, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure)
-{
- struct api_ws_server_req *wreq = closure;
- afb_subcall(&wreq->context, api, verb, args, callback, cb_closure, (struct afb_req){ .itf = &afb_api_ws_req_itf, .closure = wreq });
-}
-
/******************* server part **********************************/
static int api_ws_server_connect(struct api_ws *api);