summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJose Bollo <jose.bollo@iot.bzh>2019-11-15 14:41:40 +0100
committerJosé Bollo <jose.bollo@iot.bzh>2019-11-29 12:48:17 +0100
commit7386e1c5090b4e76036bc212f2a2cf32920bb160 (patch)
treeb4343277a870d9ba89fec11be7772f91e25dd7fe
parentb55f3cd48507105e85894be89557787eccfbe22f (diff)
afb-proto-ws: Change the protocol WSAPI
Change internals of the protocol WSAPI for the following rationale: 1. Enforce specific declaration and transmission of session identifiers and of access tokens. 2. Lower the size of identifiers to be 16 bits. 3. Introduce protocol versionning through a mechanism of offer/set. The main purpose of that change is to optimize the count of data transmitted. It manages as best as possible the transmission of access tokens the less possible times. Same for sessions that the chage was transmitted at each call. Bug-AGL: SPEC-2968 Change-Id: If0a22b86627ead35a410e51c1028025c5b02c38f Signed-off-by: Jose Bollo <jose.bollo@iot.bzh>
-rw-r--r--src/afb-proto-ws.c484
-rw-r--r--src/afb-proto-ws.h40
-rw-r--r--src/afb-stub-ws.c416
-rw-r--r--src/main-afb-client-demo.c32
4 files changed, 638 insertions, 334 deletions
diff --git a/src/afb-proto-ws.c b/src/afb-proto-ws.c
index 21ba2bca..ebd5fd58 100644
--- a/src/afb-proto-ws.c
+++ b/src/afb-proto-ws.c
@@ -43,7 +43,7 @@ struct afb_proto_ws;
/******** implementation of internal binder protocol per api **************/
/*
-This protocol is asymetric: there is a client and a server
+This protocol is asymmetric: there is a client and a server
The client can require the following actions:
@@ -65,21 +65,41 @@ For the purpose of handling events the server can:
- create/destroy an event
- - push or brodcast data as an event
+ - push or broadcast data as an event
*/
/************** constants for protocol definition *************************/
-#define CHAR_FOR_CALL 'C'
-#define CHAR_FOR_REPLY 'Y'
-#define CHAR_FOR_EVT_BROADCAST '*'
-#define CHAR_FOR_EVT_ADD '+'
-#define CHAR_FOR_EVT_DEL '-'
-#define CHAR_FOR_EVT_PUSH '!'
-#define CHAR_FOR_EVT_SUBSCRIBE 'S'
-#define CHAR_FOR_EVT_UNSUBSCRIBE 'U'
+#define CHAR_FOR_CALL 'K'
+#define CHAR_FOR_REPLY 'k'
+#define CHAR_FOR_EVT_BROADCAST 'B'
+#define CHAR_FOR_EVT_ADD 'E'
+#define CHAR_FOR_EVT_DEL 'e'
+#define CHAR_FOR_EVT_PUSH 'P'
+#define CHAR_FOR_EVT_SUBSCRIBE 'X'
+#define CHAR_FOR_EVT_UNSUBSCRIBE 'x'
#define CHAR_FOR_DESCRIBE 'D'
#define CHAR_FOR_DESCRIPTION 'd'
+#define CHAR_FOR_TOKEN_ADD 'T'
+#define CHAR_FOR_TOKEN_DROP 't'
+#define CHAR_FOR_SESSION_ADD 'S'
+#define CHAR_FOR_SESSION_DROP 's'
+#define CHAR_FOR_VERSION_OFFER 'V'
+#define CHAR_FOR_VERSION_SET 'v'
+
+/******************* manage versions *****************************/
+
+#define WSAPI_IDENTIFIER 02723012011 /* wsapi: 23.19.1.16.9 */
+
+#define WSAPI_VERSION_UNSET 0
+#define WSAPI_VERSION_1 1
+
+#define WSAPI_VERSION_MIN WSAPI_VERSION_1
+#define WSAPI_VERSION_MAX WSAPI_VERSION_1
+
+/******************* maximum count of ids ***********************/
+
+#define ACTIVE_ID_MAX 4095
/******************* handling calls *****************************/
@@ -88,20 +108,18 @@ For the purpose of handling events the server can:
*/
struct client_call {
struct client_call *next; /* the next call */
- struct afb_proto_ws *protows; /* the proto_ws */
void *request; /* the request closure */
- uint32_t callid; /* the message identifier */
+ uint16_t callid; /* the message identifier */
};
/*
* structure for a ws request
*/
struct afb_proto_ws_call {
- struct client_call *next; /* the next call */
struct afb_proto_ws *protows; /* the client of the request */
- uint32_t refcount; /* reference count */
- uint32_t callid; /* the incoming request callid */
char *buffer; /* the incoming buffer */
+ uint16_t refcount; /* reference count */
+ uint16_t callid; /* the incoming request callid */
};
/*
@@ -110,10 +128,9 @@ struct afb_proto_ws_call {
struct client_describe
{
struct client_describe *next;
- struct afb_proto_ws *protows;
void (*callback)(void*, struct json_object*);
void *closure;
- uint32_t descid;
+ uint16_t descid;
};
/*
@@ -122,7 +139,7 @@ struct client_describe
struct afb_proto_ws_describe
{
struct afb_proto_ws *protows;
- uint32_t descid;
+ uint16_t descid;
};
/******************* proto description for client or servers ******************/
@@ -130,7 +147,16 @@ struct afb_proto_ws_describe
struct afb_proto_ws
{
/* count of references */
- int refcount;
+ uint16_t refcount;
+
+ /* id generator */
+ uint16_t genid;
+
+ /* count actives ids */
+ uint16_t idcount;
+
+ /* version */
+ uint8_t version;
/* resource control */
pthread_mutex_t mutex;
@@ -183,19 +209,7 @@ struct binary
struct readbuf rb;
};
-/******************* common useful tools **********************************/
-
-/**
- * translate a pointer to some integer
- * @param ptr the pointer to translate
- * @return an integer
- */
-static inline uint32_t ptr2id(void *ptr)
-{
- return (uint32_t)(((intptr_t)ptr) >> 6);
-}
-
-/******************* serialisation part **********************************/
+/******************* serialization part **********************************/
static char *readbuf_get(struct readbuf *rb, uint32_t length)
{
@@ -207,24 +221,40 @@ static char *readbuf_get(struct readbuf *rb, uint32_t length)
return before;
}
-__attribute__((unused))
-static int readbuf_char(struct readbuf *rb, char *value)
+static int readbuf_getat(struct readbuf *rb, void *to, uint32_t length)
{
- if (rb->head >= rb->end)
+ char *head = readbuf_get(rb, length);
+ if (!head)
return 0;
- *value = *rb->head++;
+ memcpy(to, head, length);
return 1;
}
+__attribute__((unused))
+static int readbuf_char(struct readbuf *rb, char *value)
+{
+ return readbuf_getat(rb, value, sizeof *value);
+}
+
static int readbuf_uint32(struct readbuf *rb, uint32_t *value)
{
- char *after = rb->head + sizeof *value;
- if (after > rb->end)
- return 0;
- memcpy(value, rb->head, sizeof *value);
- rb->head = after;
- *value = le32toh(*value);
- return 1;
+ int r = readbuf_getat(rb, value, sizeof *value);
+ if (r)
+ *value = le32toh(*value);
+ return r;
+}
+
+static int readbuf_uint16(struct readbuf *rb, uint16_t *value)
+{
+ int r = readbuf_getat(rb, value, sizeof *value);
+ if (r)
+ *value = le16toh(*value);
+ return r;
+}
+
+static int readbuf_uint8(struct readbuf *rb, uint8_t *value)
+{
+ return readbuf_getat(rb, value, sizeof *value);
}
static int _readbuf_string_(struct readbuf *rb, const char **value, size_t *length, int nulok)
@@ -312,6 +342,7 @@ static int writebuf_putbuf(struct writebuf *wb, const void *value, int length)
return 1;
}
+__attribute__((unused))
static int writebuf_char(struct writebuf *wb, char value)
{
return writebuf_putbuf(wb, &value, 1);
@@ -323,6 +354,17 @@ static int writebuf_uint32(struct writebuf *wb, uint32_t value)
return writebuf_putbuf(wb, &value, (int)sizeof value);
}
+static int writebuf_uint16(struct writebuf *wb, uint16_t value)
+{
+ value = htole16(value);
+ return writebuf_putbuf(wb, &value, (int)sizeof value);
+}
+
+static int writebuf_uint8(struct writebuf *wb, uint8_t value)
+{
+ return writebuf_putbuf(wb, &value, (int)sizeof value);
+}
+
static int writebuf_string_length(struct writebuf *wb, const char *value, size_t length)
{
uint32_t len = (uint32_t)++length;
@@ -392,6 +434,30 @@ static int proto_write(struct afb_proto_ws *protows, struct writebuf *wb)
return rc;
}
+static int send_version_offer_1(struct afb_proto_ws *protows, uint8_t version)
+{
+ int rc = -1;
+ struct writebuf wb = { .iovcount = 0, .bufcount = 0 };
+
+ if (writebuf_char(&wb, CHAR_FOR_VERSION_OFFER)
+ && writebuf_uint32(&wb, WSAPI_IDENTIFIER)
+ && writebuf_uint8(&wb, 1) /* offer one version */
+ && writebuf_uint8(&wb, version))
+ rc = proto_write(protows, &wb);
+ return rc;
+}
+
+static int send_version_set(struct afb_proto_ws *protows, uint8_t version)
+{
+ int rc = -1;
+ struct writebuf wb = { .iovcount = 0, .bufcount = 0 };
+
+ if (writebuf_char(&wb, CHAR_FOR_VERSION_SET)
+ && writebuf_uint8(&wb, version))
+ rc = proto_write(protows, &wb);
+ return rc;
+}
+
/******************* ws request part for server *****************/
void afb_proto_ws_call_addref(struct afb_proto_ws_call *call)
@@ -416,7 +482,7 @@ int afb_proto_ws_call_reply(struct afb_proto_ws_call *call, struct json_object *
struct afb_proto_ws *protows = call->protows;
if (writebuf_char(&wb, CHAR_FOR_REPLY)
- && writebuf_uint32(&wb, call->callid)
+ && writebuf_uint16(&wb, call->callid)
&& writebuf_nullstring(&wb, error)
&& writebuf_nullstring(&wb, info)
&& writebuf_object(&wb, obj))
@@ -424,30 +490,28 @@ int afb_proto_ws_call_reply(struct afb_proto_ws_call *call, struct json_object *
return rc;
}
-int afb_proto_ws_call_subscribe(struct afb_proto_ws_call *call, const char *event_name, int event_id)
+int afb_proto_ws_call_subscribe(struct afb_proto_ws_call *call, uint16_t event_id)
{
int rc = -1;
struct writebuf wb = { .iovcount = 0, .bufcount = 0 };
struct afb_proto_ws *protows = call->protows;
if (writebuf_char(&wb, CHAR_FOR_EVT_SUBSCRIBE)
- && writebuf_uint32(&wb, call->callid)
- && writebuf_uint32(&wb, (uint32_t)event_id)
- && writebuf_string(&wb, event_name))
+ && writebuf_uint16(&wb, call->callid)
+ && writebuf_uint16(&wb, event_id))
rc = proto_write(protows, &wb);
return rc;
}
-int afb_proto_ws_call_unsubscribe(struct afb_proto_ws_call *call, const char *event_name, int event_id)
+int afb_proto_ws_call_unsubscribe(struct afb_proto_ws_call *call, uint16_t event_id)
{
int rc = -1;
struct writebuf wb = { .iovcount = 0, .bufcount = 0 };
struct afb_proto_ws *protows = call->protows;
if (writebuf_char(&wb, CHAR_FOR_EVT_UNSUBSCRIBE)
- && writebuf_uint32(&wb, call->callid)
- && writebuf_uint32(&wb, (uint32_t)event_id)
- && writebuf_string(&wb, event_name))
+ && writebuf_uint16(&wb, call->callid)
+ && writebuf_uint16(&wb, event_id))
rc = proto_write(protows, &wb);
return rc;
}
@@ -455,7 +519,7 @@ int afb_proto_ws_call_unsubscribe(struct afb_proto_ws_call *call, const char *ev
/******************* client part **********************************/
/* search a memorized call */
-static struct client_call *client_call_search_locked(struct afb_proto_ws *protows, uint32_t callid)
+static struct client_call *client_call_search_locked(struct afb_proto_ws *protows, uint16_t callid)
{
struct client_call *call;
@@ -466,7 +530,7 @@ static struct client_call *client_call_search_locked(struct afb_proto_ws *protow
return call;
}
-static struct client_call *client_call_search_unlocked(struct afb_proto_ws *protows, uint32_t callid)
+static struct client_call *client_call_search_unlocked(struct afb_proto_ws *protows, uint16_t callid)
{
struct client_call *result;
@@ -477,57 +541,49 @@ static struct client_call *client_call_search_unlocked(struct afb_proto_ws *prot
}
/* free and release the memorizing call */
-static void client_call_destroy(struct client_call *call)
+static void client_call_destroy(struct afb_proto_ws *protows, struct client_call *call)
{
struct client_call **prv;
- struct afb_proto_ws *protows = call->protows;
pthread_mutex_lock(&protows->mutex);
- prv = &call->protows->calls;
+ prv = &protows->calls;
while (*prv != NULL) {
if (*prv == call) {
+ protows->idcount--;
*prv = call->next;
- break;
+ pthread_mutex_unlock(&protows->mutex);
+ free(call);
+ return;
}
prv = &(*prv)->next;
}
pthread_mutex_unlock(&protows->mutex);
- free(call);
-}
-
-/* get event data from the message */
-static int client_msg_event_read(struct readbuf *rb, uint32_t *eventid, const char **name)
-{
- return readbuf_uint32(rb, eventid) && readbuf_string(rb, name, NULL);
}
/* get event from the message */
static int client_msg_call_get(struct afb_proto_ws *protows, struct readbuf *rb, struct client_call **call)
{
- uint32_t callid;
+ uint16_t callid;
/* get event data from the message */
- if (!readbuf_uint32(rb, &callid)) {
+ if (!readbuf_uint16(rb, &callid))
return 0;
- }
/* get the call */
*call = client_call_search_unlocked(protows, callid);
- if (*call == NULL) {
- return 0;
- }
-
- return 1;
+ return *call != NULL;
}
/* adds an event */
static void client_on_event_create(struct afb_proto_ws *protows, struct readbuf *rb)
{
const char *event_name;
- uint32_t event_id;
+ uint16_t event_id;
- if (protows->client_itf->on_event_create && client_msg_event_read(rb, &event_id, &event_name))
- protows->client_itf->on_event_create(protows->closure, event_name, (int)event_id);
+ if (protows->client_itf->on_event_create
+ && readbuf_uint16(rb, &event_id)
+ && readbuf_string(rb, &event_name, NULL))
+ protows->client_itf->on_event_create(protows->closure, event_id, event_name);
else
ERROR("Ignoring creation of event");
}
@@ -535,11 +591,10 @@ static void client_on_event_create(struct afb_proto_ws *protows, struct readbuf
/* removes an event */
static void client_on_event_remove(struct afb_proto_ws *protows, struct readbuf *rb)
{
- const char *event_name;
- uint32_t event_id;
+ uint16_t event_id;
- if (protows->client_itf->on_event_remove && client_msg_event_read(rb, &event_id, &event_name))
- protows->client_itf->on_event_remove(protows->closure, event_name, (int)event_id);
+ if (protows->client_itf->on_event_remove && readbuf_uint16(rb, &event_id))
+ protows->client_itf->on_event_remove(protows->closure, event_id);
else
ERROR("Ignoring deletion of event");
}
@@ -547,12 +602,11 @@ static void client_on_event_remove(struct afb_proto_ws *protows, struct readbuf
/* subscribes an event */
static void client_on_event_subscribe(struct afb_proto_ws *protows, struct readbuf *rb)
{
- const char *event_name;
- uint32_t event_id;
+ uint16_t event_id;
struct client_call *call;
- if (protows->client_itf->on_event_subscribe && client_msg_call_get(protows, rb, &call) && client_msg_event_read(rb, &event_id, &event_name))
- protows->client_itf->on_event_subscribe(protows->closure, call->request, event_name, (int)event_id);
+ if (protows->client_itf->on_event_subscribe && client_msg_call_get(protows, rb, &call) && readbuf_uint16(rb, &event_id))
+ protows->client_itf->on_event_subscribe(protows->closure, call->request, event_id);
else
ERROR("Ignoring subscription to event");
}
@@ -560,12 +614,11 @@ static void client_on_event_subscribe(struct afb_proto_ws *protows, struct readb
/* unsubscribes an event */
static void client_on_event_unsubscribe(struct afb_proto_ws *protows, struct readbuf *rb)
{
- const char *event_name;
- uint32_t event_id;
+ uint16_t event_id;
struct client_call *call;
- if (protows->client_itf->on_event_unsubscribe && client_msg_call_get(protows, rb, &call) && client_msg_event_read(rb, &event_id, &event_name))
- protows->client_itf->on_event_unsubscribe(protows->closure, call->request, event_name, (int)event_id);
+ if (protows->client_itf->on_event_unsubscribe && client_msg_call_get(protows, rb, &call) && readbuf_uint16(rb, &event_id))
+ protows->client_itf->on_event_unsubscribe(protows->closure, call->request, event_id);
else
ERROR("Ignoring unsubscription to event");
}
@@ -574,11 +627,11 @@ static void client_on_event_unsubscribe(struct afb_proto_ws *protows, struct rea
static void client_on_event_broadcast(struct afb_proto_ws *protows, struct readbuf *rb)
{
const char *event_name, *uuid;
- char hop;
+ uint8_t hop;
struct json_object *object;
- if (protows->client_itf->on_event_broadcast && readbuf_string(rb, &event_name, NULL) && readbuf_object(rb, &object) && (uuid = readbuf_get(rb, 16)) && readbuf_char(rb, &hop))
- protows->client_itf->on_event_broadcast(protows->closure, event_name, object, (unsigned char*)uuid, (uint8_t)hop);
+ if (protows->client_itf->on_event_broadcast && readbuf_string(rb, &event_name, NULL) && readbuf_object(rb, &object) && (uuid = readbuf_get(rb, 16)) && readbuf_uint8(rb, &hop))
+ protows->client_itf->on_event_broadcast(protows->closure, event_name, object, (unsigned char*)uuid, hop);
else
ERROR("Ignoring broadcast of event");
}
@@ -586,12 +639,11 @@ static void client_on_event_broadcast(struct afb_proto_ws *protows, struct readb
/* pushs an event */
static void client_on_event_push(struct afb_proto_ws *protows, struct readbuf *rb)
{
- const char *event_name;
- uint32_t event_id;
+ uint16_t event_id;
struct json_object *object;
- if (protows->client_itf->on_event_push && client_msg_event_read(rb, &event_id, &event_name) && readbuf_object(rb, &object))
- protows->client_itf->on_event_push(protows->closure, event_name, (int)event_id, object);
+ if (protows->client_itf->on_event_push && readbuf_uint16(rb, &event_id) && readbuf_object(rb, &object))
+ protows->client_itf->on_event_push(protows->closure, event_id, object);
else
ERROR("Ignoring push of event");
}
@@ -610,7 +662,7 @@ static void client_on_reply(struct afb_proto_ws *protows, struct readbuf *rb)
} else {
protows->client_itf->on_reply(protows->closure, call->request, NULL, "proto-error", "can't process success");
}
- client_call_destroy(call);
+ client_call_destroy(protows, call);
}
static void client_on_description(struct afb_proto_ws *protows, struct readbuf *rb)
@@ -628,6 +680,7 @@ static void client_on_description(struct afb_proto_ws *protows, struct readbuf *
pthread_mutex_unlock(&protows->mutex);
else {
*prv = desc->next;
+ protows->idcount--;
pthread_mutex_unlock(&protows->mutex);
if (!readbuf_object(rb, &object))
object = NULL;
@@ -637,6 +690,22 @@ static void client_on_description(struct afb_proto_ws *protows, struct readbuf *
}
}
+/* received a version set */
+static void client_on_version_set(struct afb_proto_ws *protows, struct readbuf *rb)
+{
+ uint8_t version;
+
+ /* reads the descid */
+ if (readbuf_uint8(rb, &version)
+ && WSAPI_VERSION_MIN <= version
+ && version <= WSAPI_VERSION_MAX) {
+ protows->version = version;
+ return;
+ }
+ afb_proto_ws_hangup(protows);
+}
+
+
/* callback when receiving binary data */
static void client_on_binary_job(int sig, void *closure)
{
@@ -668,6 +737,9 @@ static void client_on_binary_job(int sig, void *closure)
case CHAR_FOR_DESCRIPTION: /* description */
client_on_description(binary->protows, &binary->rb);
break;
+ case CHAR_FOR_VERSION_SET: /* set the protocol version */
+ client_on_version_set(binary->protows, &binary->rb);
+ break;
default: /* unexpected message */
/* TODO: close the connection */
break;
@@ -685,11 +757,45 @@ static void client_on_binary(void *closure, char *data, size_t size)
queue_message_processing(protows, data, size, client_on_binary_job);
}
+static int client_send_idstr_add_drop(struct afb_proto_ws *protows, char order, uint16_t id, const char *value)
+{
+ struct writebuf wb = { .iovcount = 0, .bufcount = 0 };
+ int rc = -1;
+
+ if (writebuf_char(&wb, order)
+ && writebuf_uint16(&wb, id)
+ && (!value || writebuf_string(&wb, value)))
+ rc = proto_write(protows, &wb);
+ return rc;
+}
+
+int afb_proto_ws_client_session_create(struct afb_proto_ws *protows, uint16_t sessionid, const char *sessionstr)
+{
+ return client_send_idstr_add_drop(protows, CHAR_FOR_SESSION_ADD, sessionid, sessionstr);
+}
+
+int afb_proto_ws_client_session_remove(struct afb_proto_ws *protows, uint16_t sessionid)
+{
+ return client_send_idstr_add_drop(protows, CHAR_FOR_SESSION_DROP, sessionid, NULL);
+}
+
+int afb_proto_ws_client_token_create(struct afb_proto_ws *protows, uint16_t tokenid, const char *tokenstr)
+{
+ return client_send_idstr_add_drop(protows, CHAR_FOR_TOKEN_ADD, tokenid, tokenstr);
+
+}
+
+int afb_proto_ws_client_token_remove(struct afb_proto_ws *protows, uint16_t tokenid)
+{
+ return client_send_idstr_add_drop(protows, CHAR_FOR_TOKEN_DROP, tokenid, NULL);
+}
+
int afb_proto_ws_client_call(
struct afb_proto_ws *protows,
const char *verb,
struct json_object *args,
- const char *sessionid,
+ uint16_t sessionid,
+ uint16_t tokenid,
void *request,
const char *user_creds
)
@@ -697,6 +803,7 @@ int afb_proto_ws_client_call(
int rc = -1;
struct client_call *call;
struct writebuf wb = { .iovcount = 0, .bufcount = 0 };
+ uint16_t id;
/* allocate call data */
call = malloc(sizeof *call);
@@ -708,19 +815,26 @@ int afb_proto_ws_client_call(
/* init call data */
pthread_mutex_lock(&protows->mutex);
- call->callid = ptr2id(call);
- while(client_call_search_locked(protows, call->callid) != NULL)
- call->callid++;
- call->protows = protows;
+ if (protows->idcount >= ACTIVE_ID_MAX) {
+ pthread_mutex_unlock(&protows->mutex);
+ errno = EBUSY;
+ goto clean;
+ }
+ protows->idcount++;
+ id = ++protows->genid;
+ while(!id || client_call_search_locked(protows, id) != NULL)
+ id++;
+ call->callid = protows->genid = id;
call->next = protows->calls;
protows->calls = call;
pthread_mutex_unlock(&protows->mutex);
/* creates the call message */
if (!writebuf_char(&wb, CHAR_FOR_CALL)
- || !writebuf_uint32(&wb, call->callid)
+ || !writebuf_uint16(&wb, call->callid)
|| !writebuf_string(&wb, verb)
- || !writebuf_string(&wb, sessionid)
+ || !writebuf_uint16(&wb, sessionid)
+ || !writebuf_uint16(&wb, tokenid)
|| !writebuf_object(&wb, args)
|| !writebuf_nullstring(&wb, user_creds)) {
errno = EINVAL;
@@ -733,7 +847,7 @@ int afb_proto_ws_client_call(
goto end;
clean:
- client_call_destroy(call);
+ client_call_destroy(protows, call);
end:
return rc;
}
@@ -743,6 +857,7 @@ int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)(
{
struct client_describe *desc, *d;
struct writebuf wb = { .iovcount = 0, .bufcount = 0 };
+ uint16_t id;
desc = malloc(sizeof *desc);
if (!desc) {
@@ -752,31 +867,40 @@ int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)(
/* fill in stack the description of the task */
pthread_mutex_lock(&protows->mutex);
- desc->descid = ptr2id(desc);
+ if (protows->idcount >= ACTIVE_ID_MAX) {
+ errno = EBUSY;
+ goto busy;
+ }
+ protows->idcount++;
+ id = ++protows->genid;
d = protows->describes;
while (d) {
- if (d->descid != desc->descid)
+ if (id && d->descid != id)
d = d->next;
else {
- desc->descid++;
+ id++;
d = protows->describes;
}
}
+ desc->descid = protows->genid = id;
desc->callback = callback;
desc->closure = closure;
- desc->protows = protows;
desc->next = protows->describes;
protows->describes = desc;
+ pthread_mutex_unlock(&protows->mutex);
/* send */
- if (writebuf_char(&wb, CHAR_FOR_DESCRIBE)
- && writebuf_uint32(&wb, desc->descid)
- && protows->ws != NULL
- && afb_ws_binary_v(protows->ws, wb.iovec, wb.iovcount) >= 0) {
- pthread_mutex_unlock(&protows->mutex);
- return 0;
+ if (!writebuf_char(&wb, CHAR_FOR_DESCRIBE)
+ || !writebuf_uint16(&wb, desc->descid)) {
+ errno = EINVAL;
+ goto error2;
}
+ if (proto_write(protows, &wb) == 0)
+ return 0;
+
+error2:
+ pthread_mutex_lock(&protows->mutex);
d = protows->describes;
if (d == desc)
protows->describes = desc->next;
@@ -786,6 +910,8 @@ int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)(
if (d)
d->next = desc->next;
}
+ protows->idcount--;
+busy:
pthread_mutex_unlock(&protows->mutex);
free(desc);
error:
@@ -799,17 +925,18 @@ error:
static void server_on_call(struct afb_proto_ws *protows, struct readbuf *rb)
{
struct afb_proto_ws_call *call;
- const char *uuid, *verb, *user_creds;
- uint32_t callid;
+ const char *verb, *user_creds;
+ uint16_t callid, sessionid, tokenid;
size_t lenverb;
struct json_object *object;
afb_proto_ws_addref(protows);
/* reads the call message data */
- if (!readbuf_uint32(rb, &callid)
+ if (!readbuf_uint16(rb, &callid)
|| !readbuf_string(rb, &verb, &lenverb)
- || !readbuf_string(rb, &uuid, NULL)
+ || !readbuf_uint16(rb, &sessionid)
+ || !readbuf_uint16(rb, &tokenid)
|| !readbuf_object(rb, &object)
|| !readbuf_nullstring(rb, &user_creds, NULL))
goto overflow;
@@ -825,7 +952,7 @@ static void server_on_call(struct afb_proto_ws *protows, struct readbuf *rb)
call->buffer = rb->base;
rb->base = NULL; /* don't free the buffer */
- protows->server_itf->on_call(protows->closure, call, verb, object, uuid, user_creds);
+ protows->server_itf->on_call(protows->closure, call, verb, object, sessionid, tokenid, user_creds);
return;
out_of_memory:
@@ -858,11 +985,11 @@ int afb_proto_ws_describe_put(struct afb_proto_ws_describe *describe, struct jso
/* on describe, propagate it to the ws service */
static void server_on_describe(struct afb_proto_ws *protows, struct readbuf *rb)
{
- uint32_t descid;
+ uint16_t descid;
struct afb_proto_ws_describe *desc;
/* reads the descid */
- if (readbuf_uint32(rb, &descid)) {
+ if (readbuf_uint16(rb, &descid)) {
if (protows->server_itf->on_describe) {
/* create asynchronous job */
desc = malloc(sizeof *desc);
@@ -878,6 +1005,73 @@ static void server_on_describe(struct afb_proto_ws *protows, struct readbuf *rb)
}
}
+static void server_on_session_add(struct afb_proto_ws *protows, struct readbuf *rb)
+{
+ uint16_t sessionid;
+ const char *sessionstr;
+
+ if (readbuf_uint16(rb, &sessionid) && readbuf_string(rb, &sessionstr, NULL))
+ protows->server_itf->on_session_create(protows->closure, sessionid, sessionstr);
+}
+
+static void server_on_session_drop(struct afb_proto_ws *protows, struct readbuf *rb)
+{
+ uint16_t sessionid;
+
+ if (readbuf_uint16(rb, &sessionid))
+ protows->server_itf->on_session_remove(protows->closure, sessionid);
+}
+
+static void server_on_token_add(struct afb_proto_ws *protows, struct readbuf *rb)
+{
+ uint16_t tokenid;
+ const char *tokenstr;
+
+ if (readbuf_uint16(rb, &tokenid) && readbuf_string(rb, &tokenstr, NULL))
+ protows->server_itf->on_token_create(protows->closure, tokenid, tokenstr);
+}
+
+static void server_on_token_drop(struct afb_proto_ws *protows, struct readbuf *rb)
+{
+ uint16_t tokenid;
+
+ if (readbuf_uint16(rb, &tokenid))
+ protows->server_itf->on_token_remove(protows->closure, tokenid);
+}
+
+/* on version offer */
+static void server_on_version_offer(struct afb_proto_ws *protows, struct readbuf *rb)
+{
+ uint8_t count;
+ uint8_t *versions;
+ uint8_t version;
+ uint8_t v;
+ uint32_t id;
+
+ /* reads the descid */
+ if (readbuf_uint32(rb, &id)
+ && id == WSAPI_IDENTIFIER
+ && readbuf_uint8(rb, &count)
+ && count > 0
+ && (versions = (uint8_t*)readbuf_get(rb, (uint32_t)count))) {
+ version = WSAPI_VERSION_UNSET;
+ while (count) {
+ v = versions[--count];
+ if (v >= WSAPI_VERSION_MIN
+ && v <= WSAPI_VERSION_MAX
+ && (version == WSAPI_VERSION_UNSET || version < v))
+ version = v;
+ }
+ if (version != WSAPI_VERSION_UNSET) {
+ if (send_version_set(protows, version) >= 0) {
+ protows->version = version;
+ return;
+ }
+ }
+ }
+ afb_proto_ws_hangup(protows);
+}
+
/* callback when receiving binary data */
static void server_on_binary_job(int sig, void *closure)
{
@@ -891,6 +1085,21 @@ static void server_on_binary_job(int sig, void *closure)
case CHAR_FOR_DESCRIBE:
server_on_describe(binary->protows, &binary->rb);
break;
+ case CHAR_FOR_SESSION_ADD:
+ server_on_session_add(binary->protows, &binary->rb);
+ break;
+ case CHAR_FOR_SESSION_DROP:
+ server_on_session_drop(binary->protows, &binary->rb);
+ break;
+ case CHAR_FOR_TOKEN_ADD:
+ server_on_token_add(binary->protows, &binary->rb);
+ break;
+ case CHAR_FOR_TOKEN_DROP:
+ server_on_token_drop(binary->protows, &binary->rb);
+ break;
+ case CHAR_FOR_VERSION_OFFER:
+ server_on_version_offer(binary->protows, &binary->rb);
+ break;
default: /* unexpected message */
/* TODO: close the connection */
break;
@@ -909,32 +1118,32 @@ static void server_on_binary(void *closure, char *data, size_t size)
/******************* server part: manage events **********************************/
-static int server_event_send(struct afb_proto_ws *protows, char order, const char *event_name, int event_id, struct json_object *data)
+static int server_event_send(struct afb_proto_ws *protows, char order, uint16_t event_id, const char *event_name, struct json_object *data)
{
struct writebuf wb = { .iovcount = 0, .bufcount = 0 };
int rc = -1;
if (writebuf_char(&wb, order)
- && writebuf_uint32(&wb, event_id)
- && writebuf_string(&wb, event_name)
+ && writebuf_uint16(&wb, event_id)
+ && (order != CHAR_FOR_EVT_ADD || writebuf_string(&wb, event_name))
&& (order != CHAR_FOR_EVT_PUSH || writebuf_object(&wb, data)))
rc = proto_write(protows, &wb);
return rc;
}
-int afb_proto_ws_server_event_create(struct afb_proto_ws *protows, const char *event_name, int event_id)
+int afb_proto_ws_server_event_create(struct afb_proto_ws *protows, uint16_t event_id, const char *event_name)
{
- return server_event_send(protows, CHAR_FOR_EVT_ADD, event_name, event_id, NULL);
+ return server_event_send(protows, CHAR_FOR_EVT_ADD, event_id, event_name, NULL);
}
-int afb_proto_ws_server_event_remove(struct afb_proto_ws *protows, const char *event_name, int event_id)
+int afb_proto_ws_server_event_remove(struct afb_proto_ws *protows, uint16_t event_id)
{
- return server_event_send(protows, CHAR_FOR_EVT_DEL, event_name, event_id, NULL);
+ return server_event_send(protows, CHAR_FOR_EVT_DEL, event_id, NULL, NULL);
}
-int afb_proto_ws_server_event_push(struct afb_proto_ws *protows, const char *event_name, int event_id, struct json_object *data)
+int afb_proto_ws_server_event_push(struct afb_proto_ws *protows, uint16_t event_id, struct json_object *data)
{
- return server_event_send(protows, CHAR_FOR_EVT_PUSH, event_name, event_id, data);
+ return server_event_send(protows, CHAR_FOR_EVT_PUSH, event_id, NULL, data);
}
int afb_proto_ws_server_event_broadcast(struct afb_proto_ws *protows, const char *event_name, struct json_object *data, const unsigned char uuid[16], uint8_t hop)
@@ -949,7 +1158,7 @@ int afb_proto_ws_server_event_broadcast(struct afb_proto_ws *protows, const char
&& writebuf_string(&wb, event_name)
&& writebuf_object(&wb, data)
&& writebuf_put(&wb, uuid, 16)
- && writebuf_char(&wb, (char)(hop - 1)))
+ && writebuf_uint8(&wb, (uint8_t)(hop - 1)))
rc = proto_write(protows, &wb);
return rc;
}
@@ -971,6 +1180,7 @@ static void on_hangup(void *closure)
protows->calls = NULL;
ws = protows->ws;
protows->ws = NULL;
+ protows->idcount = 0;
pthread_mutex_unlock(&protows->mutex);
while (ncall) {
@@ -1029,6 +1239,7 @@ static struct afb_proto_ws *afb_proto_ws_create(struct fdev *fdev, const struct
protows->ws = afb_ws_create(fdev, itf, protows);
if (protows->ws != NULL) {
protows->refcount = 1;
+ protows->version = WSAPI_VERSION_UNSET;
protows->closure = closure;
protows->server_itf = itfs;
protows->client_itf = itfc;
@@ -1042,7 +1253,16 @@ static struct afb_proto_ws *afb_proto_ws_create(struct fdev *fdev, const struct
struct afb_proto_ws *afb_proto_ws_create_client(struct fdev *fdev, const struct afb_proto_ws_client_itf *itf, void *closure)
{
- return afb_proto_ws_create(fdev, NULL, itf, closure, &proto_ws_client_ws_itf);
+ struct afb_proto_ws *protows;
+
+ protows = afb_proto_ws_create(fdev, NULL, itf, closure, &proto_ws_client_ws_itf);
+ if (protows) {
+ if (send_version_offer_1(protows, WSAPI_VERSION_1) != 0) {
+ afb_proto_ws_unref(protows);
+ protows = NULL;
+ }
+ }
+ return protows;
}
struct afb_proto_ws *afb_proto_ws_create_server(struct fdev *fdev, const struct afb_proto_ws_server_itf *itf, void *closure)
diff --git a/src/afb-proto-ws.h b/src/afb-proto-ws.h
index 506960ea..2dcb142b 100644
--- a/src/afb-proto-ws.h
+++ b/src/afb-proto-ws.h
@@ -20,9 +20,13 @@
/*
* Defined since version 3, the value AFB_PROTO_WS_VERSION can be used to
- * track versions of afb-proto-ws.
+ * track versions of afb-proto-ws. History:
+ *
+ * date version comment
+ * 2018/04/09 3 introduced for bindings v3
+ * 2019/11/20 4 introduced for tokens
*/
-#define AFB_PROTO_WS_VERSION 3
+#define AFB_PROTO_WS_VERSION 4
struct fdev;
struct afb_proto_ws;
@@ -37,17 +41,21 @@ struct afb_proto_ws_client_itf
void (*on_reply)(void *closure, void *request, struct json_object *obj, const char *error, const char *info);
/* can be NULL */
- void (*on_event_create)(void *closure, const char *event_name, int event_id);
- void (*on_event_remove)(void *closure, const char *event_name, int event_id);
- void (*on_event_subscribe)(void *closure, void *request, const char *event_name, int event_id);
- void (*on_event_unsubscribe)(void *closure, void *request, const char *event_name, int event_id);
- void (*on_event_push)(void *closure, const char *event_name, int event_id, struct json_object *data);
+ void (*on_event_create)(void *closure, uint16_t event_id, const char *event_name);
+ void (*on_event_remove)(void *closure, uint16_t event_id);
+ void (*on_event_subscribe)(void *closure, void *request, uint16_t event_id);
+ void (*on_event_unsubscribe)(void *closure, void *request, uint16_t event_id);
+ void (*on_event_push)(void *closure, uint16_t event_id, struct json_object *data);
void (*on_event_broadcast)(void *closure, const char *event_name, struct json_object *data, const afb_proto_ws_uuid_t uuid, uint8_t hop);
};
struct afb_proto_ws_server_itf
{
- void (*on_call)(void *closure, struct afb_proto_ws_call *call, const char *verb, struct json_object *args, const char *sessionid, const char *user_creds);
+ void (*on_session_create)(void *closure, uint16_t sessionid, const char *sessionstr);
+ void (*on_session_remove)(void *closure, uint16_t sessionid);
+ void (*on_token_create)(void *closure, uint16_t tokenid, const char *tokenstr);
+ void (*on_token_remove)(void *closure, uint16_t tokenid);
+ void (*on_call)(void *closure, struct afb_proto_ws_call *call, const char *verb, struct json_object *args, uint16_t sessionid, uint16_t tokenid, const char *user_creds);
void (*on_describe)(void *closure, struct afb_proto_ws_describe *describe);
};
@@ -66,12 +74,16 @@ extern void afb_proto_ws_on_hangup(struct afb_proto_ws *protows, void (*on_hangu
extern void afb_proto_ws_set_queuing(struct afb_proto_ws *protows, int (*queuing)(struct afb_proto_ws *, void (*)(int,void*), void*));
-extern int afb_proto_ws_client_call(struct afb_proto_ws *protows, const char *verb, struct json_object *args, const char *sessionid, void *request, const char *user_creds);
+extern int afb_proto_ws_client_session_create(struct afb_proto_ws *protows, uint16_t sessionid, const char *sessionstr);
+extern int afb_proto_ws_client_session_remove(struct afb_proto_ws *protows, uint16_t sessionid);
+extern int afb_proto_ws_client_token_create(struct afb_proto_ws *protows, uint16_t tokenid, const char *tokenstr);
+extern int afb_proto_ws_client_token_remove(struct afb_proto_ws *protows, uint16_t tokenid);
+extern int afb_proto_ws_client_call(struct afb_proto_ws *protows, const char *verb, struct json_object *args, uint16_t sessionid, uint16_t tokenid, void *request, const char *user_creds);
extern int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)(void*, struct json_object*), void *closure);
-extern int afb_proto_ws_server_event_create(struct afb_proto_ws *protows, const char *event_name, int event_id);
-extern int afb_proto_ws_server_event_remove(struct afb_proto_ws *protows, const char *event_name, int event_id);
-extern int afb_proto_ws_server_event_push(struct afb_proto_ws *protows, const char *event_name, int event_id, struct json_object *data);
+extern int afb_proto_ws_server_event_create(struct afb_proto_ws *protows, uint16_t event_id, const char *event_name);
+extern int afb_proto_ws_server_event_remove(struct afb_proto_ws *protows, uint16_t event_id);
+extern int afb_proto_ws_server_event_push(struct afb_proto_ws *protows, uint16_t event_id, struct json_object *data);
extern int afb_proto_ws_server_event_broadcast(struct afb_proto_ws *protows, const char *event_name, struct json_object *data, const afb_proto_ws_uuid_t uuid, uint8_t hop);
extern void afb_proto_ws_call_addref(struct afb_proto_ws_call *call);
@@ -79,7 +91,7 @@ extern void afb_proto_ws_call_unref(struct afb_proto_ws_call *call);
extern int afb_proto_ws_call_reply(struct afb_proto_ws_call *call, struct json_object *obj, const char *error, const char *info);
-extern int afb_proto_ws_call_subscribe(struct afb_proto_ws_call *call, const char *event_name, int event_id);
-extern int afb_proto_ws_call_unsubscribe(struct afb_proto_ws_call *call, const char *event_name, int event_id);
+extern int afb_proto_ws_call_subscribe(struct afb_proto_ws_call *call, uint16_t event_id);
+extern int afb_proto_ws_call_unsubscribe(struct afb_proto_ws_call *call, uint16_t event_id);
extern int afb_proto_ws_describe_put(struct afb_proto_ws_describe *describe, struct json_object *description);
diff --git a/src/afb-stub-ws.c b/src/afb-stub-ws.c
index 9bd8ed63..d538fc65 100644
--- a/src/afb-stub-ws.c
+++ b/src/afb-stub-ws.c
@@ -43,13 +43,14 @@
#include "afb-context.h"
#include "afb-evt.h"
#include "afb-xreq.h"
+#include "afb-token.h"
#include "verbose.h"
#include "fdev.h"
#include "jobs.h"
+#include "u16id.h"
struct afb_stub_ws;
-
/**
* structure for a ws request: requests on server side
*/
@@ -60,17 +61,6 @@ struct server_req {
};
/**
- * structure for recording events on client side
- */
-struct client_event
-{
- struct client_event *next; /**< link to the next */
- struct afb_event_x2 *event; /**< the local event */
- int id; /**< the identifier */
- int refcount; /**< a reference count */
-};
-
-/**
* structure for jobs of describing
*/
struct server_describe
@@ -79,15 +69,6 @@ struct server_describe
struct afb_proto_ws_describe *describe;
};
-/**
- * structure for recording sessions
- */
-struct server_session
-{
- struct server_session *next;
- struct afb_session *session;
-};
-
/******************* stub description for client or servers ******************/
struct afb_stub_ws
@@ -112,12 +93,27 @@ struct afb_stub_ws
/* credentials of the client */
struct afb_cred *cred;
+
+ /* event from server */
+ struct u16id2bool *event_flags;
+
+ /* transmitted sessions */
+ struct u16id2ptr *session_proxies;
+
+ /* transmitted tokens */
+ struct u16id2ptr *token_proxies;
};
/* client side */
struct {
- /* event replica */
- struct client_event *events;
+ /* event from server */
+ struct u16id2ptr *event_proxies;
+
+ /* transmitted sessions */
+ struct u16id2bool *session_flags;
+
+ /* transmitted tokens */
+ struct u16id2bool *token_flags;
/* robustify */
struct {
@@ -173,7 +169,7 @@ static int server_req_subscribe_cb(struct afb_xreq *xreq, struct afb_event_x2 *e
rc = afb_evt_event_x2_add_watch(wreq->stubws->listener, event);
if (rc >= 0)
- rc = afb_proto_ws_call_subscribe(wreq->call, afb_evt_event_x2_fullname(event), afb_evt_event_x2_id(event));
+ rc = afb_proto_ws_call_subscribe(wreq->call, afb_evt_event_x2_id(event));
if (rc < 0)
ERROR("error while subscribing event");
return rc;
@@ -184,7 +180,7 @@ static int server_req_unsubscribe_cb(struct afb_xreq *xreq, struct afb_event_x2
int rc, rc2;
struct server_req *wreq = CONTAINER_OF_XREQ(struct server_req, xreq);
- rc = afb_proto_ws_call_unsubscribe(wreq->call, afb_evt_event_x2_fullname(event), afb_evt_event_x2_id(event));
+ rc = afb_proto_ws_call_unsubscribe(wreq->call, afb_evt_event_x2_id(event));
rc2 = afb_evt_event_x2_remove_watch(wreq->stubws->listener, event);
if (rc >= 0 && rc2 < 0)
rc = rc2;
@@ -202,33 +198,6 @@ static const struct afb_xreq_query_itf server_req_xreq_itf = {
/******************* client part **********************************/
-/* destroy all events */
-static void client_drop_all_events(struct afb_stub_ws *stubws)
-{
- struct client_event *ev, *nxt;
-
- nxt = __atomic_exchange_n(&stubws->events, NULL, __ATOMIC_RELAXED);
- while (nxt) {
- ev = nxt;
- nxt = ev->next;
- afb_evt_event_x2_unref(ev->event);
- free(ev);
- }
-}
-
-/* search the event */
-static struct client_event *client_event_search(struct afb_stub_ws *stubws, uint32_t eventid, const char *name)
-{
- struct client_event *ev;
-
- ev = stubws->events;
- while (ev != NULL && (ev->id != eventid || 0 != strcmp(afb_evt_event_x2_fullname(ev->event), name)))
- ev = ev->next;
-
- DEBUG("searching event %s[%d]: %s", name, eventid, ev ? "found" : "not found");
- return ev;
-}
-
static struct afb_proto_ws *client_get_proto(struct afb_stub_ws *stubws)
{
struct fdev *fdev;
@@ -243,12 +212,53 @@ static struct afb_proto_ws *client_get_proto(struct afb_stub_ws *stubws)
return proto;
}
+static int client_make_ids(struct afb_stub_ws *stubws, struct afb_proto_ws *proto, struct afb_context *context, uint16_t *sessionid, uint16_t *tokenid)
+{
+ int rc, rc2;
+ uint16_t sid, tid;
+
+ rc = 0;
+
+ /* get the session */
+ if (!context->session)
+ sid = 0;
+ else {
+ sid = afb_session_id(context->session);
+ rc2 = u16id2bool_set(&stubws->session_flags, sid, 1);
+ if (rc2 < 0)
+ rc = rc2;
+ else if (rc2 == 0)
+ rc = afb_proto_ws_client_session_create(proto, sid, afb_session_uuid(context->session));
+ }
+
+ /* get the token */
+ if (!context->token)
+ tid = 0;
+ else {
+ tid = afb_token_id(context->token);
+ rc2 = u16id2bool_set(&stubws->token_flags, tid, 1);
+ if (rc2 < 0)
+ rc = rc2;
+ else if (rc2 == 0) {
+ rc2 = afb_proto_ws_client_token_create(proto, tid, afb_token_string(context->token));
+ if (rc2 < 0)
+ rc = rc2;
+ }
+ }
+
+ *sessionid = sid;
+ *tokenid = tid;
+ return rc;
+}
+
/* on call, propagate it to the ws service */
static void client_api_call_cb(void * closure, struct afb_xreq *xreq)
{
int rc;
struct afb_stub_ws *stubws = closure;
struct afb_proto_ws *proto;
+ uint16_t sessionid;
+ uint16_t tokenid;
proto = client_get_proto(stubws);
if (proto == NULL) {
@@ -256,14 +266,18 @@ static void client_api_call_cb(void * closure, struct afb_xreq *xreq)
return;
}
- afb_xreq_unhooked_addref(xreq);
- rc = afb_proto_ws_client_call(
- proto,
- xreq->request.called_verb,
- afb_xreq_json(xreq),
- afb_session_uuid(xreq->context.session),
- xreq,
- xreq_on_behalf_cred_export(xreq));
+ rc = client_make_ids(stubws, proto, &xreq->context, &sessionid, &tokenid);
+ if (rc >= 0) {
+ afb_xreq_unhooked_addref(xreq);
+ rc = afb_proto_ws_client_call(
+ proto,
+ xreq->request.called_verb,
+ afb_xreq_json(xreq),
+ sessionid,
+ tokenid,
+ xreq,
+ xreq_on_behalf_cred_export(xreq));
+ }
if (rc < 0) {
afb_xreq_reply(xreq, NULL, "internal", "can't send message");
afb_xreq_unhooked_unref(xreq);
@@ -287,26 +301,35 @@ static void client_api_describe_cb(void * closure, void (*describecb)(void *, st
static void server_event_add_cb(void *closure, const char *event, uint16_t eventid)
{
+ int rc;
struct afb_stub_ws *stubws = closure;
- if (stubws->proto != NULL)
- afb_proto_ws_server_event_create(stubws->proto, event, eventid);
+ if (stubws->proto != NULL) {
+ rc = u16id2bool_set(&stubws->event_flags, eventid, 1);
+ if (rc == 0) {
+ rc = afb_proto_ws_server_event_create(stubws->proto, eventid, event);
+ if (rc < 0)
+ u16id2bool_set(&stubws->event_flags, eventid, 0);
+ }
+ }
}
static void server_event_remove_cb(void *closure, const char *event, uint16_t eventid)
{
struct afb_stub_ws *stubws = closure;
- if (stubws->proto != NULL)
- afb_proto_ws_server_event_remove(stubws->proto, event, eventid);
+ if (stubws->proto != NULL) {
+ if (u16id2bool_set(&stubws->event_flags, eventid, 0))
+ afb_proto_ws_server_event_remove(stubws->proto, eventid);
+ }
}
static void server_event_push_cb(void *closure, const char *event, uint16_t eventid, struct json_object *object)
{
struct afb_stub_ws *stubws = closure;
- if (stubws->proto != NULL)
- afb_proto_ws_server_event_push(stubws->proto, event, eventid, object);
+ if (stubws->proto != NULL && u16id2bool_get(stubws->event_flags, eventid))
+ afb_proto_ws_server_event_push(stubws->proto, eventid, object);
json_object_put(object);
}
@@ -329,99 +352,69 @@ static void client_on_reply_cb(void *closure, void *request, struct json_object
afb_xreq_unhooked_unref(xreq);
}
-static void client_on_event_create_cb(void *closure, const char *event_name, int event_id)
+static void client_on_event_create_cb(void *closure, uint16_t event_id, const char *event_name)
{
struct afb_stub_ws *stubws = closure;
- struct client_event *ev;
-
+ struct afb_event_x2 *event;
+ int rc;
+
/* check conflicts */
- ev = client_event_search(stubws, event_id, event_name);
- if (ev != NULL) {
- __atomic_add_fetch(&ev->refcount, 1, __ATOMIC_RELAXED);
- return;
- }
-
- /* no conflict, try to add it */
- ev = malloc(sizeof *ev);
- if (ev != NULL) {
- ev->event = afb_evt_event_x2_create(event_name);
- if (ev->event != NULL) {
- ev->refcount = 1;
- ev->id = event_id;
- ev->next = stubws->events;
- stubws->events = ev;
- return;
+ event = afb_evt_event_x2_create(event_name);
+ if (event == NULL)
+ ERROR("can't create event %s, out of memory", event_name);
+ else {
+ rc = u16id2ptr_add(&stubws->event_proxies, event_id, event);
+ if (rc < 0) {
+ ERROR("can't record event %s", event_name);
+ afb_evt_event_x2_unref(event);
}
- free(ev);
}
- ERROR("can't create event %s, out of memory", event_name);
}
-static void client_on_event_remove_cb(void *closure, const char *event_name, int event_id)
+static void client_on_event_remove_cb(void *closure, uint16_t event_id)
{
struct afb_stub_ws *stubws = closure;
- struct client_event *ev, **prv;
-
- /* check conflicts */
- ev = client_event_search(stubws, event_id, event_name);
- if (ev == NULL)
- return;
-
- /* decrease the reference count */
-
- if (__atomic_sub_fetch(&ev->refcount, 1, __ATOMIC_RELAXED))
- return;
-
- /* unlinks the event */
- prv = &stubws->events;
- while (*prv != ev)
- prv = &(*prv)->next;
- *prv = ev->next;
+ struct afb_event_x2 *event;
+ int rc;
- /* destroys the event */
- afb_evt_event_x2_unref(ev->event);
- free(ev);
+ rc = u16id2ptr_drop(&stubws->event_proxies, event_id, (void**)&event);
+ if (rc == 0 && event)
+ afb_evt_event_x2_unref(event);
}
-static void client_on_event_subscribe_cb(void *closure, void *request, const char *event_name, int event_id)
+static void client_on_event_subscribe_cb(void *closure, void *request, uint16_t event_id)
{
struct afb_stub_ws *stubws = closure;
struct afb_xreq *xreq = request;
- struct client_event *ev;
-
- /* check conflicts */
- ev = client_event_search(stubws, event_id, event_name);
- if (ev == NULL)
- return;
+ struct afb_event_x2 *event;
+ int rc;
- if (afb_xreq_subscribe(xreq, ev->event) < 0)
+ rc = u16id2ptr_get(stubws->event_proxies, event_id, (void**)&event);
+ if (rc < 0 || !event || afb_xreq_subscribe(xreq, event) < 0)
ERROR("can't subscribe: %m");
}
-static void client_on_event_unsubscribe_cb(void *closure, void *request, const char *event_name, int event_id)
+static void client_on_event_unsubscribe_cb(void *closure, void *request, uint16_t event_id)
{
struct afb_stub_ws *stubws = closure;
struct afb_xreq *xreq = request;
- struct client_event *ev;
-
- /* check conflicts */
- ev = client_event_search(stubws, event_id, event_name);
- if (ev == NULL)
- return;
+ struct afb_event_x2 *event;
+ int rc;
- if (afb_xreq_unsubscribe(xreq, ev->event) < 0)
+ rc = u16id2ptr_get(stubws->event_proxies, event_id, (void**)&event);
+ if (rc < 0 || !event || afb_xreq_unsubscribe(xreq, event) < 0)
ERROR("can't unsubscribe: %m");
}
-static void client_on_event_push_cb(void *closure, const char *event_name, int event_id, struct json_object *data)
+static void client_on_event_push_cb(void *closure, uint16_t event_id, struct json_object *data)
{
struct afb_stub_ws *stubws = closure;
- struct client_event *ev;
+ struct afb_event_x2 *event;
+ int rc;
- /* check conflicts */
- ev = client_event_search(stubws, event_id, event_name);
- if (ev)
- afb_evt_event_x2_push(ev->event, data);
+ rc = u16id2ptr_get(stubws->event_proxies, event_id, (void**)&event);
+ if (rc >= 0 && event)
+ afb_evt_event_x2_push(event, data);
else
ERROR("unreadable push event");
}
@@ -433,55 +426,95 @@ static void client_on_event_broadcast_cb(void *closure, const char *event_name,
/*****************************************************/
-static void server_record_session(struct afb_stub_ws *stubws, struct afb_session *session)
+static struct afb_session *server_add_session(struct afb_stub_ws *stubws, uint16_t sessionid, const char *sessionstr)
{
- struct server_session *s, **prv;
+ struct afb_session *session;
+ int rc, created;
- /* search */
- prv = &stubws->sessions;
- while ((s = *prv)) {
- if (s->session == session)
- return;
- if (afb_session_is_closed(s->session)) {
- *prv = s->next;
- afb_session_unref(s->session);
- free(s);
+ session = afb_session_get(sessionstr, AFB_SESSION_TIMEOUT_DEFAULT, &created);
+ if (session == NULL)
+ ERROR("can't create session %s, out of memory", sessionstr);
+ else {
+ afb_session_set_autoclose(session, 1);
+ rc = u16id2ptr_add(&stubws->session_proxies, sessionid, session);
+ if (rc < 0) {
+ ERROR("can't record session %s", sessionstr);
+ afb_session_unref(session);
+ session = NULL;
}
- else
- prv = &s->next;
}
+ return session;
+}
- /* create */
- s = malloc(sizeof *s);
- if (s) {
- s->session = afb_session_addref(session);
- s->next = stubws->sessions;
- stubws->sessions = s;
- }
+static void server_on_session_create_cb(void *closure, uint16_t sessionid, const char *sessionstr)
+{
+ struct afb_stub_ws *stubws = closure;
+
+ server_add_session(stubws, sessionid, sessionstr);
}
-static void server_release_all_sessions(struct afb_stub_ws *stubws)
+static void server_on_session_remove_cb(void *closure, uint16_t sessionid)
{
- struct server_session *ses, *nses;
+ struct afb_stub_ws *stubws = closure;
+ struct afb_session *session;
+ int rc;
+
+ rc = u16id2ptr_drop(&stubws->event_proxies, sessionid, (void**)&session);
+ if (rc == 0 && session)
+ afb_session_unref(session);
+}
- nses = __atomic_exchange_n(&stubws->sessions, NULL, __ATOMIC_RELAXED);
- while(nses) {
- ses = nses;
- nses = ses->next;
- afb_session_unref(ses->session);
- free(ses);
+static void server_on_token_create_cb(void *closure, uint16_t tokenid, const char *tokenstr)
+{
+ struct afb_stub_ws *stubws = closure;
+ struct afb_token *token;
+ int rc;
+
+ rc = afb_token_get(&token, tokenstr);
+ if (rc < 0)
+ ERROR("can't create token %s, out of memory", tokenstr);
+ else {
+ rc = u16id2ptr_add(&stubws->token_proxies, tokenid, token);
+ if (rc < 0) {
+ ERROR("can't record token %s", tokenstr);
+ afb_token_unref(token);
+ }
}
}
-/*****************************************************/
+static void server_on_token_remove_cb(void *closure, uint16_t tokenid)
+{
+ struct afb_stub_ws *stubws = closure;
+ struct afb_token *token;
+ int rc;
+
+ rc = u16id2ptr_drop(&stubws->event_proxies, tokenid, (void**)&token);
+ if (rc == 0 && token)
+ afb_token_unref(token);
+}
-static void server_on_call_cb(void *closure, struct afb_proto_ws_call *call, const char *verb, struct json_object *args, const char *sessionid, const char *user_creds)
+static void server_on_call_cb(void *closure, struct afb_proto_ws_call *call, const char *verb, struct json_object *args, uint16_t sessionid, uint16_t tokenid, const char *user_creds)
{
struct afb_stub_ws *stubws = closure;
struct server_req *wreq;
+ struct afb_session *session;
+ struct afb_token *token;
+ int rc;
afb_stub_ws_addref(stubws);
+ /* get tokens and sessions */
+ rc = u16id2ptr_get(stubws->session_proxies, sessionid, (void**)&session);
+ if (rc < 0) {
+ if (sessionid != 0)
+ goto no_session;
+ session = server_add_session(stubws, sessionid, NULL);
+ if (!session)
+ goto out_of_memory;
+ }
+ if (!tokenid || u16id2ptr_get(stubws->token_proxies, tokenid, (void**)&token) < 0)
+ token = NULL;
+
/* create the request */
wreq = malloc(sizeof *wreq);
if (wreq == NULL)
@@ -492,11 +525,7 @@ static void server_on_call_cb(void *closure, struct afb_proto_ws_call *call, con
wreq->call = call;
/* init the context */
- if (afb_context_connect_validated(&wreq->xreq.context, sessionid) < 0)
- goto unconnected;
- server_record_session(stubws, wreq->xreq.context.session);
- if (wreq->xreq.context.created)
- afb_session_set_autoclose(wreq->xreq.context.session, 1);
+ afb_context_init(&wreq->xreq.context, session, token);
/* makes the call */
wreq->xreq.cred = afb_cred_mixed_on_behalf_import(stubws->cred, &wreq->xreq.context, user_creds);
@@ -506,9 +535,8 @@ static void server_on_call_cb(void *closure, struct afb_proto_ws_call *call, con
afb_xreq_process(&wreq->xreq, stubws->apiset);
return;
-unconnected:
- free(wreq);
out_of_memory:
+no_session:
json_object_put(args);
afb_stub_ws_unref(stubws);
afb_proto_ws_call_reply(call, NULL, "internal-error", NULL);
@@ -550,6 +578,10 @@ static struct afb_api_itf client_api_itf = {
static const struct afb_proto_ws_server_itf server_itf =
{
+ .on_session_create = server_on_session_create_cb,
+ .on_session_remove = server_on_session_remove_cb,
+ .on_token_create = server_on_token_create_cb,
+ .on_token_remove = server_on_token_remove_cb,
.on_call = server_on_call_cb,
.on_describe = server_on_describe_cb
};
@@ -563,21 +595,61 @@ static const struct afb_evt_itf server_event_itf = {
};
/*****************************************************/
+/*****************************************************/
+
+static void release_all_sessions_cb(void*closure, uint16_t id, void *ptr)
+{
+ struct afb_session *session = ptr;
+ afb_session_unref(session);
+}
+
+static void release_all_tokens_cb(void*closure, uint16_t id, void *ptr)
+{
+ struct afb_token *token = ptr;
+ afb_token_unref(token);
+}
+
+static void release_all_events_cb(void*closure, uint16_t id, void *ptr)
+{
+ struct afb_event_x2 *eventid = ptr;
+ afb_evt_event_x2_unref(eventid);
+}
/* disconnect */
static void disconnect(struct afb_stub_ws *stubws)
{
+ struct u16id2ptr *i2p;
+ struct u16id2bool *i2b;
+
afb_proto_ws_unref(__atomic_exchange_n(&stubws->proto, NULL, __ATOMIC_RELAXED));
- if (stubws->is_client)
- client_drop_all_events(stubws);
- else {
+ if (stubws->is_client) {
+ i2p = __atomic_exchange_n(&stubws->event_proxies, NULL, __ATOMIC_RELAXED);
+ if (i2p) {
+ u16id2ptr_forall(i2p, release_all_events_cb, NULL);
+ u16id2ptr_destroy(&i2p);
+ }
+ i2b = __atomic_exchange_n(&stubws->session_flags, NULL, __ATOMIC_RELAXED);
+ u16id2bool_destroy(&i2b);
+ i2b = __atomic_exchange_n(&stubws->token_flags, NULL, __ATOMIC_RELAXED);
+ u16id2bool_destroy(&i2b);
+ } else {
afb_evt_listener_unref(__atomic_exchange_n(&stubws->listener, NULL, __ATOMIC_RELAXED));
afb_cred_unref(__atomic_exchange_n(&stubws->cred, NULL, __ATOMIC_RELAXED));
- server_release_all_sessions(stubws);
+ i2b = __atomic_exchange_n(&stubws->event_flags, NULL, __ATOMIC_RELAXED);
+ u16id2bool_destroy(&i2b);
+ i2p = __atomic_exchange_n(&stubws->session_proxies, NULL, __ATOMIC_RELAXED);
+ if (i2p) {
+ u16id2ptr_forall(i2p, release_all_sessions_cb, NULL);
+ u16id2ptr_destroy(&i2p);
+ }
+ i2p = __atomic_exchange_n(&stubws->token_proxies, NULL, __ATOMIC_RELAXED);
+ if (i2p) {
+ u16id2ptr_forall(i2p, release_all_tokens_cb, NULL);
+ u16id2ptr_destroy(&i2p);
+ }
}
}
-
/* callback when receiving a hangup */
static void on_hangup(void *closure)
{
diff --git a/src/main-afb-client-demo.c b/src/main-afb-client-demo.c
index f2c0826e..b5877485 100644
--- a/src/main-afb-client-demo.c
+++ b/src/main-afb-client-demo.c
@@ -55,11 +55,11 @@ static void on_wsj1_event(void *closure, const char *event, struct afb_wsj1_msg
static void on_pws_hangup(void *closure);
static void on_pws_reply(void *closure, void *request, struct json_object *result, const char *error, const char *info);
-static void on_pws_event_create(void *closure, const char *event_name, int event_id);
-static void on_pws_event_remove(void *closure, const char *event_name, int event_id);
-static void on_pws_event_subscribe(void *closure, void *request, const char *event_name, int event_id);
-static void on_pws_event_unsubscribe(void *closure, void *request, const char *event_name, int event_id);
-static void on_pws_event_push(void *closure, const char *event_name, int event_id, struct json_object *data);
+static void on_pws_event_create(void *closure, uint16_t event_id, const char *event_name);
+static void on_pws_event_remove(void *closure, uint16_t event_id);
+static void on_pws_event_subscribe(void *closure, void *request, uint16_t event_id);
+static void on_pws_event_unsubscribe(void *closure, void *request, uint16_t event_id);
+static void on_pws_event_push(void *closure, uint16_t event_id, struct json_object *data);
static void on_pws_event_broadcast(void *closure, const char *event_name, struct json_object *data, const afb_proto_ws_uuid_t uuid, uint8_t hop);
static void idle();
@@ -497,36 +497,36 @@ static void on_pws_reply(void *closure, void *request, struct json_object *resul
dec_callcount();
}
-static void on_pws_event_create(void *closure, const char *event_name, int event_id)
+static void on_pws_event_create(void *closure, uint16_t event_id, const char *event_name)
{
printf("ON-EVENT-CREATE: [%d:%s]\n", event_id, event_name);
fflush(stdout);
}
-static void on_pws_event_remove(void *closure, const char *event_name, int event_id)
+static void on_pws_event_remove(void *closure, uint16_t event_id)
{
- printf("ON-EVENT-REMOVE: [%d:%s]\n", event_id, event_name);
+ printf("ON-EVENT-REMOVE: [%d]\n", event_id);
fflush(stdout);
}
-static void on_pws_event_subscribe(void *closure, void *request, const char *event_name, int event_id)
+static void on_pws_event_subscribe(void *closure, void *request, uint16_t event_id)
{
- printf("ON-EVENT-SUBSCRIBE %s: [%d:%s]\n", (char*)request, event_id, event_name);
+ printf("ON-EVENT-SUBSCRIBE %s: [%d]\n", (char*)request, event_id);
fflush(stdout);
}
-static void on_pws_event_unsubscribe(void *closure, void *request, const char *event_name, int event_id)
+static void on_pws_event_unsubscribe(void *closure, void *request, uint16_t event_id)
{
- printf("ON-EVENT-UNSUBSCRIBE %s: [%d:%s]\n", (char*)request, event_id, event_name);
+ printf("ON-EVENT-UNSUBSCRIBE %s: [%d]\n", (char*)request, event_id);
fflush(stdout);
}
-static void on_pws_event_push(void *closure, const char *event_name, int event_id, struct json_object *data)
+static void on_pws_event_push(void *closure, uint16_t event_id, struct json_object *data)
{
if (raw)
- printf("ON-EVENT-PUSH: [%d:%s]\n%s\n", event_id, event_name, json_object_to_json_string_ext(data, JSON_C_TO_STRING_NOSLASHESCAPE));
+ printf("ON-EVENT-PUSH: [%d]\n%s\n", event_id, json_object_to_json_string_ext(data, JSON_C_TO_STRING_NOSLASHESCAPE));
if (human)
- printf("ON-EVENT-PUSH: [%d:%s]\n%s\n", event_id, event_name, json_object_to_json_string_ext(data, JSON_C_TO_STRING_PRETTY|JSON_C_TO_STRING_NOSLASHESCAPE));
+ printf("ON-EVENT-PUSH: [%d]\n%s\n", event_id, json_object_to_json_string_ext(data, JSON_C_TO_STRING_PRETTY|JSON_C_TO_STRING_NOSLASHESCAPE));
fflush(stdout);
}
@@ -564,7 +564,7 @@ static void pws_call(const char *verb, const char *object)
if (jerr != json_tokener_success)
o = json_object_new_string(object);
}
- rc = afb_proto_ws_client_call(pws, verb, o, sessionid, key, NULL);
+ rc = afb_proto_ws_client_call(pws, verb, o, 0, 0, key, NULL);
json_object_put(o);
if (rc < 0) {
fprintf(stderr, "calling %s(%s) failed: %m\n", verb, object?:"");