From 760f61ba4523d5c3461a0c30bbda78c84ab80103 Mon Sep 17 00:00:00 2001 From: José Bollo Date: Sat, 11 Nov 2017 22:36:05 +0100 Subject: afb-proto-ws: fix self locking issue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Calling synchronously a verb on an event of the same API was blocking. Change-Id: I58a988c6df8c60cd3a38c3cdff23d7be8b6be54e Signed-off-by: José Bollo --- src/afb-proto-ws.c | 111 +++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 82 insertions(+), 29 deletions(-) diff --git a/src/afb-proto-ws.c b/src/afb-proto-ws.c index 09039763..90c3b05a 100644 --- a/src/afb-proto-ws.c +++ b/src/afb-proto-ws.c @@ -343,12 +343,15 @@ int afb_proto_ws_call_success(struct afb_proto_ws_call *call, struct json_object { int rc = -1; struct writebuf wb = { .count = 0 }; + struct afb_proto_ws *protows = call->protows; if (writebuf_char(&wb, CHAR_FOR_ANSWER_SUCCESS) && writebuf_uint32(&wb, call->callid) && writebuf_string(&wb, info ?: "") && writebuf_object(&wb, obj)) { - rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count); + pthread_mutex_lock(&protows->mutex); + rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); + pthread_mutex_unlock(&protows->mutex); if (rc >= 0) { rc = 0; goto success; @@ -362,12 +365,15 @@ int afb_proto_ws_call_fail(struct afb_proto_ws_call *call, const char *status, c { int rc = -1; struct writebuf wb = { .count = 0 }; + struct afb_proto_ws *protows = call->protows; if (writebuf_char(&wb, CHAR_FOR_ANSWER_FAIL) && writebuf_uint32(&wb, call->callid) && writebuf_string(&wb, status) && writebuf_string(&wb, info ? : "")) { - rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count); + pthread_mutex_lock(&protows->mutex); + rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); + pthread_mutex_unlock(&protows->mutex); if (rc >= 0) { rc = 0; goto success; @@ -391,7 +397,7 @@ int afb_proto_ws_call_subcall(struct afb_proto_ws_call *call, const char *api, c sc->callback = callback; sc->closure = cb_closure; - pthread_mutex_unlock(&protows->mutex); + pthread_mutex_lock(&protows->mutex); sc->subcallid = ptr2id(sc); do { sc->subcallid++; @@ -409,7 +415,9 @@ int afb_proto_ws_call_subcall(struct afb_proto_ws_call *call, const char *api, c && writebuf_string(&wb, api) && writebuf_string(&wb, verb) && writebuf_object(&wb, args)) { + pthread_mutex_lock(&protows->mutex); rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); + pthread_mutex_unlock(&protows->mutex); if (rc >= 0) { rc = 0; goto success; @@ -424,12 +432,15 @@ int afb_proto_ws_call_subscribe(struct afb_proto_ws_call *call, const char *even { int rc = -1; struct writebuf wb = { .count = 0 }; + struct afb_proto_ws *protows = call->protows; if (writebuf_char(&wb, CHAR_FOR_EVT_SUBSCRIBE) && writebuf_uint32(&wb, call->callid) && writebuf_uint32(&wb, (uint32_t)event_id) && writebuf_string(&wb, event_name)) { - rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count); + pthread_mutex_lock(&protows->mutex); + rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); + pthread_mutex_unlock(&protows->mutex); if (rc >= 0) { rc = 0; goto success; @@ -443,12 +454,15 @@ int afb_proto_ws_call_unsubscribe(struct afb_proto_ws_call *call, const char *ev { int rc = -1; struct writebuf wb = { .count = 0 }; + struct afb_proto_ws *protows = call->protows; if (writebuf_char(&wb, CHAR_FOR_EVT_UNSUBSCRIBE) && writebuf_uint32(&wb, call->callid) && writebuf_uint32(&wb, (uint32_t)event_id) && writebuf_string(&wb, event_name)) { - rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count); + pthread_mutex_lock(&protows->mutex); + rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); + pthread_mutex_unlock(&protows->mutex); if (rc >= 0) { rc = 0; goto success; @@ -461,7 +475,7 @@ success: /******************* client part **********************************/ /* search a memorized call */ -static struct client_call *client_call_search(struct afb_proto_ws *protows, uint32_t callid) +static struct client_call *client_call_search_locked(struct afb_proto_ws *protows, uint32_t callid) { struct client_call *call; @@ -472,11 +486,23 @@ static struct client_call *client_call_search(struct afb_proto_ws *protows, uint return call; } +static struct client_call *client_call_search_unlocked(struct afb_proto_ws *protows, uint32_t callid) +{ + struct client_call *result; + + pthread_mutex_lock(&protows->mutex); + result = client_call_search_locked(protows, callid); + pthread_mutex_unlock(&protows->mutex); + return result; +} + /* free and release the memorizing call */ static void client_call_destroy(struct client_call *call) { struct client_call **prv; + struct afb_proto_ws *protows = call->protows; + pthread_mutex_lock(&protows->mutex); prv = &call->protows->calls; while (*prv != NULL) { if (*prv == call) { @@ -485,6 +511,7 @@ static void client_call_destroy(struct client_call *call) } prv = &(*prv)->next; } + pthread_mutex_unlock(&protows->mutex); free(call); } @@ -505,7 +532,7 @@ static int client_msg_call_get(struct afb_proto_ws *protows, struct readbuf *rb, } /* get the call */ - *call = client_call_search(protows, callid); + *call = client_call_search_unlocked(protows, callid); if (*call == NULL) { return 0; } @@ -600,6 +627,7 @@ static void client_on_reply_fail(struct afb_proto_ws *protows, struct readbuf *r if (!client_msg_call_get(protows, rb, &call)) return; + if (readbuf_string(rb, &status, NULL) && readbuf_string(rb, &info, NULL)) { protows->client_itf->on_reply_fail(protows->closure, call->request, status, info); @@ -614,12 +642,19 @@ static int client_send_subcall_reply(struct afb_proto_ws *protows, uint32_t subc { struct writebuf wb = { .count = 0 }; char ie = status < 0; + int rc; - return -!(writebuf_char(&wb, CHAR_FOR_SUBCALL_REPLY) + if (writebuf_char(&wb, CHAR_FOR_SUBCALL_REPLY) && writebuf_uint32(&wb, subcallid) && writebuf_char(&wb, ie) - && writebuf_object(&wb, object) - && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0); + && writebuf_object(&wb, object)) { + pthread_mutex_lock(&protows->mutex); + rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); + pthread_mutex_unlock(&protows->mutex); + if (rc >= 0) + return 0; + } + return -1; } /* callback for subcall reply */ @@ -682,11 +717,15 @@ static void client_on_description(struct afb_proto_ws *protows, struct readbuf * struct json_object *object; if (readbuf_uint32(rb, &descid)) { + pthread_mutex_lock(&protows->mutex); prv = &protows->describes; while ((desc = *prv) && desc->descid != descid) prv = &desc->next; - if (desc) { + if (!desc) + pthread_mutex_unlock(&protows->mutex); + else { *prv = desc->next; + pthread_mutex_unlock(&protows->mutex); if (!readbuf_object(rb, &object)) object = NULL; desc->callback(desc->closure, object); @@ -707,7 +746,6 @@ static void client_on_binary(void *closure, char *data, size_t size) rb.end = data + size; protows = closure; - pthread_mutex_lock(&protows->mutex); switch (*rb.head++) { case CHAR_FOR_ANSWER_SUCCESS: /* success */ client_on_reply_success(protows, &rb); @@ -743,7 +781,6 @@ static void client_on_binary(void *closure, char *data, size_t size) /* TODO: close the connection */ break; } - pthread_mutex_unlock(&protows->mutex); } free(rb.base); } @@ -771,11 +808,12 @@ int afb_proto_ws_client_call( /* init call data */ pthread_mutex_lock(&protows->mutex); call->callid = ptr2id(call); - while(client_call_search(protows, call->callid) != NULL) + while(client_call_search_locked(protows, call->callid) != NULL) call->callid++; call->protows = protows; call->next = protows->calls; protows->calls = call; + pthread_mutex_unlock(&protows->mutex); /* creates the call message */ if (!writebuf_char(&wb, CHAR_FOR_CALL) @@ -788,7 +826,9 @@ int afb_proto_ws_client_call( } /* send */ + pthread_mutex_lock(&protows->mutex); rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); + pthread_mutex_unlock(&protows->mutex); if (rc >= 0) { rc = 0; goto end; @@ -797,7 +837,6 @@ int afb_proto_ws_client_call( clean: client_call_destroy(call); end: - pthread_mutex_unlock(&protows->mutex); return rc; } @@ -830,15 +869,15 @@ int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)( desc->protows = protows; desc->next = protows->describes; protows->describes = desc; - pthread_mutex_unlock(&protows->mutex); /* send */ if (writebuf_char(&wb, CHAR_FOR_DESCRIBE) && writebuf_uint32(&wb, desc->descid) - && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0) + && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0) { + pthread_mutex_unlock(&protows->mutex); return 0; + } - pthread_mutex_lock(&protows->mutex); d = protows->describes; if (d == desc) protows->describes = desc->next; @@ -848,8 +887,8 @@ int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)( if (d) d->next = desc->next; } - free(desc); pthread_mutex_unlock(&protows->mutex); + free(desc); error: /* TODO? callback(closure, NULL); */ return -1; @@ -931,18 +970,25 @@ static void server_on_subcall_reply(struct afb_proto_ws *protows, struct readbuf static int server_send_description(struct afb_proto_ws *protows, uint32_t descid, struct json_object *descobj) { + int rc; struct writebuf wb = { .count = 0 }; - return -!(writebuf_char(&wb, CHAR_FOR_DESCRIPTION) - && writebuf_uint32(&wb, descid) - && writebuf_object(&wb, descobj) - && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0); + if (writebuf_char(&wb, CHAR_FOR_DESCRIPTION) + && writebuf_uint32(&wb, descid) + && writebuf_object(&wb, descobj)) { + pthread_mutex_lock(&protows->mutex); + rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); + pthread_mutex_unlock(&protows->mutex); + if (rc >= 0) + return 0; + } + return -1; } int afb_proto_ws_describe_put(struct afb_proto_ws_describe *describe, struct json_object *description) { int rc = server_send_description(describe->protows, describe->descid, description); - afb_proto_ws_addref(describe->protows); + afb_proto_ws_unref(describe->protows); free(describe); return rc; } @@ -1005,12 +1051,19 @@ static void server_on_binary(void *closure, char *data, size_t size) static int server_event_send(struct afb_proto_ws *protows, char order, const char *event_name, int event_id, struct json_object *data) { struct writebuf wb = { .count = 0 }; + int rc; - return -!(writebuf_char(&wb, order) - && (order == CHAR_FOR_EVT_BROADCAST || writebuf_uint32(&wb, event_id)) - && writebuf_string(&wb, event_name) - && (order == CHAR_FOR_EVT_ADD || order == CHAR_FOR_EVT_DEL || writebuf_object(&wb, data)) - && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0); + if (writebuf_char(&wb, order) + && (order == CHAR_FOR_EVT_BROADCAST || writebuf_uint32(&wb, event_id)) + && writebuf_string(&wb, event_name) + && (order == CHAR_FOR_EVT_ADD || order == CHAR_FOR_EVT_DEL || writebuf_object(&wb, data))) { + pthread_mutex_lock(&protows->mutex); + rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); + pthread_mutex_unlock(&protows->mutex); + if (rc >= 0) + return 0; + } + return -1; } int afb_proto_ws_server_event_create(struct afb_proto_ws *protows, const char *event_name, int event_id) -- cgit 1.2.3-korg