aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/afb-proto-ws.c111
1 files changed, 82 insertions, 29 deletions
diff --git a/src/afb-proto-ws.c b/src/afb-proto-ws.c
index 09039763..90c3b05a 100644
--- a/src/afb-proto-ws.c
+++ b/src/afb-proto-ws.c
@@ -343,12 +343,15 @@ int afb_proto_ws_call_success(struct afb_proto_ws_call *call, struct json_object
{
int rc = -1;
struct writebuf wb = { .count = 0 };
+ struct afb_proto_ws *protows = call->protows;
if (writebuf_char(&wb, CHAR_FOR_ANSWER_SUCCESS)
&& writebuf_uint32(&wb, call->callid)
&& writebuf_string(&wb, info ?: "")
&& writebuf_object(&wb, obj)) {
- rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count);
+ pthread_mutex_lock(&protows->mutex);
+ rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+ pthread_mutex_unlock(&protows->mutex);
if (rc >= 0) {
rc = 0;
goto success;
@@ -362,12 +365,15 @@ int afb_proto_ws_call_fail(struct afb_proto_ws_call *call, const char *status, c
{
int rc = -1;
struct writebuf wb = { .count = 0 };
+ struct afb_proto_ws *protows = call->protows;
if (writebuf_char(&wb, CHAR_FOR_ANSWER_FAIL)
&& writebuf_uint32(&wb, call->callid)
&& writebuf_string(&wb, status)
&& writebuf_string(&wb, info ? : "")) {
- rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count);
+ pthread_mutex_lock(&protows->mutex);
+ rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+ pthread_mutex_unlock(&protows->mutex);
if (rc >= 0) {
rc = 0;
goto success;
@@ -391,7 +397,7 @@ int afb_proto_ws_call_subcall(struct afb_proto_ws_call *call, const char *api, c
sc->callback = callback;
sc->closure = cb_closure;
- pthread_mutex_unlock(&protows->mutex);
+ pthread_mutex_lock(&protows->mutex);
sc->subcallid = ptr2id(sc);
do {
sc->subcallid++;
@@ -409,7 +415,9 @@ int afb_proto_ws_call_subcall(struct afb_proto_ws_call *call, const char *api, c
&& writebuf_string(&wb, api)
&& writebuf_string(&wb, verb)
&& writebuf_object(&wb, args)) {
+ pthread_mutex_lock(&protows->mutex);
rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+ pthread_mutex_unlock(&protows->mutex);
if (rc >= 0) {
rc = 0;
goto success;
@@ -424,12 +432,15 @@ int afb_proto_ws_call_subscribe(struct afb_proto_ws_call *call, const char *even
{
int rc = -1;
struct writebuf wb = { .count = 0 };
+ struct afb_proto_ws *protows = call->protows;
if (writebuf_char(&wb, CHAR_FOR_EVT_SUBSCRIBE)
&& writebuf_uint32(&wb, call->callid)
&& writebuf_uint32(&wb, (uint32_t)event_id)
&& writebuf_string(&wb, event_name)) {
- rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count);
+ pthread_mutex_lock(&protows->mutex);
+ rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+ pthread_mutex_unlock(&protows->mutex);
if (rc >= 0) {
rc = 0;
goto success;
@@ -443,12 +454,15 @@ int afb_proto_ws_call_unsubscribe(struct afb_proto_ws_call *call, const char *ev
{
int rc = -1;
struct writebuf wb = { .count = 0 };
+ struct afb_proto_ws *protows = call->protows;
if (writebuf_char(&wb, CHAR_FOR_EVT_UNSUBSCRIBE)
&& writebuf_uint32(&wb, call->callid)
&& writebuf_uint32(&wb, (uint32_t)event_id)
&& writebuf_string(&wb, event_name)) {
- rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count);
+ pthread_mutex_lock(&protows->mutex);
+ rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+ pthread_mutex_unlock(&protows->mutex);
if (rc >= 0) {
rc = 0;
goto success;
@@ -461,7 +475,7 @@ success:
/******************* client part **********************************/
/* search a memorized call */
-static struct client_call *client_call_search(struct afb_proto_ws *protows, uint32_t callid)
+static struct client_call *client_call_search_locked(struct afb_proto_ws *protows, uint32_t callid)
{
struct client_call *call;
@@ -472,11 +486,23 @@ static struct client_call *client_call_search(struct afb_proto_ws *protows, uint
return call;
}
+static struct client_call *client_call_search_unlocked(struct afb_proto_ws *protows, uint32_t callid)
+{
+ struct client_call *result;
+
+ pthread_mutex_lock(&protows->mutex);
+ result = client_call_search_locked(protows, callid);
+ pthread_mutex_unlock(&protows->mutex);
+ return result;
+}
+
/* free and release the memorizing call */
static void client_call_destroy(struct client_call *call)
{
struct client_call **prv;
+ struct afb_proto_ws *protows = call->protows;
+ pthread_mutex_lock(&protows->mutex);
prv = &call->protows->calls;
while (*prv != NULL) {
if (*prv == call) {
@@ -485,6 +511,7 @@ static void client_call_destroy(struct client_call *call)
}
prv = &(*prv)->next;
}
+ pthread_mutex_unlock(&protows->mutex);
free(call);
}
@@ -505,7 +532,7 @@ static int client_msg_call_get(struct afb_proto_ws *protows, struct readbuf *rb,
}
/* get the call */
- *call = client_call_search(protows, callid);
+ *call = client_call_search_unlocked(protows, callid);
if (*call == NULL) {
return 0;
}
@@ -600,6 +627,7 @@ static void client_on_reply_fail(struct afb_proto_ws *protows, struct readbuf *r
if (!client_msg_call_get(protows, rb, &call))
return;
+
if (readbuf_string(rb, &status, NULL) && readbuf_string(rb, &info, NULL)) {
protows->client_itf->on_reply_fail(protows->closure, call->request, status, info);
@@ -614,12 +642,19 @@ static int client_send_subcall_reply(struct afb_proto_ws *protows, uint32_t subc
{
struct writebuf wb = { .count = 0 };
char ie = status < 0;
+ int rc;
- return -!(writebuf_char(&wb, CHAR_FOR_SUBCALL_REPLY)
+ if (writebuf_char(&wb, CHAR_FOR_SUBCALL_REPLY)
&& writebuf_uint32(&wb, subcallid)
&& writebuf_char(&wb, ie)
- && writebuf_object(&wb, object)
- && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0);
+ && writebuf_object(&wb, object)) {
+ pthread_mutex_lock(&protows->mutex);
+ rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+ pthread_mutex_unlock(&protows->mutex);
+ if (rc >= 0)
+ return 0;
+ }
+ return -1;
}
/* callback for subcall reply */
@@ -682,11 +717,15 @@ static void client_on_description(struct afb_proto_ws *protows, struct readbuf *
struct json_object *object;
if (readbuf_uint32(rb, &descid)) {
+ pthread_mutex_lock(&protows->mutex);
prv = &protows->describes;
while ((desc = *prv) && desc->descid != descid)
prv = &desc->next;
- if (desc) {
+ if (!desc)
+ pthread_mutex_unlock(&protows->mutex);
+ else {
*prv = desc->next;
+ pthread_mutex_unlock(&protows->mutex);
if (!readbuf_object(rb, &object))
object = NULL;
desc->callback(desc->closure, object);
@@ -707,7 +746,6 @@ static void client_on_binary(void *closure, char *data, size_t size)
rb.end = data + size;
protows = closure;
- pthread_mutex_lock(&protows->mutex);
switch (*rb.head++) {
case CHAR_FOR_ANSWER_SUCCESS: /* success */
client_on_reply_success(protows, &rb);
@@ -743,7 +781,6 @@ static void client_on_binary(void *closure, char *data, size_t size)
/* TODO: close the connection */
break;
}
- pthread_mutex_unlock(&protows->mutex);
}
free(rb.base);
}
@@ -771,11 +808,12 @@ int afb_proto_ws_client_call(
/* init call data */
pthread_mutex_lock(&protows->mutex);
call->callid = ptr2id(call);
- while(client_call_search(protows, call->callid) != NULL)
+ while(client_call_search_locked(protows, call->callid) != NULL)
call->callid++;
call->protows = protows;
call->next = protows->calls;
protows->calls = call;
+ pthread_mutex_unlock(&protows->mutex);
/* creates the call message */
if (!writebuf_char(&wb, CHAR_FOR_CALL)
@@ -788,7 +826,9 @@ int afb_proto_ws_client_call(
}
/* send */
+ pthread_mutex_lock(&protows->mutex);
rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+ pthread_mutex_unlock(&protows->mutex);
if (rc >= 0) {
rc = 0;
goto end;
@@ -797,7 +837,6 @@ int afb_proto_ws_client_call(
clean:
client_call_destroy(call);
end:
- pthread_mutex_unlock(&protows->mutex);
return rc;
}
@@ -830,15 +869,15 @@ int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)(
desc->protows = protows;
desc->next = protows->describes;
protows->describes = desc;
- pthread_mutex_unlock(&protows->mutex);
/* send */
if (writebuf_char(&wb, CHAR_FOR_DESCRIBE)
&& writebuf_uint32(&wb, desc->descid)
- && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0)
+ && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0) {
+ pthread_mutex_unlock(&protows->mutex);
return 0;
+ }
- pthread_mutex_lock(&protows->mutex);
d = protows->describes;
if (d == desc)
protows->describes = desc->next;
@@ -848,8 +887,8 @@ int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)(
if (d)
d->next = desc->next;
}
- free(desc);
pthread_mutex_unlock(&protows->mutex);
+ free(desc);
error:
/* TODO? callback(closure, NULL); */
return -1;
@@ -931,18 +970,25 @@ static void server_on_subcall_reply(struct afb_proto_ws *protows, struct readbuf
static int server_send_description(struct afb_proto_ws *protows, uint32_t descid, struct json_object *descobj)
{
+ int rc;
struct writebuf wb = { .count = 0 };
- return -!(writebuf_char(&wb, CHAR_FOR_DESCRIPTION)
- && writebuf_uint32(&wb, descid)
- && writebuf_object(&wb, descobj)
- && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0);
+ if (writebuf_char(&wb, CHAR_FOR_DESCRIPTION)
+ && writebuf_uint32(&wb, descid)
+ && writebuf_object(&wb, descobj)) {
+ pthread_mutex_lock(&protows->mutex);
+ rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+ pthread_mutex_unlock(&protows->mutex);
+ if (rc >= 0)
+ return 0;
+ }
+ return -1;
}
int afb_proto_ws_describe_put(struct afb_proto_ws_describe *describe, struct json_object *description)
{
int rc = server_send_description(describe->protows, describe->descid, description);
- afb_proto_ws_addref(describe->protows);
+ afb_proto_ws_unref(describe->protows);
free(describe);
return rc;
}
@@ -1005,12 +1051,19 @@ static void server_on_binary(void *closure, char *data, size_t size)
static int server_event_send(struct afb_proto_ws *protows, char order, const char *event_name, int event_id, struct json_object *data)
{
struct writebuf wb = { .count = 0 };
+ int rc;
- return -!(writebuf_char(&wb, order)
- && (order == CHAR_FOR_EVT_BROADCAST || writebuf_uint32(&wb, event_id))
- && writebuf_string(&wb, event_name)
- && (order == CHAR_FOR_EVT_ADD || order == CHAR_FOR_EVT_DEL || writebuf_object(&wb, data))
- && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0);
+ if (writebuf_char(&wb, order)
+ && (order == CHAR_FOR_EVT_BROADCAST || writebuf_uint32(&wb, event_id))
+ && writebuf_string(&wb, event_name)
+ && (order == CHAR_FOR_EVT_ADD || order == CHAR_FOR_EVT_DEL || writebuf_object(&wb, data))) {
+ pthread_mutex_lock(&protows->mutex);
+ rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+ pthread_mutex_unlock(&protows->mutex);
+ if (rc >= 0)
+ return 0;
+ }
+ return -1;
}
int afb_proto_ws_server_event_create(struct afb_proto_ws *protows, const char *event_name, int event_id)