summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJosé Bollo <jose.bollo@iot.bzh>2017-04-10 21:39:22 +0200
committerJosé Bollo <jose.bollo@iot.bzh>2017-04-10 21:39:22 +0200
commitca9807f73646f536ac58c002d963a8bb8d245f5d (patch)
treebf673c996dd7cb9f31fdf70cefd0593a34caeb4a
parent80900470d00c56d2fa51fb7c3da429d09e1d9a78 (diff)
Make implementation multithread
This changes makes many improvement needed for multi-threading: - json object can't be shared across threads because get/set is not protected - event are now multithread compatible Change-Id: Id44b12c68e0fa67042b8ea44939af4edfa76270a Signed-off-by: José Bollo <jose.bollo@iot.bzh>
-rw-r--r--src/afb-evt.c124
-rw-r--r--src/afb-msg-json.c14
-rw-r--r--src/afb-ws.c2
-rw-r--r--src/sig-monitor.c2
4 files changed, 98 insertions, 44 deletions
diff --git a/src/afb-evt.c b/src/afb-evt.c
index 8ae31cc5..2abbc819 100644
--- a/src/afb-evt.c
+++ b/src/afb-evt.c
@@ -22,6 +22,7 @@
#include <string.h>
#include <assert.h>
#include <errno.h>
+#include <pthread.h>
#include <json-c/json.h>
#include <afb/afb-event-itf.h>
@@ -47,6 +48,9 @@ struct afb_evt_listener {
/* head of the list of events listened */
struct afb_evt_watch *watchs;
+ /* mutex of the listener */
+ pthread_mutex_t mutex;
+
/* count of reference to the listener */
int refcount;
};
@@ -65,6 +69,9 @@ struct afb_evt_event {
/* id of the event */
int id;
+ /* mutex of the event */
+ pthread_mutex_t mutex;
+
/* name of the event */
char name[1];
};
@@ -105,9 +112,11 @@ static struct afb_event_itf afb_evt_event_itf = {
};
/* head of the list of listeners */
+static pthread_mutex_t listeners_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct afb_evt_listener *listeners = NULL;
/* handling id of events */
+static pthread_mutex_t events_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct afb_evt_event *events = NULL;
static int event_id_counter = 0;
static int event_id_wrapped = 0;
@@ -133,6 +142,7 @@ int afb_evt_broadcast(const char *event, struct json_object *object)
struct afb_evt_listener *listener;
result = 0;
+ pthread_mutex_lock(&listeners_mutex);
listener = listeners;
while(listener) {
if (listener->itf->broadcast != NULL) {
@@ -141,6 +151,7 @@ int afb_evt_broadcast(const char *event, struct json_object *object)
}
listener = listener->next;
}
+ pthread_mutex_unlock(&listeners_mutex);
json_object_put(object);
return result;
}
@@ -157,6 +168,7 @@ static int evt_push(struct afb_evt_event *evt, struct json_object *obj)
struct afb_evt_listener *listener;
result = 0;
+ pthread_mutex_lock(&evt->mutex);
watch = evt->watchs;
while(watch) {
listener = watch->listener;
@@ -166,6 +178,7 @@ static int evt_push(struct afb_evt_event *evt, struct json_object *obj)
watch = watch->next_by_event;
result++;
}
+ pthread_mutex_unlock(&evt->mutex);
json_object_put(obj);
return result;
}
@@ -214,25 +227,35 @@ static void remove_watch(struct afb_evt_watch *watch)
*/
static void evt_destroy(struct afb_evt_event *evt)
{
+ int found;
struct afb_evt_event **prv;
+ struct afb_evt_listener *listener;
+
if (evt != NULL) {
- /* removes the event if valid! */
+ /* unlinks the event if valid! */
+ pthread_mutex_lock(&events_mutex);
prv = &events;
- while (*prv != NULL) {
- if (*prv != evt)
- prv = &(*prv)->next;
- else {
- /* valid, unlink */
- *prv = evt->next;
-
- /* removes all watchers */
- while(evt->watchs != NULL)
- remove_watch(evt->watchs);
-
- /* free */
- free(evt);
- break;
+ while (*prv && !(found = (*prv == evt)))
+ prv = &(*prv)->next;
+ if (found)
+ *prv = evt->next;
+ pthread_mutex_unlock(&events_mutex);
+
+ /* destroys the event */
+ if (found) {
+ /* removes all watchers */
+ while(evt->watchs != NULL) {
+ listener = evt->watchs->listener;
+ pthread_mutex_lock(&listener->mutex);
+ pthread_mutex_lock(&evt->mutex);
+ remove_watch(evt->watchs);
+ pthread_mutex_unlock(&evt->mutex);
+ pthread_mutex_unlock(&listener->mutex);
}
+
+ /* free */
+ pthread_mutex_destroy(&evt->mutex);
+ free(evt);
}
}
}
@@ -246,7 +269,18 @@ struct afb_event afb_evt_create_event(const char *name)
size_t len;
struct afb_evt_event *evt;
+ /* allocates the event */
+ len = strlen(name);
+ evt = malloc(len + sizeof * evt);
+ if (evt == NULL)
+ goto error;
+
+ /* initialize the event */
+ evt->watchs = NULL;
+ memcpy(evt->name, name, len + 1);
+
/* allocates the id */
+ pthread_mutex_lock(&events_mutex);
do {
if (++event_id_counter < 0) {
event_id_wrapped = 1;
@@ -259,19 +293,14 @@ struct afb_event afb_evt_create_event(const char *name)
evt = evt->next;
} while (evt != NULL);
- /* allocates the event */
- len = strlen(name);
- evt = malloc(len + sizeof * evt);
- if (evt == NULL)
- goto error;
-
/* initialize the event */
+ memcpy(evt->name, name, len + 1);
evt->next = events;
evt->watchs = NULL;
evt->id = event_id_counter;
- assert(evt->id > 0);
- memcpy(evt->name, name, len + 1);
+ pthread_mutex_init(&evt->mutex, NULL);
events = evt;
+ pthread_mutex_unlock(&events_mutex);
/* returns the event */
return (struct afb_event){ .itf = &afb_evt_event_itf, .closure = evt };
@@ -305,10 +334,13 @@ struct afb_evt_listener *afb_evt_listener_create(const struct afb_evt_itf *itf,
struct afb_evt_listener *listener;
/* search if an instance already exists */
+ pthread_mutex_lock(&listeners_mutex);
listener = listeners;
while (listener != NULL) {
- if (listener->itf == itf && listener->closure == closure)
- return afb_evt_listener_addref(listener);
+ if (listener->itf == itf && listener->closure == closure) {
+ listener = afb_evt_listener_addref(listener);
+ goto found;
+ }
listener = listener->next;
}
@@ -316,13 +348,16 @@ struct afb_evt_listener *afb_evt_listener_create(const struct afb_evt_itf *itf,
listener = calloc(1, sizeof *listener);
if (listener != NULL) {
/* init */
- listener->next = listeners;
listener->itf = itf;
listener->closure = closure;
listener->watchs = NULL;
listener->refcount = 1;
+ pthread_mutex_init(&listener->mutex, NULL);
+ listener->next = listeners;
listeners = listener;
}
+ found:
+ pthread_mutex_unlock(&listeners_mutex);
return listener;
}
@@ -331,7 +366,7 @@ struct afb_evt_listener *afb_evt_listener_create(const struct afb_evt_itf *itf,
*/
struct afb_evt_listener *afb_evt_listener_addref(struct afb_evt_listener *listener)
{
- listener->refcount++;
+ __atomic_add_fetch(&listener->refcount, 1, __ATOMIC_RELAXED);
return listener;
}
@@ -341,20 +376,31 @@ struct afb_evt_listener *afb_evt_listener_addref(struct afb_evt_listener *listen
*/
void afb_evt_listener_unref(struct afb_evt_listener *listener)
{
- if (0 == --listener->refcount) {
- struct afb_evt_listener **prv;
+ struct afb_evt_listener **prv;
+ struct afb_evt_event *evt;
- /* remove the watchers */
- while (listener->watchs != NULL)
- remove_watch(listener->watchs);
+ if (!__atomic_sub_fetch(&listener->refcount, 1, __ATOMIC_RELAXED)) {
/* unlink the listener */
+ pthread_mutex_lock(&listeners_mutex);
prv = &listeners;
while (*prv != listener)
prv = &(*prv)->next;
*prv = listener->next;
+ pthread_mutex_unlock(&listeners_mutex);
+
+ /* remove the watchers */
+ pthread_mutex_lock(&listener->mutex);
+ while (listener->watchs != NULL) {
+ evt = listener->watchs->event;
+ pthread_mutex_lock(&evt->mutex);
+ remove_watch(listener->watchs);
+ pthread_mutex_unlock(&evt->mutex);
+ }
+ pthread_mutex_unlock(&listener->mutex);
/* free the listener */
+ pthread_mutex_destroy(&listener->mutex);
free(listener);
}
}
@@ -376,6 +422,7 @@ int afb_evt_add_watch(struct afb_evt_listener *listener, struct afb_event event)
/* search the existing watch for the listener */
evt = event.closure;
+ pthread_mutex_lock(&listener->mutex);
watch = listener->watchs;
while(watch != NULL) {
if (watch->event == evt)
@@ -386,23 +433,27 @@ int afb_evt_add_watch(struct afb_evt_listener *listener, struct afb_event event)
/* not found, allocate a new */
watch = malloc(sizeof *watch);
if (watch == NULL) {
+ pthread_mutex_unlock(&listener->mutex);
errno = ENOMEM;
return -1;
}
/* initialise and link */
watch->event = evt;
- watch->next_by_event = evt->watchs;
+ watch->activity = 0;
watch->listener = listener;
watch->next_by_listener = listener->watchs;
- watch->activity = 0;
- evt->watchs = watch;
listener->watchs = watch;
+ pthread_mutex_lock(&evt->mutex);
+ watch->next_by_event = evt->watchs;
+ evt->watchs = watch;
+ pthread_mutex_unlock(&evt->mutex);
found:
if (watch->activity == 0 && listener->itf->add != NULL)
listener->itf->add(listener->closure, evt->name, evt->id);
watch->activity++;
+ pthread_mutex_unlock(&listener->mutex);
return 0;
}
@@ -424,6 +475,7 @@ int afb_evt_remove_watch(struct afb_evt_listener *listener, struct afb_event eve
/* search the existing watch */
evt = event.closure;
+ pthread_mutex_lock(&listener->mutex);
watch = listener->watchs;
while(watch != NULL) {
if (watch->event == evt) {
@@ -433,10 +485,12 @@ int afb_evt_remove_watch(struct afb_evt_listener *listener, struct afb_event eve
if (watch->activity == 0 && listener->itf->remove != NULL)
listener->itf->remove(listener->closure, evt->name, evt->id);
}
+ pthread_mutex_unlock(&listener->mutex);
return 0;
}
watch = watch->next_by_listener;
}
+ pthread_mutex_unlock(&listener->mutex);
errno = ENOENT;
return -1;
}
diff --git a/src/afb-msg-json.c b/src/afb-msg-json.c
index b4ae51b4..6d8f7327 100644
--- a/src/afb-msg-json.c
+++ b/src/afb-msg-json.c
@@ -29,15 +29,14 @@ struct json_object *afb_msg_json_reply(const char *status, const char *info, str
{
json_object *msg, *request;
const char *token, *uuid;
- static json_object *type_reply = NULL;
+ json_object *type_reply = NULL;
msg = json_object_new_object();
if (resp != NULL)
json_object_object_add(msg, "response", resp);
- if (type_reply == NULL)
- type_reply = json_object_new_string("afb-reply");
- json_object_object_add(msg, "jtype", json_object_get(type_reply));
+ type_reply = json_object_new_string("afb-reply");
+ json_object_object_add(msg, "jtype", type_reply);
request = json_object_new_object();
json_object_object_add(msg, "request", request);
@@ -75,7 +74,7 @@ struct json_object *afb_msg_json_reply_error(const char *status, const char *inf
struct json_object *afb_msg_json_event(const char *event, struct json_object *object)
{
json_object *msg;
- static json_object *type_event = NULL;
+ json_object *type_event = NULL;
msg = json_object_new_object();
@@ -84,9 +83,8 @@ struct json_object *afb_msg_json_event(const char *event, struct json_object *ob
if (object != NULL)
json_object_object_add(msg, "data", object);
- if (type_event == NULL)
- type_event = json_object_new_string("afb-event");
- json_object_object_add(msg, "jtype", json_object_get(type_event));
+ type_event = json_object_new_string("afb-event");
+ json_object_object_add(msg, "jtype", type_event);
return msg;
}
diff --git a/src/afb-ws.c b/src/afb-ws.c
index cc852b20..c6126100 100644
--- a/src/afb-ws.c
+++ b/src/afb-ws.c
@@ -407,7 +407,7 @@ static int aws_read(struct afb_ws *ws, size_t size)
return 0;
pfd.fd = ws->fd;
pfd.events = POLLIN;
- poll(&pfd, 1, 10);
+ poll(&pfd, 1, 10); /* TODO: make fully asynchronous websockets */
} else {
ws->buffer.size += (size_t)sz;
size -= (size_t)sz;
diff --git a/src/sig-monitor.c b/src/sig-monitor.c
index d00f0f97..89fd4444 100644
--- a/src/sig-monitor.c
+++ b/src/sig-monitor.c
@@ -115,6 +115,8 @@ static void on_signal_error(int signum)
{
sigset_t sigset;
+ ERROR("ALERT! signal %d received: %s", signum, strsignal(signum));
+
// unlock signal to allow a new signal to come
if (error_handler != NULL) {
sigemptyset(&sigset);