diff options
-rw-r--r-- | src/afb-proto-ws.c | 484 | ||||
-rw-r--r-- | src/afb-proto-ws.h | 40 | ||||
-rw-r--r-- | src/afb-stub-ws.c | 416 | ||||
-rw-r--r-- | src/main-afb-client-demo.c | 32 |
4 files changed, 638 insertions, 334 deletions
diff --git a/src/afb-proto-ws.c b/src/afb-proto-ws.c index 21ba2bca..ebd5fd58 100644 --- a/src/afb-proto-ws.c +++ b/src/afb-proto-ws.c @@ -43,7 +43,7 @@ struct afb_proto_ws; /******** implementation of internal binder protocol per api **************/ /* -This protocol is asymetric: there is a client and a server +This protocol is asymmetric: there is a client and a server The client can require the following actions: @@ -65,21 +65,41 @@ For the purpose of handling events the server can: - create/destroy an event - - push or brodcast data as an event + - push or broadcast data as an event */ /************** constants for protocol definition *************************/ -#define CHAR_FOR_CALL 'C' -#define CHAR_FOR_REPLY 'Y' -#define CHAR_FOR_EVT_BROADCAST '*' -#define CHAR_FOR_EVT_ADD '+' -#define CHAR_FOR_EVT_DEL '-' -#define CHAR_FOR_EVT_PUSH '!' -#define CHAR_FOR_EVT_SUBSCRIBE 'S' -#define CHAR_FOR_EVT_UNSUBSCRIBE 'U' +#define CHAR_FOR_CALL 'K' +#define CHAR_FOR_REPLY 'k' +#define CHAR_FOR_EVT_BROADCAST 'B' +#define CHAR_FOR_EVT_ADD 'E' +#define CHAR_FOR_EVT_DEL 'e' +#define CHAR_FOR_EVT_PUSH 'P' +#define CHAR_FOR_EVT_SUBSCRIBE 'X' +#define CHAR_FOR_EVT_UNSUBSCRIBE 'x' #define CHAR_FOR_DESCRIBE 'D' #define CHAR_FOR_DESCRIPTION 'd' +#define CHAR_FOR_TOKEN_ADD 'T' +#define CHAR_FOR_TOKEN_DROP 't' +#define CHAR_FOR_SESSION_ADD 'S' +#define CHAR_FOR_SESSION_DROP 's' +#define CHAR_FOR_VERSION_OFFER 'V' +#define CHAR_FOR_VERSION_SET 'v' + +/******************* manage versions *****************************/ + +#define WSAPI_IDENTIFIER 02723012011 /* wsapi: 23.19.1.16.9 */ + +#define WSAPI_VERSION_UNSET 0 +#define WSAPI_VERSION_1 1 + +#define WSAPI_VERSION_MIN WSAPI_VERSION_1 +#define WSAPI_VERSION_MAX WSAPI_VERSION_1 + +/******************* maximum count of ids ***********************/ + +#define ACTIVE_ID_MAX 4095 /******************* handling calls *****************************/ @@ -88,20 +108,18 @@ For the purpose of handling events the server can: */ struct client_call { struct client_call *next; /* the next call */ - struct afb_proto_ws *protows; /* the proto_ws */ void *request; /* the request closure */ - uint32_t callid; /* the message identifier */ + uint16_t callid; /* the message identifier */ }; /* * structure for a ws request */ struct afb_proto_ws_call { - struct client_call *next; /* the next call */ struct afb_proto_ws *protows; /* the client of the request */ - uint32_t refcount; /* reference count */ - uint32_t callid; /* the incoming request callid */ char *buffer; /* the incoming buffer */ + uint16_t refcount; /* reference count */ + uint16_t callid; /* the incoming request callid */ }; /* @@ -110,10 +128,9 @@ struct afb_proto_ws_call { struct client_describe { struct client_describe *next; - struct afb_proto_ws *protows; void (*callback)(void*, struct json_object*); void *closure; - uint32_t descid; + uint16_t descid; }; /* @@ -122,7 +139,7 @@ struct client_describe struct afb_proto_ws_describe { struct afb_proto_ws *protows; - uint32_t descid; + uint16_t descid; }; /******************* proto description for client or servers ******************/ @@ -130,7 +147,16 @@ struct afb_proto_ws_describe struct afb_proto_ws { /* count of references */ - int refcount; + uint16_t refcount; + + /* id generator */ + uint16_t genid; + + /* count actives ids */ + uint16_t idcount; + + /* version */ + uint8_t version; /* resource control */ pthread_mutex_t mutex; @@ -183,19 +209,7 @@ struct binary struct readbuf rb; }; -/******************* common useful tools **********************************/ - -/** - * translate a pointer to some integer - * @param ptr the pointer to translate - * @return an integer - */ -static inline uint32_t ptr2id(void *ptr) -{ - return (uint32_t)(((intptr_t)ptr) >> 6); -} - -/******************* serialisation part **********************************/ +/******************* serialization part **********************************/ static char *readbuf_get(struct readbuf *rb, uint32_t length) { @@ -207,24 +221,40 @@ static char *readbuf_get(struct readbuf *rb, uint32_t length) return before; } -__attribute__((unused)) -static int readbuf_char(struct readbuf *rb, char *value) +static int readbuf_getat(struct readbuf *rb, void *to, uint32_t length) { - if (rb->head >= rb->end) + char *head = readbuf_get(rb, length); + if (!head) return 0; - *value = *rb->head++; + memcpy(to, head, length); return 1; } +__attribute__((unused)) +static int readbuf_char(struct readbuf *rb, char *value) +{ + return readbuf_getat(rb, value, sizeof *value); +} + static int readbuf_uint32(struct readbuf *rb, uint32_t *value) { - char *after = rb->head + sizeof *value; - if (after > rb->end) - return 0; - memcpy(value, rb->head, sizeof *value); - rb->head = after; - *value = le32toh(*value); - return 1; + int r = readbuf_getat(rb, value, sizeof *value); + if (r) + *value = le32toh(*value); + return r; +} + +static int readbuf_uint16(struct readbuf *rb, uint16_t *value) +{ + int r = readbuf_getat(rb, value, sizeof *value); + if (r) + *value = le16toh(*value); + return r; +} + +static int readbuf_uint8(struct readbuf *rb, uint8_t *value) +{ + return readbuf_getat(rb, value, sizeof *value); } static int _readbuf_string_(struct readbuf *rb, const char **value, size_t *length, int nulok) @@ -312,6 +342,7 @@ static int writebuf_putbuf(struct writebuf *wb, const void *value, int length) return 1; } +__attribute__((unused)) static int writebuf_char(struct writebuf *wb, char value) { return writebuf_putbuf(wb, &value, 1); @@ -323,6 +354,17 @@ static int writebuf_uint32(struct writebuf *wb, uint32_t value) return writebuf_putbuf(wb, &value, (int)sizeof value); } +static int writebuf_uint16(struct writebuf *wb, uint16_t value) +{ + value = htole16(value); + return writebuf_putbuf(wb, &value, (int)sizeof value); +} + +static int writebuf_uint8(struct writebuf *wb, uint8_t value) +{ + return writebuf_putbuf(wb, &value, (int)sizeof value); +} + static int writebuf_string_length(struct writebuf *wb, const char *value, size_t length) { uint32_t len = (uint32_t)++length; @@ -392,6 +434,30 @@ static int proto_write(struct afb_proto_ws *protows, struct writebuf *wb) return rc; } +static int send_version_offer_1(struct afb_proto_ws *protows, uint8_t version) +{ + int rc = -1; + struct writebuf wb = { .iovcount = 0, .bufcount = 0 }; + + if (writebuf_char(&wb, CHAR_FOR_VERSION_OFFER) + && writebuf_uint32(&wb, WSAPI_IDENTIFIER) + && writebuf_uint8(&wb, 1) /* offer one version */ + && writebuf_uint8(&wb, version)) + rc = proto_write(protows, &wb); + return rc; +} + +static int send_version_set(struct afb_proto_ws *protows, uint8_t version) +{ + int rc = -1; + struct writebuf wb = { .iovcount = 0, .bufcount = 0 }; + + if (writebuf_char(&wb, CHAR_FOR_VERSION_SET) + && writebuf_uint8(&wb, version)) + rc = proto_write(protows, &wb); + return rc; +} + /******************* ws request part for server *****************/ void afb_proto_ws_call_addref(struct afb_proto_ws_call *call) @@ -416,7 +482,7 @@ int afb_proto_ws_call_reply(struct afb_proto_ws_call *call, struct json_object * struct afb_proto_ws *protows = call->protows; if (writebuf_char(&wb, CHAR_FOR_REPLY) - && writebuf_uint32(&wb, call->callid) + && writebuf_uint16(&wb, call->callid) && writebuf_nullstring(&wb, error) && writebuf_nullstring(&wb, info) && writebuf_object(&wb, obj)) @@ -424,30 +490,28 @@ int afb_proto_ws_call_reply(struct afb_proto_ws_call *call, struct json_object * return rc; } -int afb_proto_ws_call_subscribe(struct afb_proto_ws_call *call, const char *event_name, int event_id) +int afb_proto_ws_call_subscribe(struct afb_proto_ws_call *call, uint16_t event_id) { int rc = -1; struct writebuf wb = { .iovcount = 0, .bufcount = 0 }; struct afb_proto_ws *protows = call->protows; if (writebuf_char(&wb, CHAR_FOR_EVT_SUBSCRIBE) - && writebuf_uint32(&wb, call->callid) - && writebuf_uint32(&wb, (uint32_t)event_id) - && writebuf_string(&wb, event_name)) + && writebuf_uint16(&wb, call->callid) + && writebuf_uint16(&wb, event_id)) rc = proto_write(protows, &wb); return rc; } -int afb_proto_ws_call_unsubscribe(struct afb_proto_ws_call *call, const char *event_name, int event_id) +int afb_proto_ws_call_unsubscribe(struct afb_proto_ws_call *call, uint16_t event_id) { int rc = -1; struct writebuf wb = { .iovcount = 0, .bufcount = 0 }; struct afb_proto_ws *protows = call->protows; if (writebuf_char(&wb, CHAR_FOR_EVT_UNSUBSCRIBE) - && writebuf_uint32(&wb, call->callid) - && writebuf_uint32(&wb, (uint32_t)event_id) - && writebuf_string(&wb, event_name)) + && writebuf_uint16(&wb, call->callid) + && writebuf_uint16(&wb, event_id)) rc = proto_write(protows, &wb); return rc; } @@ -455,7 +519,7 @@ int afb_proto_ws_call_unsubscribe(struct afb_proto_ws_call *call, const char *ev /******************* client part **********************************/ /* search a memorized call */ -static struct client_call *client_call_search_locked(struct afb_proto_ws *protows, uint32_t callid) +static struct client_call *client_call_search_locked(struct afb_proto_ws *protows, uint16_t callid) { struct client_call *call; @@ -466,7 +530,7 @@ static struct client_call *client_call_search_locked(struct afb_proto_ws *protow return call; } -static struct client_call *client_call_search_unlocked(struct afb_proto_ws *protows, uint32_t callid) +static struct client_call *client_call_search_unlocked(struct afb_proto_ws *protows, uint16_t callid) { struct client_call *result; @@ -477,57 +541,49 @@ static struct client_call *client_call_search_unlocked(struct afb_proto_ws *prot } /* free and release the memorizing call */ -static void client_call_destroy(struct client_call *call) +static void client_call_destroy(struct afb_proto_ws *protows, struct client_call *call) { struct client_call **prv; - struct afb_proto_ws *protows = call->protows; pthread_mutex_lock(&protows->mutex); - prv = &call->protows->calls; + prv = &protows->calls; while (*prv != NULL) { if (*prv == call) { + protows->idcount--; *prv = call->next; - break; + pthread_mutex_unlock(&protows->mutex); + free(call); + return; } prv = &(*prv)->next; } pthread_mutex_unlock(&protows->mutex); - free(call); -} - -/* get event data from the message */ -static int client_msg_event_read(struct readbuf *rb, uint32_t *eventid, const char **name) -{ - return readbuf_uint32(rb, eventid) && readbuf_string(rb, name, NULL); } /* get event from the message */ static int client_msg_call_get(struct afb_proto_ws *protows, struct readbuf *rb, struct client_call **call) { - uint32_t callid; + uint16_t callid; /* get event data from the message */ - if (!readbuf_uint32(rb, &callid)) { + if (!readbuf_uint16(rb, &callid)) return 0; - } /* get the call */ *call = client_call_search_unlocked(protows, callid); - if (*call == NULL) { - return 0; - } - - return 1; + return *call != NULL; } /* adds an event */ static void client_on_event_create(struct afb_proto_ws *protows, struct readbuf *rb) { const char *event_name; - uint32_t event_id; + uint16_t event_id; - if (protows->client_itf->on_event_create && client_msg_event_read(rb, &event_id, &event_name)) - protows->client_itf->on_event_create(protows->closure, event_name, (int)event_id); + if (protows->client_itf->on_event_create + && readbuf_uint16(rb, &event_id) + && readbuf_string(rb, &event_name, NULL)) + protows->client_itf->on_event_create(protows->closure, event_id, event_name); else ERROR("Ignoring creation of event"); } @@ -535,11 +591,10 @@ static void client_on_event_create(struct afb_proto_ws *protows, struct readbuf /* removes an event */ static void client_on_event_remove(struct afb_proto_ws *protows, struct readbuf *rb) { - const char *event_name; - uint32_t event_id; + uint16_t event_id; - if (protows->client_itf->on_event_remove && client_msg_event_read(rb, &event_id, &event_name)) - protows->client_itf->on_event_remove(protows->closure, event_name, (int)event_id); + if (protows->client_itf->on_event_remove && readbuf_uint16(rb, &event_id)) + protows->client_itf->on_event_remove(protows->closure, event_id); else ERROR("Ignoring deletion of event"); } @@ -547,12 +602,11 @@ static void client_on_event_remove(struct afb_proto_ws *protows, struct readbuf /* subscribes an event */ static void client_on_event_subscribe(struct afb_proto_ws *protows, struct readbuf *rb) { - const char *event_name; - uint32_t event_id; + uint16_t event_id; struct client_call *call; - if (protows->client_itf->on_event_subscribe && client_msg_call_get(protows, rb, &call) && client_msg_event_read(rb, &event_id, &event_name)) - protows->client_itf->on_event_subscribe(protows->closure, call->request, event_name, (int)event_id); + if (protows->client_itf->on_event_subscribe && client_msg_call_get(protows, rb, &call) && readbuf_uint16(rb, &event_id)) + protows->client_itf->on_event_subscribe(protows->closure, call->request, event_id); else ERROR("Ignoring subscription to event"); } @@ -560,12 +614,11 @@ static void client_on_event_subscribe(struct afb_proto_ws *protows, struct readb /* unsubscribes an event */ static void client_on_event_unsubscribe(struct afb_proto_ws *protows, struct readbuf *rb) { - const char *event_name; - uint32_t event_id; + uint16_t event_id; struct client_call *call; - if (protows->client_itf->on_event_unsubscribe && client_msg_call_get(protows, rb, &call) && client_msg_event_read(rb, &event_id, &event_name)) - protows->client_itf->on_event_unsubscribe(protows->closure, call->request, event_name, (int)event_id); + if (protows->client_itf->on_event_unsubscribe && client_msg_call_get(protows, rb, &call) && readbuf_uint16(rb, &event_id)) + protows->client_itf->on_event_unsubscribe(protows->closure, call->request, event_id); else ERROR("Ignoring unsubscription to event"); } @@ -574,11 +627,11 @@ static void client_on_event_unsubscribe(struct afb_proto_ws *protows, struct rea static void client_on_event_broadcast(struct afb_proto_ws *protows, struct readbuf *rb) { const char *event_name, *uuid; - char hop; + uint8_t hop; struct json_object *object; - if (protows->client_itf->on_event_broadcast && readbuf_string(rb, &event_name, NULL) && readbuf_object(rb, &object) && (uuid = readbuf_get(rb, 16)) && readbuf_char(rb, &hop)) - protows->client_itf->on_event_broadcast(protows->closure, event_name, object, (unsigned char*)uuid, (uint8_t)hop); + if (protows->client_itf->on_event_broadcast && readbuf_string(rb, &event_name, NULL) && readbuf_object(rb, &object) && (uuid = readbuf_get(rb, 16)) && readbuf_uint8(rb, &hop)) + protows->client_itf->on_event_broadcast(protows->closure, event_name, object, (unsigned char*)uuid, hop); else ERROR("Ignoring broadcast of event"); } @@ -586,12 +639,11 @@ static void client_on_event_broadcast(struct afb_proto_ws *protows, struct readb /* pushs an event */ static void client_on_event_push(struct afb_proto_ws *protows, struct readbuf *rb) { - const char *event_name; - uint32_t event_id; + uint16_t event_id; struct json_object *object; - if (protows->client_itf->on_event_push && client_msg_event_read(rb, &event_id, &event_name) && readbuf_object(rb, &object)) - protows->client_itf->on_event_push(protows->closure, event_name, (int)event_id, object); + if (protows->client_itf->on_event_push && readbuf_uint16(rb, &event_id) && readbuf_object(rb, &object)) + protows->client_itf->on_event_push(protows->closure, event_id, object); else ERROR("Ignoring push of event"); } @@ -610,7 +662,7 @@ static void client_on_reply(struct afb_proto_ws *protows, struct readbuf *rb) } else { protows->client_itf->on_reply(protows->closure, call->request, NULL, "proto-error", "can't process success"); } - client_call_destroy(call); + client_call_destroy(protows, call); } static void client_on_description(struct afb_proto_ws *protows, struct readbuf *rb) @@ -628,6 +680,7 @@ static void client_on_description(struct afb_proto_ws *protows, struct readbuf * pthread_mutex_unlock(&protows->mutex); else { *prv = desc->next; + protows->idcount--; pthread_mutex_unlock(&protows->mutex); if (!readbuf_object(rb, &object)) object = NULL; @@ -637,6 +690,22 @@ static void client_on_description(struct afb_proto_ws *protows, struct readbuf * } } +/* received a version set */ +static void client_on_version_set(struct afb_proto_ws *protows, struct readbuf *rb) +{ + uint8_t version; + + /* reads the descid */ + if (readbuf_uint8(rb, &version) + && WSAPI_VERSION_MIN <= version + && version <= WSAPI_VERSION_MAX) { + protows->version = version; + return; + } + afb_proto_ws_hangup(protows); +} + + /* callback when receiving binary data */ static void client_on_binary_job(int sig, void *closure) { @@ -668,6 +737,9 @@ static void client_on_binary_job(int sig, void *closure) case CHAR_FOR_DESCRIPTION: /* description */ client_on_description(binary->protows, &binary->rb); break; + case CHAR_FOR_VERSION_SET: /* set the protocol version */ + client_on_version_set(binary->protows, &binary->rb); + break; default: /* unexpected message */ /* TODO: close the connection */ break; @@ -685,11 +757,45 @@ static void client_on_binary(void *closure, char *data, size_t size) queue_message_processing(protows, data, size, client_on_binary_job); } +static int client_send_idstr_add_drop(struct afb_proto_ws *protows, char order, uint16_t id, const char *value) +{ + struct writebuf wb = { .iovcount = 0, .bufcount = 0 }; + int rc = -1; + + if (writebuf_char(&wb, order) + && writebuf_uint16(&wb, id) + && (!value || writebuf_string(&wb, value))) + rc = proto_write(protows, &wb); + return rc; +} + +int afb_proto_ws_client_session_create(struct afb_proto_ws *protows, uint16_t sessionid, const char *sessionstr) +{ + return client_send_idstr_add_drop(protows, CHAR_FOR_SESSION_ADD, sessionid, sessionstr); +} + +int afb_proto_ws_client_session_remove(struct afb_proto_ws *protows, uint16_t sessionid) +{ + return client_send_idstr_add_drop(protows, CHAR_FOR_SESSION_DROP, sessionid, NULL); +} + +int afb_proto_ws_client_token_create(struct afb_proto_ws *protows, uint16_t tokenid, const char *tokenstr) +{ + return client_send_idstr_add_drop(protows, CHAR_FOR_TOKEN_ADD, tokenid, tokenstr); + +} + +int afb_proto_ws_client_token_remove(struct afb_proto_ws *protows, uint16_t tokenid) +{ + return client_send_idstr_add_drop(protows, CHAR_FOR_TOKEN_DROP, tokenid, NULL); +} + int afb_proto_ws_client_call( struct afb_proto_ws *protows, const char *verb, struct json_object *args, - const char *sessionid, + uint16_t sessionid, + uint16_t tokenid, void *request, const char *user_creds ) @@ -697,6 +803,7 @@ int afb_proto_ws_client_call( int rc = -1; struct client_call *call; struct writebuf wb = { .iovcount = 0, .bufcount = 0 }; + uint16_t id; /* allocate call data */ call = malloc(sizeof *call); @@ -708,19 +815,26 @@ int afb_proto_ws_client_call( /* init call data */ pthread_mutex_lock(&protows->mutex); - call->callid = ptr2id(call); - while(client_call_search_locked(protows, call->callid) != NULL) - call->callid++; - call->protows = protows; + if (protows->idcount >= ACTIVE_ID_MAX) { + pthread_mutex_unlock(&protows->mutex); + errno = EBUSY; + goto clean; + } + protows->idcount++; + id = ++protows->genid; + while(!id || client_call_search_locked(protows, id) != NULL) + id++; + call->callid = protows->genid = id; call->next = protows->calls; protows->calls = call; pthread_mutex_unlock(&protows->mutex); /* creates the call message */ if (!writebuf_char(&wb, CHAR_FOR_CALL) - || !writebuf_uint32(&wb, call->callid) + || !writebuf_uint16(&wb, call->callid) || !writebuf_string(&wb, verb) - || !writebuf_string(&wb, sessionid) + || !writebuf_uint16(&wb, sessionid) + || !writebuf_uint16(&wb, tokenid) || !writebuf_object(&wb, args) || !writebuf_nullstring(&wb, user_creds)) { errno = EINVAL; @@ -733,7 +847,7 @@ int afb_proto_ws_client_call( goto end; clean: - client_call_destroy(call); + client_call_destroy(protows, call); end: return rc; } @@ -743,6 +857,7 @@ int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)( { struct client_describe *desc, *d; struct writebuf wb = { .iovcount = 0, .bufcount = 0 }; + uint16_t id; desc = malloc(sizeof *desc); if (!desc) { @@ -752,31 +867,40 @@ int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)( /* fill in stack the description of the task */ pthread_mutex_lock(&protows->mutex); - desc->descid = ptr2id(desc); + if (protows->idcount >= ACTIVE_ID_MAX) { + errno = EBUSY; + goto busy; + } + protows->idcount++; + id = ++protows->genid; d = protows->describes; while (d) { - if (d->descid != desc->descid) + if (id && d->descid != id) d = d->next; else { - desc->descid++; + id++; d = protows->describes; } } + desc->descid = protows->genid = id; desc->callback = callback; desc->closure = closure; - desc->protows = protows; desc->next = protows->describes; protows->describes = desc; + pthread_mutex_unlock(&protows->mutex); /* send */ - if (writebuf_char(&wb, CHAR_FOR_DESCRIBE) - && writebuf_uint32(&wb, desc->descid) - && protows->ws != NULL - && afb_ws_binary_v(protows->ws, wb.iovec, wb.iovcount) >= 0) { - pthread_mutex_unlock(&protows->mutex); - return 0; + if (!writebuf_char(&wb, CHAR_FOR_DESCRIBE) + || !writebuf_uint16(&wb, desc->descid)) { + errno = EINVAL; + goto error2; } + if (proto_write(protows, &wb) == 0) + return 0; + +error2: + pthread_mutex_lock(&protows->mutex); d = protows->describes; if (d == desc) protows->describes = desc->next; @@ -786,6 +910,8 @@ int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)( if (d) d->next = desc->next; } + protows->idcount--; +busy: pthread_mutex_unlock(&protows->mutex); free(desc); error: @@ -799,17 +925,18 @@ error: static void server_on_call(struct afb_proto_ws *protows, struct readbuf *rb) { struct afb_proto_ws_call *call; - const char *uuid, *verb, *user_creds; - uint32_t callid; + const char *verb, *user_creds; + uint16_t callid, sessionid, tokenid; size_t lenverb; struct json_object *object; afb_proto_ws_addref(protows); /* reads the call message data */ - if (!readbuf_uint32(rb, &callid) + if (!readbuf_uint16(rb, &callid) || !readbuf_string(rb, &verb, &lenverb) - || !readbuf_string(rb, &uuid, NULL) + || !readbuf_uint16(rb, &sessionid) + || !readbuf_uint16(rb, &tokenid) || !readbuf_object(rb, &object) || !readbuf_nullstring(rb, &user_creds, NULL)) goto overflow; @@ -825,7 +952,7 @@ static void server_on_call(struct afb_proto_ws *protows, struct readbuf *rb) call->buffer = rb->base; rb->base = NULL; /* don't free the buffer */ - protows->server_itf->on_call(protows->closure, call, verb, object, uuid, user_creds); + protows->server_itf->on_call(protows->closure, call, verb, object, sessionid, tokenid, user_creds); return; out_of_memory: @@ -858,11 +985,11 @@ int afb_proto_ws_describe_put(struct afb_proto_ws_describe *describe, struct jso /* on describe, propagate it to the ws service */ static void server_on_describe(struct afb_proto_ws *protows, struct readbuf *rb) { - uint32_t descid; + uint16_t descid; struct afb_proto_ws_describe *desc; /* reads the descid */ - if (readbuf_uint32(rb, &descid)) { + if (readbuf_uint16(rb, &descid)) { if (protows->server_itf->on_describe) { /* create asynchronous job */ desc = malloc(sizeof *desc); @@ -878,6 +1005,73 @@ static void server_on_describe(struct afb_proto_ws *protows, struct readbuf *rb) } } +static void server_on_session_add(struct afb_proto_ws *protows, struct readbuf *rb) +{ + uint16_t sessionid; + const char *sessionstr; + + if (readbuf_uint16(rb, &sessionid) && readbuf_string(rb, &sessionstr, NULL)) + protows->server_itf->on_session_create(protows->closure, sessionid, sessionstr); +} + +static void server_on_session_drop(struct afb_proto_ws *protows, struct readbuf *rb) +{ + uint16_t sessionid; + + if (readbuf_uint16(rb, &sessionid)) + protows->server_itf->on_session_remove(protows->closure, sessionid); +} + +static void server_on_token_add(struct afb_proto_ws *protows, struct readbuf *rb) +{ + uint16_t tokenid; + const char *tokenstr; + + if (readbuf_uint16(rb, &tokenid) && readbuf_string(rb, &tokenstr, NULL)) + protows->server_itf->on_token_create(protows->closure, tokenid, tokenstr); +} + +static void server_on_token_drop(struct afb_proto_ws *protows, struct readbuf *rb) +{ + uint16_t tokenid; + + if (readbuf_uint16(rb, &tokenid)) + protows->server_itf->on_token_remove(protows->closure, tokenid); +} + +/* on version offer */ +static void server_on_version_offer(struct afb_proto_ws *protows, struct readbuf *rb) +{ + uint8_t count; + uint8_t *versions; + uint8_t version; + uint8_t v; + uint32_t id; + + /* reads the descid */ + if (readbuf_uint32(rb, &id) + && id == WSAPI_IDENTIFIER + && readbuf_uint8(rb, &count) + && count > 0 + && (versions = (uint8_t*)readbuf_get(rb, (uint32_t)count))) { + version = WSAPI_VERSION_UNSET; + while (count) { + v = versions[--count]; + if (v >= WSAPI_VERSION_MIN + && v <= WSAPI_VERSION_MAX + && (version == WSAPI_VERSION_UNSET || version < v)) + version = v; + } + if (version != WSAPI_VERSION_UNSET) { + if (send_version_set(protows, version) >= 0) { + protows->version = version; + return; + } + } + } + afb_proto_ws_hangup(protows); +} + /* callback when receiving binary data */ static void server_on_binary_job(int sig, void *closure) { @@ -891,6 +1085,21 @@ static void server_on_binary_job(int sig, void *closure) case CHAR_FOR_DESCRIBE: server_on_describe(binary->protows, &binary->rb); break; + case CHAR_FOR_SESSION_ADD: + server_on_session_add(binary->protows, &binary->rb); + break; + case CHAR_FOR_SESSION_DROP: + server_on_session_drop(binary->protows, &binary->rb); + break; + case CHAR_FOR_TOKEN_ADD: + server_on_token_add(binary->protows, &binary->rb); + break; + case CHAR_FOR_TOKEN_DROP: + server_on_token_drop(binary->protows, &binary->rb); + break; + case CHAR_FOR_VERSION_OFFER: + server_on_version_offer(binary->protows, &binary->rb); + break; default: /* unexpected message */ /* TODO: close the connection */ break; @@ -909,32 +1118,32 @@ static void server_on_binary(void *closure, char *data, size_t size) /******************* server part: manage events **********************************/ -static int server_event_send(struct afb_proto_ws *protows, char order, const char *event_name, int event_id, struct json_object *data) +static int server_event_send(struct afb_proto_ws *protows, char order, uint16_t event_id, const char *event_name, struct json_object *data) { struct writebuf wb = { .iovcount = 0, .bufcount = 0 }; int rc = -1; if (writebuf_char(&wb, order) - && writebuf_uint32(&wb, event_id) - && writebuf_string(&wb, event_name) + && writebuf_uint16(&wb, event_id) + && (order != CHAR_FOR_EVT_ADD || writebuf_string(&wb, event_name)) && (order != CHAR_FOR_EVT_PUSH || writebuf_object(&wb, data))) rc = proto_write(protows, &wb); return rc; } -int afb_proto_ws_server_event_create(struct afb_proto_ws *protows, const char *event_name, int event_id) +int afb_proto_ws_server_event_create(struct afb_proto_ws *protows, uint16_t event_id, const char *event_name) { - return server_event_send(protows, CHAR_FOR_EVT_ADD, event_name, event_id, NULL); + return server_event_send(protows, CHAR_FOR_EVT_ADD, event_id, event_name, NULL); } -int afb_proto_ws_server_event_remove(struct afb_proto_ws *protows, const char *event_name, int event_id) +int afb_proto_ws_server_event_remove(struct afb_proto_ws *protows, uint16_t event_id) { - return server_event_send(protows, CHAR_FOR_EVT_DEL, event_name, event_id, NULL); + return server_event_send(protows, CHAR_FOR_EVT_DEL, event_id, NULL, NULL); } -int afb_proto_ws_server_event_push(struct afb_proto_ws *protows, const char *event_name, int event_id, struct json_object *data) +int afb_proto_ws_server_event_push(struct afb_proto_ws *protows, uint16_t event_id, struct json_object *data) { - return server_event_send(protows, CHAR_FOR_EVT_PUSH, event_name, event_id, data); + return server_event_send(protows, CHAR_FOR_EVT_PUSH, event_id, NULL, data); } int afb_proto_ws_server_event_broadcast(struct afb_proto_ws *protows, const char *event_name, struct json_object *data, const unsigned char uuid[16], uint8_t hop) @@ -949,7 +1158,7 @@ int afb_proto_ws_server_event_broadcast(struct afb_proto_ws *protows, const char && writebuf_string(&wb, event_name) && writebuf_object(&wb, data) && writebuf_put(&wb, uuid, 16) - && writebuf_char(&wb, (char)(hop - 1))) + && writebuf_uint8(&wb, (uint8_t)(hop - 1))) rc = proto_write(protows, &wb); return rc; } @@ -971,6 +1180,7 @@ static void on_hangup(void *closure) protows->calls = NULL; ws = protows->ws; protows->ws = NULL; + protows->idcount = 0; pthread_mutex_unlock(&protows->mutex); while (ncall) { @@ -1029,6 +1239,7 @@ static struct afb_proto_ws *afb_proto_ws_create(struct fdev *fdev, const struct protows->ws = afb_ws_create(fdev, itf, protows); if (protows->ws != NULL) { protows->refcount = 1; + protows->version = WSAPI_VERSION_UNSET; protows->closure = closure; protows->server_itf = itfs; protows->client_itf = itfc; @@ -1042,7 +1253,16 @@ static struct afb_proto_ws *afb_proto_ws_create(struct fdev *fdev, const struct struct afb_proto_ws *afb_proto_ws_create_client(struct fdev *fdev, const struct afb_proto_ws_client_itf *itf, void *closure) { - return afb_proto_ws_create(fdev, NULL, itf, closure, &proto_ws_client_ws_itf); + struct afb_proto_ws *protows; + + protows = afb_proto_ws_create(fdev, NULL, itf, closure, &proto_ws_client_ws_itf); + if (protows) { + if (send_version_offer_1(protows, WSAPI_VERSION_1) != 0) { + afb_proto_ws_unref(protows); + protows = NULL; + } + } + return protows; } struct afb_proto_ws *afb_proto_ws_create_server(struct fdev *fdev, const struct afb_proto_ws_server_itf *itf, void *closure) diff --git a/src/afb-proto-ws.h b/src/afb-proto-ws.h index 506960ea..2dcb142b 100644 --- a/src/afb-proto-ws.h +++ b/src/afb-proto-ws.h @@ -20,9 +20,13 @@ /* * Defined since version 3, the value AFB_PROTO_WS_VERSION can be used to - * track versions of afb-proto-ws. + * track versions of afb-proto-ws. History: + * + * date version comment + * 2018/04/09 3 introduced for bindings v3 + * 2019/11/20 4 introduced for tokens */ -#define AFB_PROTO_WS_VERSION 3 +#define AFB_PROTO_WS_VERSION 4 struct fdev; struct afb_proto_ws; @@ -37,17 +41,21 @@ struct afb_proto_ws_client_itf void (*on_reply)(void *closure, void *request, struct json_object *obj, const char *error, const char *info); /* can be NULL */ - void (*on_event_create)(void *closure, const char *event_name, int event_id); - void (*on_event_remove)(void *closure, const char *event_name, int event_id); - void (*on_event_subscribe)(void *closure, void *request, const char *event_name, int event_id); - void (*on_event_unsubscribe)(void *closure, void *request, const char *event_name, int event_id); - void (*on_event_push)(void *closure, const char *event_name, int event_id, struct json_object *data); + void (*on_event_create)(void *closure, uint16_t event_id, const char *event_name); + void (*on_event_remove)(void *closure, uint16_t event_id); + void (*on_event_subscribe)(void *closure, void *request, uint16_t event_id); + void (*on_event_unsubscribe)(void *closure, void *request, uint16_t event_id); + void (*on_event_push)(void *closure, uint16_t event_id, struct json_object *data); void (*on_event_broadcast)(void *closure, const char *event_name, struct json_object *data, const afb_proto_ws_uuid_t uuid, uint8_t hop); }; struct afb_proto_ws_server_itf { - void (*on_call)(void *closure, struct afb_proto_ws_call *call, const char *verb, struct json_object *args, const char *sessionid, const char *user_creds); + void (*on_session_create)(void *closure, uint16_t sessionid, const char *sessionstr); + void (*on_session_remove)(void *closure, uint16_t sessionid); + void (*on_token_create)(void *closure, uint16_t tokenid, const char *tokenstr); + void (*on_token_remove)(void *closure, uint16_t tokenid); + void (*on_call)(void *closure, struct afb_proto_ws_call *call, const char *verb, struct json_object *args, uint16_t sessionid, uint16_t tokenid, const char *user_creds); void (*on_describe)(void *closure, struct afb_proto_ws_describe *describe); }; @@ -66,12 +74,16 @@ extern void afb_proto_ws_on_hangup(struct afb_proto_ws *protows, void (*on_hangu extern void afb_proto_ws_set_queuing(struct afb_proto_ws *protows, int (*queuing)(struct afb_proto_ws *, void (*)(int,void*), void*)); -extern int afb_proto_ws_client_call(struct afb_proto_ws *protows, const char *verb, struct json_object *args, const char *sessionid, void *request, const char *user_creds); +extern int afb_proto_ws_client_session_create(struct afb_proto_ws *protows, uint16_t sessionid, const char *sessionstr); +extern int afb_proto_ws_client_session_remove(struct afb_proto_ws *protows, uint16_t sessionid); +extern int afb_proto_ws_client_token_create(struct afb_proto_ws *protows, uint16_t tokenid, const char *tokenstr); +extern int afb_proto_ws_client_token_remove(struct afb_proto_ws *protows, uint16_t tokenid); +extern int afb_proto_ws_client_call(struct afb_proto_ws *protows, const char *verb, struct json_object *args, uint16_t sessionid, uint16_t tokenid, void *request, const char *user_creds); extern int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)(void*, struct json_object*), void *closure); -extern int afb_proto_ws_server_event_create(struct afb_proto_ws *protows, const char *event_name, int event_id); -extern int afb_proto_ws_server_event_remove(struct afb_proto_ws *protows, const char *event_name, int event_id); -extern int afb_proto_ws_server_event_push(struct afb_proto_ws *protows, const char *event_name, int event_id, struct json_object *data); +extern int afb_proto_ws_server_event_create(struct afb_proto_ws *protows, uint16_t event_id, const char *event_name); +extern int afb_proto_ws_server_event_remove(struct afb_proto_ws *protows, uint16_t event_id); +extern int afb_proto_ws_server_event_push(struct afb_proto_ws *protows, uint16_t event_id, struct json_object *data); extern int afb_proto_ws_server_event_broadcast(struct afb_proto_ws *protows, const char *event_name, struct json_object *data, const afb_proto_ws_uuid_t uuid, uint8_t hop); extern void afb_proto_ws_call_addref(struct afb_proto_ws_call *call); @@ -79,7 +91,7 @@ extern void afb_proto_ws_call_unref(struct afb_proto_ws_call *call); extern int afb_proto_ws_call_reply(struct afb_proto_ws_call *call, struct json_object *obj, const char *error, const char *info); -extern int afb_proto_ws_call_subscribe(struct afb_proto_ws_call *call, const char *event_name, int event_id); -extern int afb_proto_ws_call_unsubscribe(struct afb_proto_ws_call *call, const char *event_name, int event_id); +extern int afb_proto_ws_call_subscribe(struct afb_proto_ws_call *call, uint16_t event_id); +extern int afb_proto_ws_call_unsubscribe(struct afb_proto_ws_call *call, uint16_t event_id); extern int afb_proto_ws_describe_put(struct afb_proto_ws_describe *describe, struct json_object *description); diff --git a/src/afb-stub-ws.c b/src/afb-stub-ws.c index 9bd8ed63..d538fc65 100644 --- a/src/afb-stub-ws.c +++ b/src/afb-stub-ws.c @@ -43,13 +43,14 @@ #include "afb-context.h" #include "afb-evt.h" #include "afb-xreq.h" +#include "afb-token.h" #include "verbose.h" #include "fdev.h" #include "jobs.h" +#include "u16id.h" struct afb_stub_ws; - /** * structure for a ws request: requests on server side */ @@ -60,17 +61,6 @@ struct server_req { }; /** - * structure for recording events on client side - */ -struct client_event -{ - struct client_event *next; /**< link to the next */ - struct afb_event_x2 *event; /**< the local event */ - int id; /**< the identifier */ - int refcount; /**< a reference count */ -}; - -/** * structure for jobs of describing */ struct server_describe @@ -79,15 +69,6 @@ struct server_describe struct afb_proto_ws_describe *describe; }; -/** - * structure for recording sessions - */ -struct server_session -{ - struct server_session *next; - struct afb_session *session; -}; - /******************* stub description for client or servers ******************/ struct afb_stub_ws @@ -112,12 +93,27 @@ struct afb_stub_ws /* credentials of the client */ struct afb_cred *cred; + + /* event from server */ + struct u16id2bool *event_flags; + + /* transmitted sessions */ + struct u16id2ptr *session_proxies; + + /* transmitted tokens */ + struct u16id2ptr *token_proxies; }; /* client side */ struct { - /* event replica */ - struct client_event *events; + /* event from server */ + struct u16id2ptr *event_proxies; + + /* transmitted sessions */ + struct u16id2bool *session_flags; + + /* transmitted tokens */ + struct u16id2bool *token_flags; /* robustify */ struct { @@ -173,7 +169,7 @@ static int server_req_subscribe_cb(struct afb_xreq *xreq, struct afb_event_x2 *e rc = afb_evt_event_x2_add_watch(wreq->stubws->listener, event); if (rc >= 0) - rc = afb_proto_ws_call_subscribe(wreq->call, afb_evt_event_x2_fullname(event), afb_evt_event_x2_id(event)); + rc = afb_proto_ws_call_subscribe(wreq->call, afb_evt_event_x2_id(event)); if (rc < 0) ERROR("error while subscribing event"); return rc; @@ -184,7 +180,7 @@ static int server_req_unsubscribe_cb(struct afb_xreq *xreq, struct afb_event_x2 int rc, rc2; struct server_req *wreq = CONTAINER_OF_XREQ(struct server_req, xreq); - rc = afb_proto_ws_call_unsubscribe(wreq->call, afb_evt_event_x2_fullname(event), afb_evt_event_x2_id(event)); + rc = afb_proto_ws_call_unsubscribe(wreq->call, afb_evt_event_x2_id(event)); rc2 = afb_evt_event_x2_remove_watch(wreq->stubws->listener, event); if (rc >= 0 && rc2 < 0) rc = rc2; @@ -202,33 +198,6 @@ static const struct afb_xreq_query_itf server_req_xreq_itf = { /******************* client part **********************************/ -/* destroy all events */ -static void client_drop_all_events(struct afb_stub_ws *stubws) -{ - struct client_event *ev, *nxt; - - nxt = __atomic_exchange_n(&stubws->events, NULL, __ATOMIC_RELAXED); - while (nxt) { - ev = nxt; - nxt = ev->next; - afb_evt_event_x2_unref(ev->event); - free(ev); - } -} - -/* search the event */ -static struct client_event *client_event_search(struct afb_stub_ws *stubws, uint32_t eventid, const char *name) -{ - struct client_event *ev; - - ev = stubws->events; - while (ev != NULL && (ev->id != eventid || 0 != strcmp(afb_evt_event_x2_fullname(ev->event), name))) - ev = ev->next; - - DEBUG("searching event %s[%d]: %s", name, eventid, ev ? "found" : "not found"); - return ev; -} - static struct afb_proto_ws *client_get_proto(struct afb_stub_ws *stubws) { struct fdev *fdev; @@ -243,12 +212,53 @@ static struct afb_proto_ws *client_get_proto(struct afb_stub_ws *stubws) return proto; } +static int client_make_ids(struct afb_stub_ws *stubws, struct afb_proto_ws *proto, struct afb_context *context, uint16_t *sessionid, uint16_t *tokenid) +{ + int rc, rc2; + uint16_t sid, tid; + + rc = 0; + + /* get the session */ + if (!context->session) + sid = 0; + else { + sid = afb_session_id(context->session); + rc2 = u16id2bool_set(&stubws->session_flags, sid, 1); + if (rc2 < 0) + rc = rc2; + else if (rc2 == 0) + rc = afb_proto_ws_client_session_create(proto, sid, afb_session_uuid(context->session)); + } + + /* get the token */ + if (!context->token) + tid = 0; + else { + tid = afb_token_id(context->token); + rc2 = u16id2bool_set(&stubws->token_flags, tid, 1); + if (rc2 < 0) + rc = rc2; + else if (rc2 == 0) { + rc2 = afb_proto_ws_client_token_create(proto, tid, afb_token_string(context->token)); + if (rc2 < 0) + rc = rc2; + } + } + + *sessionid = sid; + *tokenid = tid; + return rc; +} + /* on call, propagate it to the ws service */ static void client_api_call_cb(void * closure, struct afb_xreq *xreq) { int rc; struct afb_stub_ws *stubws = closure; struct afb_proto_ws *proto; + uint16_t sessionid; + uint16_t tokenid; proto = client_get_proto(stubws); if (proto == NULL) { @@ -256,14 +266,18 @@ static void client_api_call_cb(void * closure, struct afb_xreq *xreq) return; } - afb_xreq_unhooked_addref(xreq); - rc = afb_proto_ws_client_call( - proto, - xreq->request.called_verb, - afb_xreq_json(xreq), - afb_session_uuid(xreq->context.session), - xreq, - xreq_on_behalf_cred_export(xreq)); + rc = client_make_ids(stubws, proto, &xreq->context, &sessionid, &tokenid); + if (rc >= 0) { + afb_xreq_unhooked_addref(xreq); + rc = afb_proto_ws_client_call( + proto, + xreq->request.called_verb, + afb_xreq_json(xreq), + sessionid, + tokenid, + xreq, + xreq_on_behalf_cred_export(xreq)); + } if (rc < 0) { afb_xreq_reply(xreq, NULL, "internal", "can't send message"); afb_xreq_unhooked_unref(xreq); @@ -287,26 +301,35 @@ static void client_api_describe_cb(void * closure, void (*describecb)(void *, st static void server_event_add_cb(void *closure, const char *event, uint16_t eventid) { + int rc; struct afb_stub_ws *stubws = closure; - if (stubws->proto != NULL) - afb_proto_ws_server_event_create(stubws->proto, event, eventid); + if (stubws->proto != NULL) { + rc = u16id2bool_set(&stubws->event_flags, eventid, 1); + if (rc == 0) { + rc = afb_proto_ws_server_event_create(stubws->proto, eventid, event); + if (rc < 0) + u16id2bool_set(&stubws->event_flags, eventid, 0); + } + } } static void server_event_remove_cb(void *closure, const char *event, uint16_t eventid) { struct afb_stub_ws *stubws = closure; - if (stubws->proto != NULL) - afb_proto_ws_server_event_remove(stubws->proto, event, eventid); + if (stubws->proto != NULL) { + if (u16id2bool_set(&stubws->event_flags, eventid, 0)) + afb_proto_ws_server_event_remove(stubws->proto, eventid); + } } static void server_event_push_cb(void *closure, const char *event, uint16_t eventid, struct json_object *object) { struct afb_stub_ws *stubws = closure; - if (stubws->proto != NULL) - afb_proto_ws_server_event_push(stubws->proto, event, eventid, object); + if (stubws->proto != NULL && u16id2bool_get(stubws->event_flags, eventid)) + afb_proto_ws_server_event_push(stubws->proto, eventid, object); json_object_put(object); } @@ -329,99 +352,69 @@ static void client_on_reply_cb(void *closure, void *request, struct json_object afb_xreq_unhooked_unref(xreq); } -static void client_on_event_create_cb(void *closure, const char *event_name, int event_id) +static void client_on_event_create_cb(void *closure, uint16_t event_id, const char *event_name) { struct afb_stub_ws *stubws = closure; - struct client_event *ev; - + struct afb_event_x2 *event; + int rc; + /* check conflicts */ - ev = client_event_search(stubws, event_id, event_name); - if (ev != NULL) { - __atomic_add_fetch(&ev->refcount, 1, __ATOMIC_RELAXED); - return; - } - - /* no conflict, try to add it */ - ev = malloc(sizeof *ev); - if (ev != NULL) { - ev->event = afb_evt_event_x2_create(event_name); - if (ev->event != NULL) { - ev->refcount = 1; - ev->id = event_id; - ev->next = stubws->events; - stubws->events = ev; - return; + event = afb_evt_event_x2_create(event_name); + if (event == NULL) + ERROR("can't create event %s, out of memory", event_name); + else { + rc = u16id2ptr_add(&stubws->event_proxies, event_id, event); + if (rc < 0) { + ERROR("can't record event %s", event_name); + afb_evt_event_x2_unref(event); } - free(ev); } - ERROR("can't create event %s, out of memory", event_name); } -static void client_on_event_remove_cb(void *closure, const char *event_name, int event_id) +static void client_on_event_remove_cb(void *closure, uint16_t event_id) { struct afb_stub_ws *stubws = closure; - struct client_event *ev, **prv; - - /* check conflicts */ - ev = client_event_search(stubws, event_id, event_name); - if (ev == NULL) - return; - - /* decrease the reference count */ - - if (__atomic_sub_fetch(&ev->refcount, 1, __ATOMIC_RELAXED)) - return; - - /* unlinks the event */ - prv = &stubws->events; - while (*prv != ev) - prv = &(*prv)->next; - *prv = ev->next; + struct afb_event_x2 *event; + int rc; - /* destroys the event */ - afb_evt_event_x2_unref(ev->event); - free(ev); + rc = u16id2ptr_drop(&stubws->event_proxies, event_id, (void**)&event); + if (rc == 0 && event) + afb_evt_event_x2_unref(event); } -static void client_on_event_subscribe_cb(void *closure, void *request, const char *event_name, int event_id) +static void client_on_event_subscribe_cb(void *closure, void *request, uint16_t event_id) { struct afb_stub_ws *stubws = closure; struct afb_xreq *xreq = request; - struct client_event *ev; - - /* check conflicts */ - ev = client_event_search(stubws, event_id, event_name); - if (ev == NULL) - return; + struct afb_event_x2 *event; + int rc; - if (afb_xreq_subscribe(xreq, ev->event) < 0) + rc = u16id2ptr_get(stubws->event_proxies, event_id, (void**)&event); + if (rc < 0 || !event || afb_xreq_subscribe(xreq, event) < 0) ERROR("can't subscribe: %m"); } -static void client_on_event_unsubscribe_cb(void *closure, void *request, const char *event_name, int event_id) +static void client_on_event_unsubscribe_cb(void *closure, void *request, uint16_t event_id) { struct afb_stub_ws *stubws = closure; struct afb_xreq *xreq = request; - struct client_event *ev; - - /* check conflicts */ - ev = client_event_search(stubws, event_id, event_name); - if (ev == NULL) - return; + struct afb_event_x2 *event; + int rc; - if (afb_xreq_unsubscribe(xreq, ev->event) < 0) + rc = u16id2ptr_get(stubws->event_proxies, event_id, (void**)&event); + if (rc < 0 || !event || afb_xreq_unsubscribe(xreq, event) < 0) ERROR("can't unsubscribe: %m"); } -static void client_on_event_push_cb(void *closure, const char *event_name, int event_id, struct json_object *data) +static void client_on_event_push_cb(void *closure, uint16_t event_id, struct json_object *data) { struct afb_stub_ws *stubws = closure; - struct client_event *ev; + struct afb_event_x2 *event; + int rc; - /* check conflicts */ - ev = client_event_search(stubws, event_id, event_name); - if (ev) - afb_evt_event_x2_push(ev->event, data); + rc = u16id2ptr_get(stubws->event_proxies, event_id, (void**)&event); + if (rc >= 0 && event) + afb_evt_event_x2_push(event, data); else ERROR("unreadable push event"); } @@ -433,55 +426,95 @@ static void client_on_event_broadcast_cb(void *closure, const char *event_name, /*****************************************************/ -static void server_record_session(struct afb_stub_ws *stubws, struct afb_session *session) +static struct afb_session *server_add_session(struct afb_stub_ws *stubws, uint16_t sessionid, const char *sessionstr) { - struct server_session *s, **prv; + struct afb_session *session; + int rc, created; - /* search */ - prv = &stubws->sessions; - while ((s = *prv)) { - if (s->session == session) - return; - if (afb_session_is_closed(s->session)) { - *prv = s->next; - afb_session_unref(s->session); - free(s); + session = afb_session_get(sessionstr, AFB_SESSION_TIMEOUT_DEFAULT, &created); + if (session == NULL) + ERROR("can't create session %s, out of memory", sessionstr); + else { + afb_session_set_autoclose(session, 1); + rc = u16id2ptr_add(&stubws->session_proxies, sessionid, session); + if (rc < 0) { + ERROR("can't record session %s", sessionstr); + afb_session_unref(session); + session = NULL; } - else - prv = &s->next; } + return session; +} - /* create */ - s = malloc(sizeof *s); - if (s) { - s->session = afb_session_addref(session); - s->next = stubws->sessions; - stubws->sessions = s; - } +static void server_on_session_create_cb(void *closure, uint16_t sessionid, const char *sessionstr) +{ + struct afb_stub_ws *stubws = closure; + + server_add_session(stubws, sessionid, sessionstr); } -static void server_release_all_sessions(struct afb_stub_ws *stubws) +static void server_on_session_remove_cb(void *closure, uint16_t sessionid) { - struct server_session *ses, *nses; + struct afb_stub_ws *stubws = closure; + struct afb_session *session; + int rc; + + rc = u16id2ptr_drop(&stubws->event_proxies, sessionid, (void**)&session); + if (rc == 0 && session) + afb_session_unref(session); +} - nses = __atomic_exchange_n(&stubws->sessions, NULL, __ATOMIC_RELAXED); - while(nses) { - ses = nses; - nses = ses->next; - afb_session_unref(ses->session); - free(ses); +static void server_on_token_create_cb(void *closure, uint16_t tokenid, const char *tokenstr) +{ + struct afb_stub_ws *stubws = closure; + struct afb_token *token; + int rc; + + rc = afb_token_get(&token, tokenstr); + if (rc < 0) + ERROR("can't create token %s, out of memory", tokenstr); + else { + rc = u16id2ptr_add(&stubws->token_proxies, tokenid, token); + if (rc < 0) { + ERROR("can't record token %s", tokenstr); + afb_token_unref(token); + } } } -/*****************************************************/ +static void server_on_token_remove_cb(void *closure, uint16_t tokenid) +{ + struct afb_stub_ws *stubws = closure; + struct afb_token *token; + int rc; + + rc = u16id2ptr_drop(&stubws->event_proxies, tokenid, (void**)&token); + if (rc == 0 && token) + afb_token_unref(token); +} -static void server_on_call_cb(void *closure, struct afb_proto_ws_call *call, const char *verb, struct json_object *args, const char *sessionid, const char *user_creds) +static void server_on_call_cb(void *closure, struct afb_proto_ws_call *call, const char *verb, struct json_object *args, uint16_t sessionid, uint16_t tokenid, const char *user_creds) { struct afb_stub_ws *stubws = closure; struct server_req *wreq; + struct afb_session *session; + struct afb_token *token; + int rc; afb_stub_ws_addref(stubws); + /* get tokens and sessions */ + rc = u16id2ptr_get(stubws->session_proxies, sessionid, (void**)&session); + if (rc < 0) { + if (sessionid != 0) + goto no_session; + session = server_add_session(stubws, sessionid, NULL); + if (!session) + goto out_of_memory; + } + if (!tokenid || u16id2ptr_get(stubws->token_proxies, tokenid, (void**)&token) < 0) + token = NULL; + /* create the request */ wreq = malloc(sizeof *wreq); if (wreq == NULL) @@ -492,11 +525,7 @@ static void server_on_call_cb(void *closure, struct afb_proto_ws_call *call, con wreq->call = call; /* init the context */ - if (afb_context_connect_validated(&wreq->xreq.context, sessionid) < 0) - goto unconnected; - server_record_session(stubws, wreq->xreq.context.session); - if (wreq->xreq.context.created) - afb_session_set_autoclose(wreq->xreq.context.session, 1); + afb_context_init(&wreq->xreq.context, session, token); /* makes the call */ wreq->xreq.cred = afb_cred_mixed_on_behalf_import(stubws->cred, &wreq->xreq.context, user_creds); @@ -506,9 +535,8 @@ static void server_on_call_cb(void *closure, struct afb_proto_ws_call *call, con afb_xreq_process(&wreq->xreq, stubws->apiset); return; -unconnected: - free(wreq); out_of_memory: +no_session: json_object_put(args); afb_stub_ws_unref(stubws); afb_proto_ws_call_reply(call, NULL, "internal-error", NULL); @@ -550,6 +578,10 @@ static struct afb_api_itf client_api_itf = { static const struct afb_proto_ws_server_itf server_itf = { + .on_session_create = server_on_session_create_cb, + .on_session_remove = server_on_session_remove_cb, + .on_token_create = server_on_token_create_cb, + .on_token_remove = server_on_token_remove_cb, .on_call = server_on_call_cb, .on_describe = server_on_describe_cb }; @@ -563,21 +595,61 @@ static const struct afb_evt_itf server_event_itf = { }; /*****************************************************/ +/*****************************************************/ + +static void release_all_sessions_cb(void*closure, uint16_t id, void *ptr) +{ + struct afb_session *session = ptr; + afb_session_unref(session); +} + +static void release_all_tokens_cb(void*closure, uint16_t id, void *ptr) +{ + struct afb_token *token = ptr; + afb_token_unref(token); +} + +static void release_all_events_cb(void*closure, uint16_t id, void *ptr) +{ + struct afb_event_x2 *eventid = ptr; + afb_evt_event_x2_unref(eventid); +} /* disconnect */ static void disconnect(struct afb_stub_ws *stubws) { + struct u16id2ptr *i2p; + struct u16id2bool *i2b; + afb_proto_ws_unref(__atomic_exchange_n(&stubws->proto, NULL, __ATOMIC_RELAXED)); - if (stubws->is_client) - client_drop_all_events(stubws); - else { + if (stubws->is_client) { + i2p = __atomic_exchange_n(&stubws->event_proxies, NULL, __ATOMIC_RELAXED); + if (i2p) { + u16id2ptr_forall(i2p, release_all_events_cb, NULL); + u16id2ptr_destroy(&i2p); + } + i2b = __atomic_exchange_n(&stubws->session_flags, NULL, __ATOMIC_RELAXED); + u16id2bool_destroy(&i2b); + i2b = __atomic_exchange_n(&stubws->token_flags, NULL, __ATOMIC_RELAXED); + u16id2bool_destroy(&i2b); + } else { afb_evt_listener_unref(__atomic_exchange_n(&stubws->listener, NULL, __ATOMIC_RELAXED)); afb_cred_unref(__atomic_exchange_n(&stubws->cred, NULL, __ATOMIC_RELAXED)); - server_release_all_sessions(stubws); + i2b = __atomic_exchange_n(&stubws->event_flags, NULL, __ATOMIC_RELAXED); + u16id2bool_destroy(&i2b); + i2p = __atomic_exchange_n(&stubws->session_proxies, NULL, __ATOMIC_RELAXED); + if (i2p) { + u16id2ptr_forall(i2p, release_all_sessions_cb, NULL); + u16id2ptr_destroy(&i2p); + } + i2p = __atomic_exchange_n(&stubws->token_proxies, NULL, __ATOMIC_RELAXED); + if (i2p) { + u16id2ptr_forall(i2p, release_all_tokens_cb, NULL); + u16id2ptr_destroy(&i2p); + } } } - /* callback when receiving a hangup */ static void on_hangup(void *closure) { diff --git a/src/main-afb-client-demo.c b/src/main-afb-client-demo.c index f2c0826e..b5877485 100644 --- a/src/main-afb-client-demo.c +++ b/src/main-afb-client-demo.c @@ -55,11 +55,11 @@ static void on_wsj1_event(void *closure, const char *event, struct afb_wsj1_msg static void on_pws_hangup(void *closure); static void on_pws_reply(void *closure, void *request, struct json_object *result, const char *error, const char *info); -static void on_pws_event_create(void *closure, const char *event_name, int event_id); -static void on_pws_event_remove(void *closure, const char *event_name, int event_id); -static void on_pws_event_subscribe(void *closure, void *request, const char *event_name, int event_id); -static void on_pws_event_unsubscribe(void *closure, void *request, const char *event_name, int event_id); -static void on_pws_event_push(void *closure, const char *event_name, int event_id, struct json_object *data); +static void on_pws_event_create(void *closure, uint16_t event_id, const char *event_name); +static void on_pws_event_remove(void *closure, uint16_t event_id); +static void on_pws_event_subscribe(void *closure, void *request, uint16_t event_id); +static void on_pws_event_unsubscribe(void *closure, void *request, uint16_t event_id); +static void on_pws_event_push(void *closure, uint16_t event_id, struct json_object *data); static void on_pws_event_broadcast(void *closure, const char *event_name, struct json_object *data, const afb_proto_ws_uuid_t uuid, uint8_t hop); static void idle(); @@ -497,36 +497,36 @@ static void on_pws_reply(void *closure, void *request, struct json_object *resul dec_callcount(); } -static void on_pws_event_create(void *closure, const char *event_name, int event_id) +static void on_pws_event_create(void *closure, uint16_t event_id, const char *event_name) { printf("ON-EVENT-CREATE: [%d:%s]\n", event_id, event_name); fflush(stdout); } -static void on_pws_event_remove(void *closure, const char *event_name, int event_id) +static void on_pws_event_remove(void *closure, uint16_t event_id) { - printf("ON-EVENT-REMOVE: [%d:%s]\n", event_id, event_name); + printf("ON-EVENT-REMOVE: [%d]\n", event_id); fflush(stdout); } -static void on_pws_event_subscribe(void *closure, void *request, const char *event_name, int event_id) +static void on_pws_event_subscribe(void *closure, void *request, uint16_t event_id) { - printf("ON-EVENT-SUBSCRIBE %s: [%d:%s]\n", (char*)request, event_id, event_name); + printf("ON-EVENT-SUBSCRIBE %s: [%d]\n", (char*)request, event_id); fflush(stdout); } -static void on_pws_event_unsubscribe(void *closure, void *request, const char *event_name, int event_id) +static void on_pws_event_unsubscribe(void *closure, void *request, uint16_t event_id) { - printf("ON-EVENT-UNSUBSCRIBE %s: [%d:%s]\n", (char*)request, event_id, event_name); + printf("ON-EVENT-UNSUBSCRIBE %s: [%d]\n", (char*)request, event_id); fflush(stdout); } -static void on_pws_event_push(void *closure, const char *event_name, int event_id, struct json_object *data) +static void on_pws_event_push(void *closure, uint16_t event_id, struct json_object *data) { if (raw) - printf("ON-EVENT-PUSH: [%d:%s]\n%s\n", event_id, event_name, json_object_to_json_string_ext(data, JSON_C_TO_STRING_NOSLASHESCAPE)); + printf("ON-EVENT-PUSH: [%d]\n%s\n", event_id, json_object_to_json_string_ext(data, JSON_C_TO_STRING_NOSLASHESCAPE)); if (human) - printf("ON-EVENT-PUSH: [%d:%s]\n%s\n", event_id, event_name, json_object_to_json_string_ext(data, JSON_C_TO_STRING_PRETTY|JSON_C_TO_STRING_NOSLASHESCAPE)); + printf("ON-EVENT-PUSH: [%d]\n%s\n", event_id, json_object_to_json_string_ext(data, JSON_C_TO_STRING_PRETTY|JSON_C_TO_STRING_NOSLASHESCAPE)); fflush(stdout); } @@ -564,7 +564,7 @@ static void pws_call(const char *verb, const char *object) if (jerr != json_tokener_success) o = json_object_new_string(object); } - rc = afb_proto_ws_client_call(pws, verb, o, sessionid, key, NULL); + rc = afb_proto_ws_client_call(pws, verb, o, 0, 0, key, NULL); json_object_put(o); if (rc < 0) { fprintf(stderr, "calling %s(%s) failed: %m\n", verb, object?:""); |