aboutsummaryrefslogtreecommitdiffstats
path: root/src/afb-proto-ws.c
diff options
context:
space:
mode:
authorJose Bollo <jose.bollo@iot.bzh>2019-11-15 14:41:40 +0100
committerJosé Bollo <jose.bollo@iot.bzh>2019-11-29 12:48:17 +0100
commit7386e1c5090b4e76036bc212f2a2cf32920bb160 (patch)
treeb4343277a870d9ba89fec11be7772f91e25dd7fe /src/afb-proto-ws.c
parentb55f3cd48507105e85894be89557787eccfbe22f (diff)
afb-proto-ws: Change the protocol WSAPI
Change internals of the protocol WSAPI for the following rationale: 1. Enforce specific declaration and transmission of session identifiers and of access tokens. 2. Lower the size of identifiers to be 16 bits. 3. Introduce protocol versionning through a mechanism of offer/set. The main purpose of that change is to optimize the count of data transmitted. It manages as best as possible the transmission of access tokens the less possible times. Same for sessions that the chage was transmitted at each call. Bug-AGL: SPEC-2968 Change-Id: If0a22b86627ead35a410e51c1028025c5b02c38f Signed-off-by: Jose Bollo <jose.bollo@iot.bzh>
Diffstat (limited to 'src/afb-proto-ws.c')
-rw-r--r--src/afb-proto-ws.c484
1 files changed, 352 insertions, 132 deletions
diff --git a/src/afb-proto-ws.c b/src/afb-proto-ws.c
index 21ba2bca..ebd5fd58 100644
--- a/src/afb-proto-ws.c
+++ b/src/afb-proto-ws.c
@@ -43,7 +43,7 @@ struct afb_proto_ws;
/******** implementation of internal binder protocol per api **************/
/*
-This protocol is asymetric: there is a client and a server
+This protocol is asymmetric: there is a client and a server
The client can require the following actions:
@@ -65,21 +65,41 @@ For the purpose of handling events the server can:
- create/destroy an event
- - push or brodcast data as an event
+ - push or broadcast data as an event
*/
/************** constants for protocol definition *************************/
-#define CHAR_FOR_CALL 'C'
-#define CHAR_FOR_REPLY 'Y'
-#define CHAR_FOR_EVT_BROADCAST '*'
-#define CHAR_FOR_EVT_ADD '+'
-#define CHAR_FOR_EVT_DEL '-'
-#define CHAR_FOR_EVT_PUSH '!'
-#define CHAR_FOR_EVT_SUBSCRIBE 'S'
-#define CHAR_FOR_EVT_UNSUBSCRIBE 'U'
+#define CHAR_FOR_CALL 'K'
+#define CHAR_FOR_REPLY 'k'
+#define CHAR_FOR_EVT_BROADCAST 'B'
+#define CHAR_FOR_EVT_ADD 'E'
+#define CHAR_FOR_EVT_DEL 'e'
+#define CHAR_FOR_EVT_PUSH 'P'
+#define CHAR_FOR_EVT_SUBSCRIBE 'X'
+#define CHAR_FOR_EVT_UNSUBSCRIBE 'x'
#define CHAR_FOR_DESCRIBE 'D'
#define CHAR_FOR_DESCRIPTION 'd'
+#define CHAR_FOR_TOKEN_ADD 'T'
+#define CHAR_FOR_TOKEN_DROP 't'
+#define CHAR_FOR_SESSION_ADD 'S'
+#define CHAR_FOR_SESSION_DROP 's'
+#define CHAR_FOR_VERSION_OFFER 'V'
+#define CHAR_FOR_VERSION_SET 'v'
+
+/******************* manage versions *****************************/
+
+#define WSAPI_IDENTIFIER 02723012011 /* wsapi: 23.19.1.16.9 */
+
+#define WSAPI_VERSION_UNSET 0
+#define WSAPI_VERSION_1 1
+
+#define WSAPI_VERSION_MIN WSAPI_VERSION_1
+#define WSAPI_VERSION_MAX WSAPI_VERSION_1
+
+/******************* maximum count of ids ***********************/
+
+#define ACTIVE_ID_MAX 4095
/******************* handling calls *****************************/
@@ -88,20 +108,18 @@ For the purpose of handling events the server can:
*/
struct client_call {
struct client_call *next; /* the next call */
- struct afb_proto_ws *protows; /* the proto_ws */
void *request; /* the request closure */
- uint32_t callid; /* the message identifier */
+ uint16_t callid; /* the message identifier */
};
/*
* structure for a ws request
*/
struct afb_proto_ws_call {
- struct client_call *next; /* the next call */
struct afb_proto_ws *protows; /* the client of the request */
- uint32_t refcount; /* reference count */
- uint32_t callid; /* the incoming request callid */
char *buffer; /* the incoming buffer */
+ uint16_t refcount; /* reference count */
+ uint16_t callid; /* the incoming request callid */
};
/*
@@ -110,10 +128,9 @@ struct afb_proto_ws_call {
struct client_describe
{
struct client_describe *next;
- struct afb_proto_ws *protows;
void (*callback)(void*, struct json_object*);
void *closure;
- uint32_t descid;
+ uint16_t descid;
};
/*
@@ -122,7 +139,7 @@ struct client_describe
struct afb_proto_ws_describe
{
struct afb_proto_ws *protows;
- uint32_t descid;
+ uint16_t descid;
};
/******************* proto description for client or servers ******************/
@@ -130,7 +147,16 @@ struct afb_proto_ws_describe
struct afb_proto_ws
{
/* count of references */
- int refcount;
+ uint16_t refcount;
+
+ /* id generator */
+ uint16_t genid;
+
+ /* count actives ids */
+ uint16_t idcount;
+
+ /* version */
+ uint8_t version;
/* resource control */
pthread_mutex_t mutex;
@@ -183,19 +209,7 @@ struct binary
struct readbuf rb;
};
-/******************* common useful tools **********************************/
-
-/**
- * translate a pointer to some integer
- * @param ptr the pointer to translate
- * @return an integer
- */
-static inline uint32_t ptr2id(void *ptr)
-{
- return (uint32_t)(((intptr_t)ptr) >> 6);
-}
-
-/******************* serialisation part **********************************/
+/******************* serialization part **********************************/
static char *readbuf_get(struct readbuf *rb, uint32_t length)
{
@@ -207,24 +221,40 @@ static char *readbuf_get(struct readbuf *rb, uint32_t length)
return before;
}
-__attribute__((unused))
-static int readbuf_char(struct readbuf *rb, char *value)
+static int readbuf_getat(struct readbuf *rb, void *to, uint32_t length)
{
- if (rb->head >= rb->end)
+ char *head = readbuf_get(rb, length);
+ if (!head)
return 0;
- *value = *rb->head++;
+ memcpy(to, head, length);
return 1;
}
+__attribute__((unused))
+static int readbuf_char(struct readbuf *rb, char *value)
+{
+ return readbuf_getat(rb, value, sizeof *value);
+}
+
static int readbuf_uint32(struct readbuf *rb, uint32_t *value)
{
- char *after = rb->head + sizeof *value;
- if (after > rb->end)
- return 0;
- memcpy(value, rb->head, sizeof *value);
- rb->head = after;
- *value = le32toh(*value);
- return 1;
+ int r = readbuf_getat(rb, value, sizeof *value);
+ if (r)
+ *value = le32toh(*value);
+ return r;
+}
+
+static int readbuf_uint16(struct readbuf *rb, uint16_t *value)
+{
+ int r = readbuf_getat(rb, value, sizeof *value);
+ if (r)
+ *value = le16toh(*value);
+ return r;
+}
+
+static int readbuf_uint8(struct readbuf *rb, uint8_t *value)
+{
+ return readbuf_getat(rb, value, sizeof *value);
}
static int _readbuf_string_(struct readbuf *rb, const char **value, size_t *length, int nulok)
@@ -312,6 +342,7 @@ static int writebuf_putbuf(struct writebuf *wb, const void *value, int length)
return 1;
}
+__attribute__((unused))
static int writebuf_char(struct writebuf *wb, char value)
{
return writebuf_putbuf(wb, &value, 1);
@@ -323,6 +354,17 @@ static int writebuf_uint32(struct writebuf *wb, uint32_t value)
return writebuf_putbuf(wb, &value, (int)sizeof value);
}
+static int writebuf_uint16(struct writebuf *wb, uint16_t value)
+{
+ value = htole16(value);
+ return writebuf_putbuf(wb, &value, (int)sizeof value);
+}
+
+static int writebuf_uint8(struct writebuf *wb, uint8_t value)
+{
+ return writebuf_putbuf(wb, &value, (int)sizeof value);
+}
+
static int writebuf_string_length(struct writebuf *wb, const char *value, size_t length)
{
uint32_t len = (uint32_t)++length;
@@ -392,6 +434,30 @@ static int proto_write(struct afb_proto_ws *protows, struct writebuf *wb)
return rc;
}
+static int send_version_offer_1(struct afb_proto_ws *protows, uint8_t version)
+{
+ int rc = -1;
+ struct writebuf wb = { .iovcount = 0, .bufcount = 0 };
+
+ if (writebuf_char(&wb, CHAR_FOR_VERSION_OFFER)
+ && writebuf_uint32(&wb, WSAPI_IDENTIFIER)
+ && writebuf_uint8(&wb, 1) /* offer one version */
+ && writebuf_uint8(&wb, version))
+ rc = proto_write(protows, &wb);
+ return rc;
+}
+
+static int send_version_set(struct afb_proto_ws *protows, uint8_t version)
+{
+ int rc = -1;
+ struct writebuf wb = { .iovcount = 0, .bufcount = 0 };
+
+ if (writebuf_char(&wb, CHAR_FOR_VERSION_SET)
+ && writebuf_uint8(&wb, version))
+ rc = proto_write(protows, &wb);
+ return rc;
+}
+
/******************* ws request part for server *****************/
void afb_proto_ws_call_addref(struct afb_proto_ws_call *call)
@@ -416,7 +482,7 @@ int afb_proto_ws_call_reply(struct afb_proto_ws_call *call, struct json_object *
struct afb_proto_ws *protows = call->protows;
if (writebuf_char(&wb, CHAR_FOR_REPLY)
- && writebuf_uint32(&wb, call->callid)
+ && writebuf_uint16(&wb, call->callid)
&& writebuf_nullstring(&wb, error)
&& writebuf_nullstring(&wb, info)
&& writebuf_object(&wb, obj))
@@ -424,30 +490,28 @@ int afb_proto_ws_call_reply(struct afb_proto_ws_call *call, struct json_object *
return rc;
}
-int afb_proto_ws_call_subscribe(struct afb_proto_ws_call *call, const char *event_name, int event_id)
+int afb_proto_ws_call_subscribe(struct afb_proto_ws_call *call, uint16_t event_id)
{
int rc = -1;
struct writebuf wb = { .iovcount = 0, .bufcount = 0 };
struct afb_proto_ws *protows = call->protows;
if (writebuf_char(&wb, CHAR_FOR_EVT_SUBSCRIBE)
- && writebuf_uint32(&wb, call->callid)
- && writebuf_uint32(&wb, (uint32_t)event_id)
- && writebuf_string(&wb, event_name))
+ && writebuf_uint16(&wb, call->callid)
+ && writebuf_uint16(&wb, event_id))
rc = proto_write(protows, &wb);
return rc;
}
-int afb_proto_ws_call_unsubscribe(struct afb_proto_ws_call *call, const char *event_name, int event_id)
+int afb_proto_ws_call_unsubscribe(struct afb_proto_ws_call *call, uint16_t event_id)
{
int rc = -1;
struct writebuf wb = { .iovcount = 0, .bufcount = 0 };
struct afb_proto_ws *protows = call->protows;
if (writebuf_char(&wb, CHAR_FOR_EVT_UNSUBSCRIBE)
- && writebuf_uint32(&wb, call->callid)
- && writebuf_uint32(&wb, (uint32_t)event_id)
- && writebuf_string(&wb, event_name))
+ && writebuf_uint16(&wb, call->callid)
+ && writebuf_uint16(&wb, event_id))
rc = proto_write(protows, &wb);
return rc;
}
@@ -455,7 +519,7 @@ int afb_proto_ws_call_unsubscribe(struct afb_proto_ws_call *call, const char *ev
/******************* client part **********************************/
/* search a memorized call */
-static struct client_call *client_call_search_locked(struct afb_proto_ws *protows, uint32_t callid)
+static struct client_call *client_call_search_locked(struct afb_proto_ws *protows, uint16_t callid)
{
struct client_call *call;
@@ -466,7 +530,7 @@ static struct client_call *client_call_search_locked(struct afb_proto_ws *protow
return call;
}
-static struct client_call *client_call_search_unlocked(struct afb_proto_ws *protows, uint32_t callid)
+static struct client_call *client_call_search_unlocked(struct afb_proto_ws *protows, uint16_t callid)
{
struct client_call *result;
@@ -477,57 +541,49 @@ static struct client_call *client_call_search_unlocked(struct afb_proto_ws *prot
}
/* free and release the memorizing call */
-static void client_call_destroy(struct client_call *call)
+static void client_call_destroy(struct afb_proto_ws *protows, struct client_call *call)
{
struct client_call **prv;
- struct afb_proto_ws *protows = call->protows;
pthread_mutex_lock(&protows->mutex);
- prv = &call->protows->calls;
+ prv = &protows->calls;
while (*prv != NULL) {
if (*prv == call) {
+ protows->idcount--;
*prv = call->next;
- break;
+ pthread_mutex_unlock(&protows->mutex);
+ free(call);
+ return;
}
prv = &(*prv)->next;
}
pthread_mutex_unlock(&protows->mutex);
- free(call);
-}
-
-/* get event data from the message */
-static int client_msg_event_read(struct readbuf *rb, uint32_t *eventid, const char **name)
-{
- return readbuf_uint32(rb, eventid) && readbuf_string(rb, name, NULL);
}
/* get event from the message */
static int client_msg_call_get(struct afb_proto_ws *protows, struct readbuf *rb, struct client_call **call)
{
- uint32_t callid;
+ uint16_t callid;
/* get event data from the message */
- if (!readbuf_uint32(rb, &callid)) {
+ if (!readbuf_uint16(rb, &callid))
return 0;
- }
/* get the call */
*call = client_call_search_unlocked(protows, callid);
- if (*call == NULL) {
- return 0;
- }
-
- return 1;
+ return *call != NULL;
}
/* adds an event */
static void client_on_event_create(struct afb_proto_ws *protows, struct readbuf *rb)
{
const char *event_name;
- uint32_t event_id;
+ uint16_t event_id;
- if (protows->client_itf->on_event_create && client_msg_event_read(rb, &event_id, &event_name))
- protows->client_itf->on_event_create(protows->closure, event_name, (int)event_id);
+ if (protows->client_itf->on_event_create
+ && readbuf_uint16(rb, &event_id)
+ && readbuf_string(rb, &event_name, NULL))
+ protows->client_itf->on_event_create(protows->closure, event_id, event_name);
else
ERROR("Ignoring creation of event");
}
@@ -535,11 +591,10 @@ static void client_on_event_create(struct afb_proto_ws *protows, struct readbuf
/* removes an event */
static void client_on_event_remove(struct afb_proto_ws *protows, struct readbuf *rb)
{
- const char *event_name;
- uint32_t event_id;
+ uint16_t event_id;
- if (protows->client_itf->on_event_remove && client_msg_event_read(rb, &event_id, &event_name))
- protows->client_itf->on_event_remove(protows->closure, event_name, (int)event_id);
+ if (protows->client_itf->on_event_remove && readbuf_uint16(rb, &event_id))
+ protows->client_itf->on_event_remove(protows->closure, event_id);
else
ERROR("Ignoring deletion of event");
}
@@ -547,12 +602,11 @@ static void client_on_event_remove(struct afb_proto_ws *protows, struct readbuf
/* subscribes an event */
static void client_on_event_subscribe(struct afb_proto_ws *protows, struct readbuf *rb)
{
- const char *event_name;
- uint32_t event_id;
+ uint16_t event_id;
struct client_call *call;
- if (protows->client_itf->on_event_subscribe && client_msg_call_get(protows, rb, &call) && client_msg_event_read(rb, &event_id, &event_name))
- protows->client_itf->on_event_subscribe(protows->closure, call->request, event_name, (int)event_id);
+ if (protows->client_itf->on_event_subscribe && client_msg_call_get(protows, rb, &call) && readbuf_uint16(rb, &event_id))
+ protows->client_itf->on_event_subscribe(protows->closure, call->request, event_id);
else
ERROR("Ignoring subscription to event");
}
@@ -560,12 +614,11 @@ static void client_on_event_subscribe(struct afb_proto_ws *protows, struct readb
/* unsubscribes an event */
static void client_on_event_unsubscribe(struct afb_proto_ws *protows, struct readbuf *rb)
{
- const char *event_name;
- uint32_t event_id;
+ uint16_t event_id;
struct client_call *call;
- if (protows->client_itf->on_event_unsubscribe && client_msg_call_get(protows, rb, &call) && client_msg_event_read(rb, &event_id, &event_name))
- protows->client_itf->on_event_unsubscribe(protows->closure, call->request, event_name, (int)event_id);
+ if (protows->client_itf->on_event_unsubscribe && client_msg_call_get(protows, rb, &call) && readbuf_uint16(rb, &event_id))
+ protows->client_itf->on_event_unsubscribe(protows->closure, call->request, event_id);
else
ERROR("Ignoring unsubscription to event");
}
@@ -574,11 +627,11 @@ static void client_on_event_unsubscribe(struct afb_proto_ws *protows, struct rea
static void client_on_event_broadcast(struct afb_proto_ws *protows, struct readbuf *rb)
{
const char *event_name, *uuid;
- char hop;
+ uint8_t hop;
struct json_object *object;
- if (protows->client_itf->on_event_broadcast && readbuf_string(rb, &event_name, NULL) && readbuf_object(rb, &object) && (uuid = readbuf_get(rb, 16)) && readbuf_char(rb, &hop))
- protows->client_itf->on_event_broadcast(protows->closure, event_name, object, (unsigned char*)uuid, (uint8_t)hop);
+ if (protows->client_itf->on_event_broadcast && readbuf_string(rb, &event_name, NULL) && readbuf_object(rb, &object) && (uuid = readbuf_get(rb, 16)) && readbuf_uint8(rb, &hop))
+ protows->client_itf->on_event_broadcast(protows->closure, event_name, object, (unsigned char*)uuid, hop);
else
ERROR("Ignoring broadcast of event");
}
@@ -586,12 +639,11 @@ static void client_on_event_broadcast(struct afb_proto_ws *protows, struct readb
/* pushs an event */
static void client_on_event_push(struct afb_proto_ws *protows, struct readbuf *rb)
{
- const char *event_name;
- uint32_t event_id;
+ uint16_t event_id;
struct json_object *object;
- if (protows->client_itf->on_event_push && client_msg_event_read(rb, &event_id, &event_name) && readbuf_object(rb, &object))
- protows->client_itf->on_event_push(protows->closure, event_name, (int)event_id, object);
+ if (protows->client_itf->on_event_push && readbuf_uint16(rb, &event_id) && readbuf_object(rb, &object))
+ protows->client_itf->on_event_push(protows->closure, event_id, object);
else
ERROR("Ignoring push of event");
}
@@ -610,7 +662,7 @@ static void client_on_reply(struct afb_proto_ws *protows, struct readbuf *rb)
} else {
protows->client_itf->on_reply(protows->closure, call->request, NULL, "proto-error", "can't process success");
}
- client_call_destroy(call);
+ client_call_destroy(protows, call);
}
static void client_on_description(struct afb_proto_ws *protows, struct readbuf *rb)
@@ -628,6 +680,7 @@ static void client_on_description(struct afb_proto_ws *protows, struct readbuf *
pthread_mutex_unlock(&protows->mutex);
else {
*prv = desc->next;
+ protows->idcount--;
pthread_mutex_unlock(&protows->mutex);
if (!readbuf_object(rb, &object))
object = NULL;
@@ -637,6 +690,22 @@ static void client_on_description(struct afb_proto_ws *protows, struct readbuf *
}
}
+/* received a version set */
+static void client_on_version_set(struct afb_proto_ws *protows, struct readbuf *rb)
+{
+ uint8_t version;
+
+ /* reads the descid */
+ if (readbuf_uint8(rb, &version)
+ && WSAPI_VERSION_MIN <= version
+ && version <= WSAPI_VERSION_MAX) {
+ protows->version = version;
+ return;
+ }
+ afb_proto_ws_hangup(protows);
+}
+
+
/* callback when receiving binary data */
static void client_on_binary_job(int sig, void *closure)
{
@@ -668,6 +737,9 @@ static void client_on_binary_job(int sig, void *closure)
case CHAR_FOR_DESCRIPTION: /* description */
client_on_description(binary->protows, &binary->rb);
break;
+ case CHAR_FOR_VERSION_SET: /* set the protocol version */
+ client_on_version_set(binary->protows, &binary->rb);
+ break;
default: /* unexpected message */
/* TODO: close the connection */
break;
@@ -685,11 +757,45 @@ static void client_on_binary(void *closure, char *data, size_t size)
queue_message_processing(protows, data, size, client_on_binary_job);
}
+static int client_send_idstr_add_drop(struct afb_proto_ws *protows, char order, uint16_t id, const char *value)
+{
+ struct writebuf wb = { .iovcount = 0, .bufcount = 0 };
+ int rc = -1;
+
+ if (writebuf_char(&wb, order)
+ && writebuf_uint16(&wb, id)
+ && (!value || writebuf_string(&wb, value)))
+ rc = proto_write(protows, &wb);
+ return rc;
+}
+
+int afb_proto_ws_client_session_create(struct afb_proto_ws *protows, uint16_t sessionid, const char *sessionstr)
+{
+ return client_send_idstr_add_drop(protows, CHAR_FOR_SESSION_ADD, sessionid, sessionstr);
+}
+
+int afb_proto_ws_client_session_remove(struct afb_proto_ws *protows, uint16_t sessionid)
+{
+ return client_send_idstr_add_drop(protows, CHAR_FOR_SESSION_DROP, sessionid, NULL);
+}
+
+int afb_proto_ws_client_token_create(struct afb_proto_ws *protows, uint16_t tokenid, const char *tokenstr)
+{
+ return client_send_idstr_add_drop(protows, CHAR_FOR_TOKEN_ADD, tokenid, tokenstr);
+
+}
+
+int afb_proto_ws_client_token_remove(struct afb_proto_ws *protows, uint16_t tokenid)
+{
+ return client_send_idstr_add_drop(protows, CHAR_FOR_TOKEN_DROP, tokenid, NULL);
+}
+
int afb_proto_ws_client_call(
struct afb_proto_ws *protows,
const char *verb,
struct json_object *args,
- const char *sessionid,
+ uint16_t sessionid,
+ uint16_t tokenid,
void *request,
const char *user_creds
)
@@ -697,6 +803,7 @@ int afb_proto_ws_client_call(
int rc = -1;
struct client_call *call;
struct writebuf wb = { .iovcount = 0, .bufcount = 0 };
+ uint16_t id;
/* allocate call data */
call = malloc(sizeof *call);
@@ -708,19 +815,26 @@ int afb_proto_ws_client_call(
/* init call data */
pthread_mutex_lock(&protows->mutex);
- call->callid = ptr2id(call);
- while(client_call_search_locked(protows, call->callid) != NULL)
- call->callid++;
- call->protows = protows;
+ if (protows->idcount >= ACTIVE_ID_MAX) {
+ pthread_mutex_unlock(&protows->mutex);
+ errno = EBUSY;
+ goto clean;
+ }
+ protows->idcount++;
+ id = ++protows->genid;
+ while(!id || client_call_search_locked(protows, id) != NULL)
+ id++;
+ call->callid = protows->genid = id;
call->next = protows->calls;
protows->calls = call;
pthread_mutex_unlock(&protows->mutex);
/* creates the call message */
if (!writebuf_char(&wb, CHAR_FOR_CALL)
- || !writebuf_uint32(&wb, call->callid)
+ || !writebuf_uint16(&wb, call->callid)
|| !writebuf_string(&wb, verb)
- || !writebuf_string(&wb, sessionid)
+ || !writebuf_uint16(&wb, sessionid)
+ || !writebuf_uint16(&wb, tokenid)
|| !writebuf_object(&wb, args)
|| !writebuf_nullstring(&wb, user_creds)) {
errno = EINVAL;
@@ -733,7 +847,7 @@ int afb_proto_ws_client_call(
goto end;
clean:
- client_call_destroy(call);
+ client_call_destroy(protows, call);
end:
return rc;
}
@@ -743,6 +857,7 @@ int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)(
{
struct client_describe *desc, *d;
struct writebuf wb = { .iovcount = 0, .bufcount = 0 };
+ uint16_t id;
desc = malloc(sizeof *desc);
if (!desc) {
@@ -752,31 +867,40 @@ int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)(
/* fill in stack the description of the task */
pthread_mutex_lock(&protows->mutex);
- desc->descid = ptr2id(desc);
+ if (protows->idcount >= ACTIVE_ID_MAX) {
+ errno = EBUSY;
+ goto busy;
+ }
+ protows->idcount++;
+ id = ++protows->genid;
d = protows->describes;
while (d) {
- if (d->descid != desc->descid)
+ if (id && d->descid != id)
d = d->next;
else {
- desc->descid++;
+ id++;
d = protows->describes;
}
}
+ desc->descid = protows->genid = id;
desc->callback = callback;
desc->closure = closure;
- desc->protows = protows;
desc->next = protows->describes;
protows->describes = desc;
+ pthread_mutex_unlock(&protows->mutex);
/* send */
- if (writebuf_char(&wb, CHAR_FOR_DESCRIBE)
- && writebuf_uint32(&wb, desc->descid)
- && protows->ws != NULL
- && afb_ws_binary_v(protows->ws, wb.iovec, wb.iovcount) >= 0) {
- pthread_mutex_unlock(&protows->mutex);
- return 0;
+ if (!writebuf_char(&wb, CHAR_FOR_DESCRIBE)
+ || !writebuf_uint16(&wb, desc->descid)) {
+ errno = EINVAL;
+ goto error2;
}
+ if (proto_write(protows, &wb) == 0)
+ return 0;
+
+error2:
+ pthread_mutex_lock(&protows->mutex);
d = protows->describes;
if (d == desc)
protows->describes = desc->next;
@@ -786,6 +910,8 @@ int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)(
if (d)
d->next = desc->next;
}
+ protows->idcount--;
+busy:
pthread_mutex_unlock(&protows->mutex);
free(desc);
error:
@@ -799,17 +925,18 @@ error:
static void server_on_call(struct afb_proto_ws *protows, struct readbuf *rb)
{
struct afb_proto_ws_call *call;
- const char *uuid, *verb, *user_creds;
- uint32_t callid;
+ const char *verb, *user_creds;
+ uint16_t callid, sessionid, tokenid;
size_t lenverb;
struct json_object *object;
afb_proto_ws_addref(protows);
/* reads the call message data */
- if (!readbuf_uint32(rb, &callid)
+ if (!readbuf_uint16(rb, &callid)
|| !readbuf_string(rb, &verb, &lenverb)
- || !readbuf_string(rb, &uuid, NULL)
+ || !readbuf_uint16(rb, &sessionid)
+ || !readbuf_uint16(rb, &tokenid)
|| !readbuf_object(rb, &object)
|| !readbuf_nullstring(rb, &user_creds, NULL))
goto overflow;
@@ -825,7 +952,7 @@ static void server_on_call(struct afb_proto_ws *protows, struct readbuf *rb)
call->buffer = rb->base;
rb->base = NULL; /* don't free the buffer */
- protows->server_itf->on_call(protows->closure, call, verb, object, uuid, user_creds);
+ protows->server_itf->on_call(protows->closure, call, verb, object, sessionid, tokenid, user_creds);
return;
out_of_memory:
@@ -858,11 +985,11 @@ int afb_proto_ws_describe_put(struct afb_proto_ws_describe *describe, struct jso
/* on describe, propagate it to the ws service */
static void server_on_describe(struct afb_proto_ws *protows, struct readbuf *rb)
{
- uint32_t descid;
+ uint16_t descid;
struct afb_proto_ws_describe *desc;
/* reads the descid */
- if (readbuf_uint32(rb, &descid)) {
+ if (readbuf_uint16(rb, &descid)) {
if (protows->server_itf->on_describe) {
/* create asynchronous job */
desc = malloc(sizeof *desc);
@@ -878,6 +1005,73 @@ static void server_on_describe(struct afb_proto_ws *protows, struct readbuf *rb)
}
}
+static void server_on_session_add(struct afb_proto_ws *protows, struct readbuf *rb)
+{
+ uint16_t sessionid;
+ const char *sessionstr;
+
+ if (readbuf_uint16(rb, &sessionid) && readbuf_string(rb, &sessionstr, NULL))
+ protows->server_itf->on_session_create(protows->closure, sessionid, sessionstr);
+}
+
+static void server_on_session_drop(struct afb_proto_ws *protows, struct readbuf *rb)
+{
+ uint16_t sessionid;
+
+ if (readbuf_uint16(rb, &sessionid))
+ protows->server_itf->on_session_remove(protows->closure, sessionid);
+}
+
+static void server_on_token_add(struct afb_proto_ws *protows, struct readbuf *rb)
+{
+ uint16_t tokenid;
+ const char *tokenstr;
+
+ if (readbuf_uint16(rb, &tokenid) && readbuf_string(rb, &tokenstr, NULL))
+ protows->server_itf->on_token_create(protows->closure, tokenid, tokenstr);
+}
+
+static void server_on_token_drop(struct afb_proto_ws *protows, struct readbuf *rb)
+{
+ uint16_t tokenid;
+
+ if (readbuf_uint16(rb, &tokenid))
+ protows->server_itf->on_token_remove(protows->closure, tokenid);
+}
+
+/* on version offer */
+static void server_on_version_offer(struct afb_proto_ws *protows, struct readbuf *rb)
+{
+ uint8_t count;
+ uint8_t *versions;
+ uint8_t version;
+ uint8_t v;
+ uint32_t id;
+
+ /* reads the descid */
+ if (readbuf_uint32(rb, &id)
+ && id == WSAPI_IDENTIFIER
+ && readbuf_uint8(rb, &count)
+ && count > 0
+ && (versions = (uint8_t*)readbuf_get(rb, (uint32_t)count))) {
+ version = WSAPI_VERSION_UNSET;
+ while (count) {
+ v = versions[--count];
+ if (v >= WSAPI_VERSION_MIN
+ && v <= WSAPI_VERSION_MAX
+ && (version == WSAPI_VERSION_UNSET || version < v))
+ version = v;
+ }
+ if (version != WSAPI_VERSION_UNSET) {
+ if (send_version_set(protows, version) >= 0) {
+ protows->version = version;
+ return;
+ }
+ }
+ }
+ afb_proto_ws_hangup(protows);
+}
+
/* callback when receiving binary data */
static void server_on_binary_job(int sig, void *closure)
{
@@ -891,6 +1085,21 @@ static void server_on_binary_job(int sig, void *closure)
case CHAR_FOR_DESCRIBE:
server_on_describe(binary->protows, &binary->rb);
break;
+ case CHAR_FOR_SESSION_ADD:
+ server_on_session_add(binary->protows, &binary->rb);
+ break;
+ case CHAR_FOR_SESSION_DROP:
+ server_on_session_drop(binary->protows, &binary->rb);
+ break;
+ case CHAR_FOR_TOKEN_ADD:
+ server_on_token_add(binary->protows, &binary->rb);
+ break;
+ case CHAR_FOR_TOKEN_DROP:
+ server_on_token_drop(binary->protows, &binary->rb);
+ break;
+ case CHAR_FOR_VERSION_OFFER:
+ server_on_version_offer(binary->protows, &binary->rb);
+ break;
default: /* unexpected message */
/* TODO: close the connection */
break;
@@ -909,32 +1118,32 @@ static void server_on_binary(void *closure, char *data, size_t size)
/******************* server part: manage events **********************************/
-static int server_event_send(struct afb_proto_ws *protows, char order, const char *event_name, int event_id, struct json_object *data)
+static int server_event_send(struct afb_proto_ws *protows, char order, uint16_t event_id, const char *event_name, struct json_object *data)
{
struct writebuf wb = { .iovcount = 0, .bufcount = 0 };
int rc = -1;
if (writebuf_char(&wb, order)
- && writebuf_uint32(&wb, event_id)
- && writebuf_string(&wb, event_name)
+ && writebuf_uint16(&wb, event_id)
+ && (order != CHAR_FOR_EVT_ADD || writebuf_string(&wb, event_name))
&& (order != CHAR_FOR_EVT_PUSH || writebuf_object(&wb, data)))
rc = proto_write(protows, &wb);
return rc;
}
-int afb_proto_ws_server_event_create(struct afb_proto_ws *protows, const char *event_name, int event_id)
+int afb_proto_ws_server_event_create(struct afb_proto_ws *protows, uint16_t event_id, const char *event_name)
{
- return server_event_send(protows, CHAR_FOR_EVT_ADD, event_name, event_id, NULL);
+ return server_event_send(protows, CHAR_FOR_EVT_ADD, event_id, event_name, NULL);
}
-int afb_proto_ws_server_event_remove(struct afb_proto_ws *protows, const char *event_name, int event_id)
+int afb_proto_ws_server_event_remove(struct afb_proto_ws *protows, uint16_t event_id)
{
- return server_event_send(protows, CHAR_FOR_EVT_DEL, event_name, event_id, NULL);
+ return server_event_send(protows, CHAR_FOR_EVT_DEL, event_id, NULL, NULL);
}
-int afb_proto_ws_server_event_push(struct afb_proto_ws *protows, const char *event_name, int event_id, struct json_object *data)
+int afb_proto_ws_server_event_push(struct afb_proto_ws *protows, uint16_t event_id, struct json_object *data)
{
- return server_event_send(protows, CHAR_FOR_EVT_PUSH, event_name, event_id, data);
+ return server_event_send(protows, CHAR_FOR_EVT_PUSH, event_id, NULL, data);
}
int afb_proto_ws_server_event_broadcast(struct afb_proto_ws *protows, const char *event_name, struct json_object *data, const unsigned char uuid[16], uint8_t hop)
@@ -949,7 +1158,7 @@ int afb_proto_ws_server_event_broadcast(struct afb_proto_ws *protows, const char
&& writebuf_string(&wb, event_name)
&& writebuf_object(&wb, data)
&& writebuf_put(&wb, uuid, 16)
- && writebuf_char(&wb, (char)(hop - 1)))
+ && writebuf_uint8(&wb, (uint8_t)(hop - 1)))
rc = proto_write(protows, &wb);
return rc;
}
@@ -971,6 +1180,7 @@ static void on_hangup(void *closure)
protows->calls = NULL;
ws = protows->ws;
protows->ws = NULL;
+ protows->idcount = 0;
pthread_mutex_unlock(&protows->mutex);
while (ncall) {
@@ -1029,6 +1239,7 @@ static struct afb_proto_ws *afb_proto_ws_create(struct fdev *fdev, const struct
protows->ws = afb_ws_create(fdev, itf, protows);
if (protows->ws != NULL) {
protows->refcount = 1;
+ protows->version = WSAPI_VERSION_UNSET;
protows->closure = closure;
protows->server_itf = itfs;
protows->client_itf = itfc;
@@ -1042,7 +1253,16 @@ static struct afb_proto_ws *afb_proto_ws_create(struct fdev *fdev, const struct
struct afb_proto_ws *afb_proto_ws_create_client(struct fdev *fdev, const struct afb_proto_ws_client_itf *itf, void *closure)
{
- return afb_proto_ws_create(fdev, NULL, itf, closure, &proto_ws_client_ws_itf);
+ struct afb_proto_ws *protows;
+
+ protows = afb_proto_ws_create(fdev, NULL, itf, closure, &proto_ws_client_ws_itf);
+ if (protows) {
+ if (send_version_offer_1(protows, WSAPI_VERSION_1) != 0) {
+ afb_proto_ws_unref(protows);
+ protows = NULL;
+ }
+ }
+ return protows;
}
struct afb_proto_ws *afb_proto_ws_create_server(struct fdev *fdev, const struct afb_proto_ws_server_itf *itf, void *closure)