diff options
author | José Bollo <jose.bollo@iot.bzh> | 2016-06-08 07:51:36 +0200 |
---|---|---|
committer | José Bollo <jose.bollo@iot.bzh> | 2016-06-08 11:49:58 +0200 |
commit | 9ee05e42384b34d8626cd8cccfd50538136f369f (patch) | |
tree | ef8f15a86e9b3b81a727156cbb09c8b85448b6b6 | |
parent | 80551a0405e8a208af587b6fff7f784a1a2ffbf4 (diff) |
Websocket: improves overall integration
Change-Id: I7af7b96d2f32b706eb378747c3719fa48f68c961
Signed-off-by: José Bollo <jose.bollo@iot.bzh>
-rw-r--r-- | src/afb-ws-json1.c | 315 | ||||
-rw-r--r-- | src/afb-wsj1.c | 19 | ||||
-rw-r--r-- | src/afb-wsj1.h | 16 |
3 files changed, 98 insertions, 252 deletions
diff --git a/src/afb-ws-json1.c b/src/afb-ws-json1.c index c9be94d0..ffb6b81f 100644 --- a/src/afb-ws-json1.c +++ b/src/afb-ws-json1.c @@ -25,32 +25,32 @@ #include <json-c/json.h> -#include "afb-ws.h" +#include "afb-wsj1.h" #include "afb-ws-json1.h" #include "afb-msg-json.h" #include "session.h" #include <afb/afb-req-itf.h> #include "afb-apis.h" #include "afb-context.h" +#include "verbose.h" -static void aws_on_hangup(struct afb_ws_json1 *ws); -static void aws_on_text(struct afb_ws_json1 *ws, char *text, size_t size); +static void aws_on_hangup(struct afb_ws_json1 *ws, struct afb_wsj1 *wsj1); +static void aws_on_call(struct afb_ws_json1 *ws, const char *api, const char *verb, struct afb_wsj1_msg *msg); -static struct afb_ws_itf aws_itf = { +static struct afb_wsj1_itf wsj1_itf = { .on_hangup = (void*)aws_on_hangup, - .on_text = (void*)aws_on_text + .on_call = (void*)aws_on_call }; struct afb_wsreq; struct afb_ws_json1 { + int refcount; void (*cleanup)(void*); void *cleanup_closure; - struct afb_wsreq *requests; struct AFB_clientCtx *session; - struct json_tokener *tokener; - struct afb_ws *ws; + struct afb_wsj1 *wsj1; int new_session; }; @@ -77,31 +77,25 @@ struct afb_ws_json1 *afb_ws_json1_create(int fd, struct afb_context *context, vo if (result == NULL) goto error; + result->refcount = 1; result->cleanup = cleanup; result->cleanup_closure = cleanup_closure; - result->requests = NULL; result->session = ctxClientAddRef(context->session); result->new_session = context->created != 0; if (result->session == NULL) goto error2; - result->tokener = json_tokener_new(); - if (result->tokener == NULL) + result->wsj1 = afb_wsj1_create(fd, &wsj1_itf, result); + if (result->wsj1 == NULL) goto error3; - result->ws = afb_ws_create(fd, &aws_itf, result); - if (result->ws == NULL) - goto error4; - if (0 > ctxClientEventListenerAdd(result->session, listener_for(result))) - goto error5; + goto error4; return result; -error5: - afb_ws_destroy(result->ws); error4: - json_tokener_free(result->tokener); + afb_wsj1_unref(result->wsj1); error3: ctxClientUnref(result->session); error2: @@ -111,21 +105,28 @@ error: return NULL; } -static void aws_on_hangup(struct afb_ws_json1 *ws) +static struct afb_ws_json1 *aws_addref(struct afb_ws_json1 *ws) { - ctxClientEventListenerRemove(ws->session, listener_for(ws)); - afb_ws_destroy(ws->ws); - json_tokener_free(ws->tokener); - if (ws->cleanup != NULL) - ws->cleanup(ws->cleanup_closure); - ctxClientUnref(ws->session); - free(ws); + ws->refcount++; + return ws; } -#define CALL 2 -#define RETOK 3 -#define RETERR 4 -#define EVENT 5 +static void aws_unref(struct afb_ws_json1 *ws) +{ + if (--ws->refcount == 0) { + ctxClientEventListenerRemove(ws->session, listener_for(ws)); + afb_wsj1_unref(ws->wsj1); + if (ws->cleanup != NULL) + ws->cleanup(ws->cleanup_closure); + ctxClientUnref(ws->session); + free(ws); + } +} + +static void aws_on_hangup(struct afb_ws_json1 *ws, struct afb_wsj1 *wsj1) +{ + aws_unref(ws); +} struct afb_wsreq { @@ -137,20 +138,7 @@ struct afb_wsreq int refcount; struct afb_ws_json1 *aws; struct afb_wsreq *next; - char *text; - size_t size; - int code; - char *id; - size_t idlen; - char *api; - size_t apilen; - char *verb; - size_t verblen; - char *obj; - size_t objlen; - char *tok; - size_t toklen; - struct json_object *root; + struct afb_wsj1_msg *msgj1; }; static void wsreq_addref(struct afb_wsreq *wsreq); @@ -178,173 +166,40 @@ static const struct afb_req_itf wsreq_itf = { .session_set_LOA = (void*)afb_context_change_loa }; -static int aws_wsreq_parse(struct afb_wsreq *r, char *text, size_t size) -{ - char *pos, *end, c; - int aux; - - /* scan */ - pos = text; - end = text + size; - - /* scans: [ */ - while(pos < end && *pos == ' ') pos++; - if (pos == end) goto bad_header; - if (*pos++ != '[') goto bad_header; - - /* scans code: 2|3|4 */ - while(pos < end && *pos == ' ') pos++; - if (pos == end) goto bad_header; - switch (*pos++) { - case '2': r->code = CALL; break; - case '3': r->code = RETOK; break; - case '4': r->code = RETERR; break; - default: goto bad_header; - } - - /* scans: , */ - while(pos < end && *pos == ' ') pos++; - if (pos == end) goto bad_header; - if (*pos++ != ',') goto bad_header; - - /* scans id: "id" */ - while(pos < end && *pos == ' ') pos++; - if (pos == end) goto bad_header; - if (*pos++ != '"') goto bad_header; - r->id = pos; - while(pos < end && *pos != '"') pos++; - if (pos == end) goto bad_header; - r->idlen = (size_t)(pos++ - r->id); - - /* scans: , */ - while(pos < end && *pos == ' ') pos++; - if (pos == end) goto bad_header; - if (*pos++ != ',') goto bad_header; - - /* scans the method if needed */ - if (r->code == CALL) { - /* scans: " */ - while(pos < end && *pos == ' ') pos++; - if (pos == end) goto bad_header; - if (*pos++ != '"') goto bad_header; - - /* scans: api/ */ - r->api = pos; - while(pos < end && *pos != '"' && *pos != '/') pos++; - if (pos == end) goto bad_header; - if (*pos != '/') goto bad_header; - r->apilen = (size_t)(pos++ - r->api); - if (r->apilen && r->api[r->apilen - 1] == '\\') - r->apilen--; - - /* scans: verb" */ - r->verb = pos; - while(pos < end && *pos != '"') pos++; - if (pos == end) goto bad_header; - r->verblen = (size_t)(pos++ - r->verb); - - /* scans: , */ - while(pos < end && *pos == ' ') pos++; - if (pos == end) goto bad_header; - if (*pos++ != ',') goto bad_header; - } - - /* scan obj */ - while(pos < end && *pos == ' ') pos++; - if (pos == end) goto bad_header; - aux = 0; - r->obj = pos; - while (pos < end && (aux != 0 || (*pos != ',' && *pos != ']'))) { - if (pos == end) goto bad_header; - switch(*pos) { - case '{': case '[': aux++; break; - case '}': case ']': if (!aux--) goto bad_header; break; - case '"': - do { - pos += 1 + (*pos == '\\'); - } while(pos < end && *pos != '"'); - default: - break; - } - pos++; - } - if (pos > end) goto bad_header; - if (pos == end && aux != 0) goto bad_header; - c = *pos; - r->objlen = (size_t)(pos++ - r->obj); - while (r->objlen && r->obj[r->objlen - 1] == ' ') - r->objlen--; - - /* scan the token (if any) */ - if (c == ',') { - /* scans token: "token" */ - while(pos < end && *pos == ' ') pos++; - if (pos == end) goto bad_header; - if (*pos++ != '"') goto bad_header; - r->tok = pos; - while(pos < end && *pos != '"') pos++; - if (pos == end) goto bad_header; - r->toklen = (size_t)(pos++ - r->tok); - while(pos < end && *pos == ' ') pos++; - if (pos == end) goto bad_header; - c = *pos++; - } - - /* scan: ] */ - if (c != ']') goto bad_header; - while(pos < end && *pos == ' ') pos++; - if (pos != end) goto bad_header; - - /* done */ - r->text = text; - r->size = size; - return 1; - -bad_header: - return 0; -} - -static void aws_on_text(struct afb_ws_json1 *ws, char *text, size_t size) +static void aws_on_call(struct afb_ws_json1 *ws, const char *api, const char *verb, struct afb_wsj1_msg *msg) { struct afb_req r; struct afb_wsreq *wsreq; + DEBUG("received websocket request for %s/%s: %s", api, verb, afb_wsj1_msg_object_s(msg)); + /* allocate */ wsreq = calloc(1, sizeof *wsreq); - if (wsreq == NULL) - goto alloc_error; - - /* init */ - if (!aws_wsreq_parse(wsreq, text, size)) - goto bad_header; + if (wsreq == NULL) { + afb_wsj1_close(ws->wsj1, 1008, NULL); + return; + } - /* fill and record the request */ - if (wsreq->tok != NULL) - wsreq->tok[wsreq->toklen] = 0; - afb_context_init(&wsreq->context, ws->session, wsreq->tok); + /* init the context */ + afb_context_init(&wsreq->context, ws->session, afb_wsj1_msg_token(msg)); if (!wsreq->context.invalidated) wsreq->context.validated = 1; if (ws->new_session != 0) { wsreq->context.created = 1; ws->new_session = 0; } + + /* fill and record the request */ + afb_wsj1_msg_addref(msg); + wsreq->msgj1 = msg; wsreq->refcount = 1; - wsreq->aws = ws; - wsreq->next = ws->requests; - ws->requests = wsreq; + wsreq->aws = aws_addref(ws); + /* emits the call */ r.closure = wsreq; r.itf = &wsreq_itf; - afb_apis_call(r, &wsreq->context, wsreq->api, wsreq->apilen, wsreq->verb, wsreq->verblen); + afb_apis_call_(r, &wsreq->context, api, verb); wsreq_unref(wsreq); - return; - -bad_header: - free(wsreq); -alloc_error: - free(text); - afb_ws_close(ws->ws, 1008, NULL); - return; } static void wsreq_addref(struct afb_wsreq *wsreq) @@ -355,34 +210,16 @@ static void wsreq_addref(struct afb_wsreq *wsreq) static void wsreq_unref(struct afb_wsreq *wsreq) { if (--wsreq->refcount == 0) { - struct afb_wsreq **prv = &wsreq->aws->requests; - while(*prv != NULL) { - if (*prv == wsreq) { - *prv = wsreq->next; - break; - } - prv = &(*prv)->next; - } afb_context_disconnect(&wsreq->context); - json_object_put(wsreq->root); - free(wsreq->text); + afb_wsj1_msg_unref(wsreq->msgj1); + aws_unref(wsreq->aws); free(wsreq); } } static struct json_object *wsreq_json(struct afb_wsreq *wsreq) { - struct json_object *root = wsreq->root; - if (root == NULL) { - json_tokener_reset(wsreq->aws->tokener); - root = json_tokener_parse_ex(wsreq->aws->tokener, wsreq->obj, (int)wsreq->objlen); - if (root == NULL) { - /* lazy error detection of json request. Is it to improve? */ - root = json_object_new_string_len(wsreq->obj, (int)wsreq->objlen); - } - wsreq->root = root; - } - return root; + return afb_wsj1_msg_object_j(wsreq->msgj1); } static struct afb_arg wsreq_get(struct afb_wsreq *wsreq, const char *name) @@ -402,54 +239,40 @@ static struct afb_arg wsreq_get(struct afb_wsreq *wsreq, const char *name) return arg; } -static void aws_emit(struct afb_ws_json1 *aws, int code, const char *id, size_t idlen, struct json_object *data, const char *token) -{ - json_object *msg; - const char *txt; - - /* pack the message */ - msg = json_object_new_array(); - json_object_array_add(msg, json_object_new_int(code)); - json_object_array_add(msg, json_object_new_string_len(id, (int)idlen)); - json_object_array_add(msg, data); - if (token) - json_object_array_add(msg, json_object_new_string(token)); - - /* emits the reply */ - txt = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN); - afb_ws_text(aws->ws, txt, strlen(txt)); - json_object_put(msg); -} - -static void wsreq_reply(struct afb_wsreq *wsreq, int retcode, const char *status, const char *info, json_object *resp) -{ - struct json_object *reply = afb_msg_json_reply(status, info, resp, &wsreq->context, NULL); - aws_emit(wsreq->aws, retcode, wsreq->id, wsreq->idlen, reply, afb_context_sent_token(&wsreq->context)); -} - static void wsreq_fail(struct afb_wsreq *wsreq, const char *status, const char *info) { - wsreq_reply(wsreq, RETERR, status, info, NULL); + int rc; + rc = afb_wsj1_reply_error_j(wsreq->msgj1, afb_msg_json_reply_error(status, info, &wsreq->context, NULL), afb_context_sent_token(&wsreq->context)); + if (rc) + ERROR("Can't send fail reply: %m"); } static void wsreq_success(struct afb_wsreq *wsreq, json_object *obj, const char *info) { - wsreq_reply(wsreq, RETOK, "success", info, obj); + int rc; + rc = afb_wsj1_reply_ok_j(wsreq->msgj1, afb_msg_json_reply_ok(info, obj, &wsreq->context, NULL), afb_context_sent_token(&wsreq->context)); + if (rc) + ERROR("Can't send success reply: %m"); } static const char *wsreq_raw(struct afb_wsreq *wsreq, size_t *size) { - *size = wsreq->objlen; - return wsreq->obj; + const char *result = afb_wsj1_msg_object_s(wsreq->msgj1); + if (size != NULL) + *size = strlen(result); + return result; } static void wsreq_send(struct afb_wsreq *wsreq, const char *buffer, size_t size) { - afb_ws_text(wsreq->aws->ws, buffer, size); + int rc; + rc = afb_wsj1_reply_ok_s(wsreq->msgj1, buffer, afb_context_sent_token(&wsreq->context)); + if (rc) + ERROR("Can't send raw reply: %m"); } static void aws_send_event(struct afb_ws_json1 *aws, const char *event, struct json_object *object) { - aws_emit(aws, EVENT, event, strlen(event), afb_msg_json_event(event, object), NULL); + afb_wsj1_send_event_j(aws->wsj1, event, afb_msg_json_event(event, object)); } diff --git a/src/afb-wsj1.c b/src/afb-wsj1.c index 5859bd36..068a3326 100644 --- a/src/afb-wsj1.c +++ b/src/afb-wsj1.c @@ -424,6 +424,11 @@ struct afb_wsj1 *afb_wsj1_msg_wsj1(struct afb_wsj1_msg *msg) return msg->wsj1; } +int afb_wsj1_close(struct afb_wsj1 *wsj1, uint16_t code, const char *text) +{ + return afb_ws_close(wsj1->ws, code, text); +} + static int wsj1_send_isot(struct afb_wsj1 *wsj1, int i1, const char *s1, const char *o1, const char *t1) { char code[2] = { (char)('0' + i1), 0 }; @@ -438,7 +443,10 @@ static int wsj1_send_issot(struct afb_wsj1 *wsj1, int i1, const char *s1, const int afb_wsj1_send_event_j(struct afb_wsj1 *wsj1, const char *event, struct json_object *object) { - return afb_wsj1_send_event_s(wsj1, event, json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN)); + const char *objstr = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN); + int rc = afb_wsj1_send_event_s(wsj1, event, objstr); + json_object_put(object); + return rc; } int afb_wsj1_send_event_s(struct afb_wsj1 *wsj1, const char *event, const char *object) @@ -448,7 +456,10 @@ int afb_wsj1_send_event_s(struct afb_wsj1 *wsj1, const char *event, const char * int afb_wsj1_call_j(struct afb_wsj1 *wsj1, const char *api, const char *verb, struct json_object *object, void (*on_reply)(void *closure, struct afb_wsj1_msg *msg), void *closure) { - return afb_wsj1_call_s(wsj1, api, verb, json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN), on_reply, closure); + const char *objstr = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN); + int rc = afb_wsj1_call_s(wsj1, api, verb, objstr, on_reply, closure); + json_object_put(object); + return rc; } int afb_wsj1_call_s(struct afb_wsj1 *wsj1, const char *api, const char *verb, const char *object, void (*on_reply)(void *closure, struct afb_wsj1_msg *msg), void *closure) @@ -480,7 +491,9 @@ int afb_wsj1_call_s(struct afb_wsj1 *wsj1, const char *api, const char *verb, co int afb_wsj1_reply_j(struct afb_wsj1_msg *msg, struct json_object *object, const char *token, int iserror) { const char *objstr = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN); - return afb_wsj1_reply_s(msg, objstr, token, iserror); + int rc = afb_wsj1_reply_s(msg, objstr, token, iserror); + json_object_put(object); + return rc; } int afb_wsj1_reply_s(struct afb_wsj1_msg *msg, const char *object, const char *token, int iserror) diff --git a/src/afb-wsj1.h b/src/afb-wsj1.h index ad734962..49b77828 100644 --- a/src/afb-wsj1.h +++ b/src/afb-wsj1.h @@ -66,6 +66,14 @@ extern void afb_wsj1_addref(struct afb_wsj1 *wsj1); extern void afb_wsj1_unref(struct afb_wsj1 *wsj1); /* + * Sends a close message to the websocket of 'wsj1'. + * The close message is sent with the 'code' and 'text'. + * 'text' can be NULL. + * Return 0 in case of success. Otherwise, returns -1 and set errno. + */ +extern int afb_wsj1_close(struct afb_wsj1 *wsj1, uint16_t code, const char *text); + +/* * Sends on 'wsj1' the event of name 'event' with the * data 'object'. If not NULL, 'object' should be a valid * JSON string. @@ -76,6 +84,7 @@ extern int afb_wsj1_send_event_s(struct afb_wsj1 *wsj1, const char *event, const /* * Sends on 'wsj1' the event of name 'event' with the * data 'object'. 'object' can be NULL. + * 'object' is dereferenced using 'json_object_put'. Use 'json_object_get' to keep it. * Return 0 in case of success. Otherwise, returns -1 and set errno. */ extern int afb_wsj1_send_event_j(struct afb_wsj1 *wsj1, const char *event, struct json_object *object); @@ -92,6 +101,7 @@ extern int afb_wsj1_call_s(struct afb_wsj1 *wsj1, const char *api, const char *v /* * Sends on 'wsj1' a call to the method of 'api'/'verb' with arguments * given by 'object'. 'object' can be NULL. + * 'object' is dereferenced using 'json_object_put'. Use 'json_object_get' to keep it. * On receiving the reply, the function 'on_reply' is called with 'closure' * as its first argument and the message of the reply. * Return 0 in case of success. Otherwise, returns -1 and set errno. @@ -110,13 +120,11 @@ extern int afb_wsj1_reply_s(struct afb_wsj1_msg *msg, const char *object, const * Sends for message 'msg' the reply with the 'object' and, if not NULL, the token. * When 'iserror' is zero a OK reply is send, otherwise an ERROR reply is sent. * 'object' can be NULL. + * 'object' is dereferenced using 'json_object_put'. Use 'json_object_get' to keep it. * Return 0 in case of success. Otherwise, returns -1 and set errno. */ extern int afb_wsj1_reply_j(struct afb_wsj1_msg *msg, struct json_object *object, const char *token, int iserror); - - - /* * Sends for message 'msg' the OK reply with the 'object' and, if not NULL, the token. * If not NULL, 'object' should be a valid JSON string. @@ -130,6 +138,7 @@ static inline int afb_wsj1_reply_ok_s(struct afb_wsj1_msg *msg, const char *obje /* * Sends for message 'msg' the OK reply with the 'object' and, if not NULL, the token. * 'object' can be NULL. + * 'object' is dereferenced using 'json_object_put'. Use 'json_object_get' to keep it. * Return 0 in case of success. Otherwise, returns -1 and set errno. */ static inline int afb_wsj1_reply_ok_j(struct afb_wsj1_msg *msg, struct json_object *object, const char *token) @@ -150,6 +159,7 @@ static inline int afb_wsj1_reply_error_s(struct afb_wsj1_msg *msg, const char *o /* * Sends for message 'msg' the ERROR reply with the 'object' and, if not NULL, the token. * 'object' can be NULL. + * 'object' is dereferenced using 'json_object_put'. Use 'json_object_get' to keep it. * Return 0 in case of success. Otherwise, returns -1 and set errno. */ static inline int afb_wsj1_reply_error_j(struct afb_wsj1_msg *msg, struct json_object *object, const char *token) |