aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJosé Bollo <jose.bollo@iot.bzh>2016-06-09 07:54:31 +0200
committerJosé Bollo <jose.bollo@iot.bzh>2016-06-09 08:39:02 +0200
commit7e0abe76db7b90369429bf387d7aad0fb5a42328 (patch)
treeeedf5609a588f0d9d743fb941bc8727ff1356b21
parentc37c8e6291c36665dd23601a8ce1449afd43e6df (diff)
Events: refactoring
This new version allows to subscribe a client for an event. The event should first be created for the API (the API's prefix is added) using 'afb_daemon_make_event'. After that, plugins can subscribe or unsubscribe their clients (identified through requests) to the events that it generates. See 'afb_req_subscribe' and 'afb_req_unsubscribe'. Events created by 'afb_daemon_make_event' can be widely broadcasted using 'afb_event_broadcast' or pushed only to suscribers using 'afb_event_push'. Events can be destroyed using 'afb_event_drop'. Change-Id: I7c0bed5e625c2052dcd81c6bfe960def1fa032f3 Signed-off-by: José Bollo <jose.bollo@iot.bzh>
-rw-r--r--include/afb/afb-event-itf.h84
-rw-r--r--include/afb/afb-plugin.h17
-rw-r--r--include/afb/afb-req-itf.h28
-rw-r--r--src/CMakeLists.txt1
-rw-r--r--src/afb-api-dbus.c23
-rw-r--r--src/afb-api-dbus.h4
-rw-r--r--src/afb-api-so.c26
-rw-r--r--src/afb-context.c1
-rw-r--r--src/afb-evt.c261
-rw-r--r--src/afb-evt.h36
-rw-r--r--src/afb-hreq.c15
-rw-r--r--src/afb-hreq.h3
-rw-r--r--src/afb-ws-json1.c38
-rw-r--r--src/afb-ws-json1.h3
-rw-r--r--src/session.c108
-rw-r--r--src/session.h16
16 files changed, 500 insertions, 164 deletions
diff --git a/include/afb/afb-event-itf.h b/include/afb/afb-event-itf.h
new file mode 100644
index 00000000..47ffa387
--- /dev/null
+++ b/include/afb/afb-event-itf.h
@@ -0,0 +1,84 @@
+/*
+ * Copyright (C) 2016 "IoT.bzh"
+ * Author: José Bollo <jose.bollo@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+/* avoid inclusion of <json-c/json.h> */
+struct json_object;
+
+/*
+ * Interface for handling requests.
+ * It records the functions to be called for the request.
+ * Don't use this structure directly.
+ * Use the helper functions documented below.
+ */
+struct afb_event_itf {
+ /* CAUTION: respect the order, add at the end */
+
+ int (*broadcast)(void *closure, struct json_object *obj);
+ int (*push)(void *closure, struct json_object *obj);
+ void (*drop)(void *closure);
+};
+
+/*
+ * Describes the request by plugins from afb-daemon
+ */
+struct afb_event {
+ const struct afb_event_itf *itf; /* the interface to use */
+ void *closure; /* the closure argument for functions of 'itf' */
+};
+
+/*
+ * Broadcasts widely the 'event' with the data 'object'.
+ * 'object' can be NULL.
+ *
+ * For conveniency, the function calls 'json_object_put' for 'object'.
+ * Thus, in the case where 'object' should remain available after
+ * the function returns, the function 'json_object_get' shall be used.
+ *
+ * Returns the count of clients that received the event.
+ */
+static inline int afb_event_broadcast(struct afb_event event, struct json_object *object)
+{
+ return event.itf->broadcast(event.closure, object);
+}
+
+/*
+ * Pushes the 'event' with the data 'object' to its obeservers.
+ * 'object' can be NULL.
+ *
+ * For conveniency, the function calls 'json_object_put' for 'object'.
+ * Thus, in the case where 'object' should remain available after
+ * the function returns, the function 'json_object_get' shall be used.
+ *
+ * Returns the count of clients that received the event.
+ */
+static inline int afb_event_push(struct afb_event event, struct json_object *object)
+{
+ return event.itf->push(event.closure, object);
+}
+
+/*
+ * Drops the data associated to the event
+ * After calling this function, the event
+ * MUST NOT BE USED ANYMORE.
+ */
+static inline void afb_event_drop(struct afb_event event)
+{
+ event.itf->drop(event.closure);
+}
+
diff --git a/include/afb/afb-plugin.h b/include/afb/afb-plugin.h
index 1eb3475b..85e8de7e 100644
--- a/include/afb/afb-plugin.h
+++ b/include/afb/afb-plugin.h
@@ -38,6 +38,7 @@
* Some function of the library are exported to afb-daemon.
*/
+#include <afb/afb-event-itf.h>
#include <afb/afb-req-itf.h>
/*
@@ -140,11 +141,12 @@ struct sd_bus;
* Definition of the facilities provided by the daemon.
*/
struct afb_daemon_itf {
- void (*event_broadcast)(void *closure, const char *name, struct json_object *object); /* broadcasts evant 'name' with 'object' */
+ int (*event_broadcast)(void *closure, const char *name, struct json_object *object); /* broadcasts evant 'name' with 'object' */
struct sd_event *(*get_event_loop)(void *closure); /* gets the common systemd's event loop */
struct sd_bus *(*get_user_bus)(void *closure); /* gets the common systemd's user d-bus */
struct sd_bus *(*get_system_bus)(void *closure); /* gets the common systemd's system d-bus */
void (*vverbose)(void*closure, int level, const char *file, int line, const char *fmt, va_list args);
+ struct afb_event (*event_make)(void *closure, const char *name); /* creates an event of 'name' */
};
/*
@@ -206,13 +208,24 @@ static inline struct sd_bus *afb_daemon_get_system_bus(struct afb_daemon daemon)
* For conveniency, the function calls 'json_object_put' for 'object'.
* Thus, in the case where 'object' should remain available after
* the function returns, the function 'json_object_get' shall be used.
+ *
+ * Returns the count of clients that received the event.
*/
-static inline void afb_daemon_broadcast_event(struct afb_daemon daemon, const char *name, struct json_object *object)
+static inline int afb_daemon_broadcast_event(struct afb_daemon daemon, const char *name, struct json_object *object)
{
return daemon.itf->event_broadcast(daemon.closure, name, object);
}
/*
+ * Creates an event of 'name' and returns it.
+ * 'daemon' MUST be the daemon given in interface when activating the plugin.
+ */
+static inline struct afb_event afb_daemon_make_event(struct afb_daemon daemon, const char *name)
+{
+ return daemon.itf->event_make(daemon.closure, name);
+}
+
+/*
* Send a message described by 'fmt' and following parameters
* to the journal for the verbosity 'level'.
* 'file' and 'line' are indicators of position of the code in source files.
diff --git a/include/afb/afb-req-itf.h b/include/afb/afb-req-itf.h
index f4fab551..2b3bc467 100644
--- a/include/afb/afb-req-itf.h
+++ b/include/afb/afb-req-itf.h
@@ -25,6 +25,8 @@
#include <stdarg.h>
#include <stdio.h>
+#include <afb/afb-event-itf.h>
+
/* avoid inclusion of <json-c/json.h> */
struct json_object;
@@ -65,6 +67,9 @@ struct afb_req_itf {
void (*session_close)(void *closure);
int (*session_set_LOA)(void *closure, unsigned level);
+
+ int (*subscribe)(void *closure, struct afb_event event);
+ int (*unsubscribe)(void *closure, struct afb_event event);
};
/*
@@ -315,6 +320,29 @@ static inline struct afb_req afb_req_unstore(struct afb_req *req)
return result;
}
+/*
+ * Establishes for the client link identified by 'req' a subscription
+ * to the 'event'.
+ * Returns 0 in case of successful subscription or -1 in case of error.
+ */
+static inline int afb_req_subscribe(struct afb_req req, struct afb_event event)
+{
+ return req.itf->subscribe(req.closure, event);
+}
+
+/*
+ * Revokes the subscription established to the 'event' for the client
+ * link identified by 'req'.
+ * Returns 0 in case of successful subscription or -1 in case of error.
+ */
+static inline int afb_req_unsubscribe(struct afb_req req, struct afb_event event)
+{
+ return req.itf->unsubscribe(req.closure, event);
+}
+
+
+
+
/* internal use */
static inline const char *afb_req_raw(struct afb_req req, size_t *size)
{
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index ca20665b..cbaf286f 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -35,6 +35,7 @@ ADD_LIBRARY(afb-lib STATIC
afb-apis.c
afb-common.c
afb-context.c
+ afb-evt.c
afb-hreq.c
afb-hsrv.c
afb-hswitch.c
diff --git a/src/afb-api-dbus.c b/src/afb-api-dbus.c
index c0bc57a8..eda7985f 100644
--- a/src/afb-api-dbus.c
+++ b/src/afb-api-dbus.c
@@ -35,6 +35,7 @@
#include "afb-apis.h"
#include "afb-api-so.h"
#include "afb-context.h"
+#include "afb-evt.h"
#include "verbose.h"
static const char DEFAULT_PATH_PREFIX[] = "/org/agl/afb/api/";
@@ -52,6 +53,7 @@ struct api_dbus
char *path; /* path of the object for the API */
char *name; /* name/interface of the object */
char *api; /* api name of the interface */
+ struct afb_evt_listener *listener;
};
#define RETOK 1
@@ -302,7 +304,7 @@ static int api_dbus_client_on_event(sd_bus_message *m, void *userdata, sd_bus_er
ERROR("unreadable event");
else {
object = json_tokener_parse(data);
- ctxClientEventSend(NULL, event, object);
+ afb_evt_broadcast(event, object);
json_object_put(object);
}
return 1;
@@ -444,7 +446,7 @@ static void dbus_req_send(struct dbus_req *dreq, const char *buffer, size_t size
dbus_req_reply(dreq, RETRAW, buffer, "");
}
-struct afb_req_itf dbus_req_itf = {
+const struct afb_req_itf afb_api_dbus_req_itf = {
.json = (void*)dbus_req_json,
.get = (void*)dbus_req_get,
.success = (void*)dbus_req_success,
@@ -504,7 +506,7 @@ static int api_dbus_server_on_object_called(sd_bus_message *message, void *userd
dreq->message = sd_bus_message_ref(message);
dreq->json = NULL;
dreq->refcount = 1;
- areq.itf = &dbus_req_itf;
+ areq.itf = &afb_api_dbus_req_itf;
areq.closure = dreq;
afb_apis_call_(areq, &dreq->context, api->api, method);
dbus_req_unref(dreq);
@@ -522,19 +524,6 @@ static void afb_api_dbus_server_send_event(struct api_dbus *api, const char *eve
json_object_put(object);
}
-static int afb_api_dbus_server_expects_event(struct api_dbus *api, const char *event)
-{
- size_t len = strlen(api->api);
- if (strncasecmp(event, api->api, len) != 0)
- return 0;
- return event[len] == '.';
-}
-
-static struct afb_event_listener_itf evitf = {
- .send = (void*)afb_api_dbus_server_send_event,
- .expects = (void*)afb_api_dbus_server_expects_event
-};
-
/* create the service */
int afb_api_dbus_add_server(const char *path)
{
@@ -563,7 +552,7 @@ int afb_api_dbus_add_server(const char *path)
}
INFO("afb service over dbus installed, name %s, path %s", api->name, api->path);
- ctxClientEventListenerAdd(NULL, (struct afb_event_listener){ .itf = &evitf, .closure = api });
+ api->listener = afb_evt_listener_create((void*)afb_api_dbus_server_send_event, api);
return 0;
error3:
diff --git a/src/afb-api-dbus.h b/src/afb-api-dbus.h
index c8a7bc3f..10f5f7ff 100644
--- a/src/afb-api-dbus.h
+++ b/src/afb-api-dbus.h
@@ -18,6 +18,10 @@
#pragma once
+struct afb_req_itf;
+
+extern const struct afb_req_itf afb_api_dbus_req_itf;
+
extern int afb_api_dbus_add_client(const char *path);
extern int afb_api_dbus_add_server(const char *path);
diff --git a/src/afb-api-so.c b/src/afb-api-so.c
index b741b13a..84f69753 100644
--- a/src/afb-api-so.c
+++ b/src/afb-api-so.c
@@ -30,6 +30,7 @@
#include <afb/afb-plugin.h>
#include <afb/afb-req-itf.h>
+#include <afb/afb-event-itf.h>
#include "session.h"
#include "afb-common.h"
@@ -37,6 +38,7 @@
#include "afb-apis.h"
#include "afb-api-so.h"
#include "afb-sig-handler.h"
+#include "afb-evt.h"
#include "verbose.h"
/*
@@ -58,18 +60,37 @@ void afb_api_so_set_timeout(int to)
api_timeout = to;
}
+static struct afb_event afb_api_so_event_make(struct api_so_desc *desc, const char *name)
+{
+ size_t length;
+ char *event;
+
+ /* makes the event name */
+ assert(desc->plugin != NULL);
+ length = strlen(name);
+ event = alloca(length + 2 + desc->apilength);
+ memcpy(event, desc->plugin->v1.prefix, desc->apilength);
+ event[desc->apilength] = '/';
+ memcpy(event + desc->apilength + 1, name, length + 1);
+
+ /* crate the event */
+ return afb_evt_create_event(event);
+}
+
static int afb_api_so_event_broadcast(struct api_so_desc *desc, const char *name, struct json_object *object)
{
size_t length;
char *event;
+ /* makes the event name */
assert(desc->plugin != NULL);
length = strlen(name);
event = alloca(length + 2 + desc->apilength);
memcpy(event, desc->plugin->v1.prefix, desc->apilength);
event[desc->apilength] = '/';
memcpy(event + desc->apilength + 1, name, length + 1);
- return ctxClientEventSend(NULL, event, object);
+
+ return afb_evt_broadcast(event, object);
}
static void afb_api_so_vverbose(struct api_so_desc *desc, int level, const char *file, int line, const char *fmt, va_list args)
@@ -89,7 +110,8 @@ static const struct afb_daemon_itf daemon_itf = {
.get_event_loop = (void*)afb_common_get_event_loop,
.get_user_bus = (void*)afb_common_get_user_bus,
.get_system_bus = (void*)afb_common_get_system_bus,
- .vverbose = (void*)afb_api_so_vverbose
+ .vverbose = (void*)afb_api_so_vverbose,
+ .event_make = (void*)afb_api_so_event_make
};
struct monitoring {
diff --git a/src/afb-context.c b/src/afb-context.c
index ba093c37..5fe32764 100644
--- a/src/afb-context.c
+++ b/src/afb-context.c
@@ -1,6 +1,7 @@
/*
* Copyright (C) 2015, 2016 "IoT.bzh"
* Author "Fulup Ar Foll"
+ * Author José Bollo <jose.bollo@iot.bzh>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/src/afb-evt.c b/src/afb-evt.c
new file mode 100644
index 00000000..00261b6c
--- /dev/null
+++ b/src/afb-evt.c
@@ -0,0 +1,261 @@
+/*
+ * Copyright (C) 2015, 2016 "IoT.bzh"
+ * Author "Fulup Ar Foll"
+ * Author José Bollo <jose.bollo@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define _GNU_SOURCE
+
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <errno.h>
+
+#include <json-c/json.h>
+#include <afb/afb-event-itf.h>
+
+#include "afb-evt.h"
+
+
+struct afb_evt_watch;
+
+struct afb_evt_listener {
+ struct afb_evt_listener *next;
+ void (*send)(void *closure, const char *event, struct json_object *object);
+ void *closure;
+ struct afb_evt_watch *watchs;
+ int refcount;
+};
+
+struct afb_evt_event {
+ struct afb_evt_watch *watchs;
+ char name[1];
+};
+
+struct afb_evt_watch {
+ struct afb_evt_event *event;
+ struct afb_evt_watch *next_by_event;
+ struct afb_evt_listener *listener;
+ struct afb_evt_watch *next_by_listener;
+};
+
+static int evt_broadcast(struct afb_evt_event *evt, struct json_object *obj);
+static int evt_push(struct afb_evt_event *evt, struct json_object *obj);
+static void evt_drop(struct afb_evt_event *evt);
+
+static struct afb_event_itf afb_evt_event_itf = {
+ .broadcast = (void*)evt_broadcast,
+ .push = (void*)evt_push,
+ .drop = (void*)evt_drop
+};
+
+static struct afb_evt_listener *listeners = NULL;
+
+static inline int evt_trash(struct json_object *obj)
+{
+ return 0;
+}
+
+static int evt_broadcast(struct afb_evt_event *evt, struct json_object *object)
+{
+ return afb_evt_broadcast(evt->name, object);
+}
+
+int afb_evt_broadcast(const char *event, struct json_object *object)
+{
+ int result;
+ struct afb_evt_listener *listener;
+
+ result = 0;
+ listener = listeners;
+ while(listener) {
+ listener->send(listener->closure, event, json_object_get(object));
+ listener = listener->next;
+ }
+ json_object_put(object);
+ return result;
+}
+
+static int evt_push(struct afb_evt_event *evt, struct json_object *obj)
+{
+ int result;
+ struct afb_evt_watch *watch;
+ struct afb_evt_listener *listener;
+
+ result = 0;
+ watch = evt->watchs;
+ while(listener) {
+ listener = watch->listener;
+ listener->send(listener->closure, evt->name, json_object_get(obj));
+ watch = watch->next_by_event;
+ }
+ json_object_put(obj);
+ return result;
+}
+
+static void remove_watch(struct afb_evt_watch *watch)
+{
+ struct afb_evt_watch **prv;
+
+ prv = &watch->event->watchs;
+ while(*prv != watch)
+ prv = &(*prv)->next_by_event;
+ *prv = watch->next_by_event;
+
+ prv = &watch->listener->watchs;
+ while(*prv != watch)
+ prv = &(*prv)->next_by_listener;
+ *prv = watch->next_by_listener;
+
+ free(watch);
+}
+
+static void evt_drop(struct afb_evt_event *evt)
+{
+ if (evt != NULL) {
+ while(evt->watchs != NULL)
+ remove_watch(evt->watchs);
+ free(evt);
+ }
+}
+
+struct afb_event afb_evt_create_event(const char *name)
+{
+ size_t len;
+ struct afb_evt_event *evt;
+
+ len = strlen(name);
+ evt = malloc(len + sizeof * evt);
+ if (evt != NULL) {
+ evt->watchs = NULL;
+ memcpy(evt->name, name, len + 1);
+ }
+ return (struct afb_event){ .itf = &afb_evt_event_itf, .closure = evt };
+}
+
+struct afb_evt_listener *afb_evt_listener_create(void (*send)(void *closure, const char *event, struct json_object *object), void *closure)
+{
+ struct afb_evt_listener *listener;
+
+ /* search if an instance already exists */
+ listener = listeners;
+ while (listener != NULL) {
+ if (listener->send == send && listener->closure == closure)
+ return afb_evt_listener_addref(listener);
+ listener = listener->next;
+ }
+
+ /* allocates */
+ listener = calloc(1, sizeof *listener);
+ if (listener != NULL) {
+ /* init */
+ listener->next = listeners;
+ listener->send = send;
+ listener->closure = closure;
+ listener->watchs = NULL;
+ listener->refcount = 1;
+ listeners = listener;
+ }
+ return listener;
+}
+
+struct afb_evt_listener *afb_evt_listener_addref(struct afb_evt_listener *listener)
+{
+ listener->refcount++;
+ return listener;
+}
+
+void afb_evt_listener_unref(struct afb_evt_listener *listener)
+{
+ if (0 == --listener->refcount) {
+ struct afb_evt_listener **prv;
+
+ /* remove the watchers */
+ while (listener->watchs != NULL)
+ remove_watch(listener->watchs);
+
+ /* unlink the listener */
+ prv = &listeners;
+ while (*prv != listener)
+ prv = &(*prv)->next;
+ *prv = listener->next;
+
+ /* free the listener */
+ free(listener);
+ }
+}
+
+int afb_evt_add_watch(struct afb_evt_listener *listener, struct afb_event event)
+{
+ struct afb_evt_watch *watch;
+ struct afb_evt_event *evt;
+
+ /* check parameter */
+ if (event.itf != &afb_evt_event_itf) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ /* search the existing watch */
+ watch = listener->watchs;
+ while(watch != NULL) {
+ if (watch->event == event.closure)
+ return 0;
+ watch = watch->next_by_listener;
+ }
+
+ /* not found, allocate a new */
+ watch = malloc(sizeof *watch);
+ if (watch == NULL) {
+ errno = ENOMEM;
+ return -1;
+ }
+
+ /* initialise and link */
+ evt = event.closure;
+ watch->event = evt;
+ watch->next_by_event = evt->watchs;
+ watch->listener = listener;
+ watch->next_by_listener = listener->watchs;
+ evt->watchs = watch;
+ listener->watchs = watch;
+
+ return 0;
+}
+
+int afb_evt_remove_watch(struct afb_evt_listener *listener, struct afb_event event)
+{
+ struct afb_evt_watch *watch;
+
+ /* check parameter */
+ if (event.itf != &afb_evt_event_itf) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ /* search the existing watch */
+ watch = listener->watchs;
+ while(watch != NULL) {
+ if (watch->event == event.closure) {
+ /* found: remove it */
+ remove_watch(watch);
+ break;
+ }
+ watch = watch->next_by_listener;
+ }
+ return 0;
+}
+
+
diff --git a/src/afb-evt.h b/src/afb-evt.h
new file mode 100644
index 00000000..8e102546
--- /dev/null
+++ b/src/afb-evt.h
@@ -0,0 +1,36 @@
+/*
+ * Copyright (C) 2016 "IoT.bzh"
+ * Author: José Bollo <jose.bollo@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+struct afb_event;
+struct AFB_clientCtx;
+
+struct afb_evt_listener;
+
+extern struct afb_evt_listener *afb_evt_listener_create(void (*send)(void *closure, const char *event, struct json_object *object), void *closure);
+
+extern int afb_evt_broadcast(const char *event, struct json_object *object);
+
+extern struct afb_evt_listener *afb_evt_listener_addref(struct afb_evt_listener *listener);
+extern void afb_evt_listener_unref(struct afb_evt_listener *listener);
+
+extern struct afb_event afb_evt_create_event(const char *name);
+
+extern int afb_evt_add_watch(struct afb_evt_listener *listener, struct afb_event event);
+extern int afb_evt_remove_watch(struct afb_evt_listener *listener, struct afb_event event);
+
diff --git a/src/afb-hreq.c b/src/afb-hreq.c
index 4ca8441e..1e38b412 100644
--- a/src/afb-hreq.c
+++ b/src/afb-hreq.c
@@ -74,8 +74,9 @@ static void req_fail(struct afb_hreq *hreq, const char *status, const char *info
static void req_success(struct afb_hreq *hreq, json_object *obj, const char *info);
static const char *req_raw(struct afb_hreq *hreq, size_t *size);
static void req_send(struct afb_hreq *hreq, const char *buffer, size_t size);
+static int req_subscribe_unsubscribe_error(struct afb_hreq *hreq, struct afb_event event);
-static const struct afb_req_itf afb_hreq_itf = {
+const struct afb_req_itf afb_hreq_req_itf = {
.json = (void*)req_json,
.get = (void*)req_get,
.success = (void*)req_success,
@@ -87,7 +88,9 @@ static const struct afb_req_itf afb_hreq_itf = {
.addref = (void*)afb_hreq_addref,
.unref = (void*)afb_hreq_unref,
.session_close = (void*)afb_context_close,
- .session_set_LOA = (void*)afb_context_change_loa
+ .session_set_LOA = (void*)afb_context_change_loa,
+ .subscribe = (void*)req_subscribe_unsubscribe_error,
+ .unsubscribe = (void*)req_subscribe_unsubscribe_error
};
static struct hreq_data *get_data(struct afb_hreq *hreq, const char *key, int create)
@@ -697,7 +700,7 @@ int afb_hreq_post_add_file(struct afb_hreq *hreq, const char *key, const char *f
struct afb_req afb_hreq_to_req(struct afb_hreq *hreq)
{
- return (struct afb_req){ .itf = &afb_hreq_itf, .closure = hreq };
+ return (struct afb_req){ .itf = &afb_hreq_req_itf, .closure = hreq };
}
static struct afb_arg req_get(struct afb_hreq *hreq, const char *name)
@@ -798,6 +801,12 @@ static void req_success(struct afb_hreq *hreq, json_object *obj, const char *inf
req_reply(hreq, MHD_HTTP_OK, "success", info, obj);
}
+static int req_subscribe_unsubscribe_error(struct afb_hreq *hreq, struct afb_event event)
+{
+ errno = EINVAL;
+ return -1;
+}
+
int afb_hreq_init_context(struct afb_hreq *hreq)
{
const char *uuid;
diff --git a/src/afb-hreq.h b/src/afb-hreq.h
index 836f4703..772cd677 100644
--- a/src/afb-hreq.h
+++ b/src/afb-hreq.h
@@ -21,6 +21,9 @@ struct AFB_clientCtx;
struct json_object;
struct hreq_data;
struct afb_hsrv;
+struct afb_req_itf;
+
+extern const struct afb_req_itf afb_hreq_req_itf;
struct afb_hreq {
/*
diff --git a/src/afb-ws-json1.c b/src/afb-ws-json1.c
index ffb6b81f..5ef751f1 100644
--- a/src/afb-ws-json1.c
+++ b/src/afb-ws-json1.c
@@ -32,6 +32,7 @@
#include <afb/afb-req-itf.h>
#include "afb-apis.h"
#include "afb-context.h"
+#include "afb-evt.h"
#include "verbose.h"
static void aws_on_hangup(struct afb_ws_json1 *ws, struct afb_wsj1 *wsj1);
@@ -50,22 +51,13 @@ struct afb_ws_json1
void (*cleanup)(void*);
void *cleanup_closure;
struct AFB_clientCtx *session;
+ struct afb_evt_listener *listener;
struct afb_wsj1 *wsj1;
int new_session;
};
static void aws_send_event(struct afb_ws_json1 *ws, const char *event, struct json_object *object);
-static const struct afb_event_listener_itf event_listener_itf = {
- .send = (void*)aws_send_event,
- .expects = NULL
-};
-
-static inline struct afb_event_listener listener_for(struct afb_ws_json1 *aws)
-{
- return (struct afb_event_listener){ .itf = &event_listener_itf, .closure = aws };
-}
-
struct afb_ws_json1 *afb_ws_json1_create(int fd, struct afb_context *context, void (*cleanup)(void*), void *cleanup_closure)
{
struct afb_ws_json1 *result;
@@ -89,7 +81,8 @@ struct afb_ws_json1 *afb_ws_json1_create(int fd, struct afb_context *context, vo
if (result->wsj1 == NULL)
goto error3;
- if (0 > ctxClientEventListenerAdd(result->session, listener_for(result)))
+ result->listener = afb_evt_listener_create((void*)aws_send_event, result);
+ if (result->listener == NULL)
goto error4;
return result;
@@ -114,7 +107,7 @@ static struct afb_ws_json1 *aws_addref(struct afb_ws_json1 *ws)
static void aws_unref(struct afb_ws_json1 *ws)
{
if (--ws->refcount == 0) {
- ctxClientEventListenerRemove(ws->session, listener_for(ws));
+ afb_evt_listener_unref(ws->listener);
afb_wsj1_unref(ws->wsj1);
if (ws->cleanup != NULL)
ws->cleanup(ws->cleanup_closure);
@@ -149,9 +142,10 @@ static void wsreq_fail(struct afb_wsreq *wsreq, const char *status, const char *
static void wsreq_success(struct afb_wsreq *wsreq, struct json_object *obj, const char *info);
static const char *wsreq_raw(struct afb_wsreq *wsreq, size_t *size);
static void wsreq_send(struct afb_wsreq *wsreq, const char *buffer, size_t size);
+static int wsreq_subscribe(struct afb_wsreq *wsreq, struct afb_event event);
+static int wsreq_unsubscribe(struct afb_wsreq *wsreq, struct afb_event event);
-
-static const struct afb_req_itf wsreq_itf = {
+const struct afb_req_itf afb_ws_json1_req_itf = {
.json = (void*)wsreq_json,
.get = (void*)wsreq_get,
.success = (void*)wsreq_success,
@@ -163,7 +157,9 @@ static const struct afb_req_itf wsreq_itf = {
.addref = (void*)wsreq_addref,
.unref = (void*)wsreq_unref,
.session_close = (void*)afb_context_close,
- .session_set_LOA = (void*)afb_context_change_loa
+ .session_set_LOA = (void*)afb_context_change_loa,
+ .subscribe = (void*)wsreq_subscribe,
+ .unsubscribe = (void*)wsreq_unsubscribe
};
static void aws_on_call(struct afb_ws_json1 *ws, const char *api, const char *verb, struct afb_wsj1_msg *msg)
@@ -197,7 +193,7 @@ static void aws_on_call(struct afb_ws_json1 *ws, const char *api, const char *ve
/* emits the call */
r.closure = wsreq;
- r.itf = &wsreq_itf;
+ r.itf = &afb_ws_json1_req_itf;
afb_apis_call_(r, &wsreq->context, api, verb);
wsreq_unref(wsreq);
}
@@ -276,3 +272,13 @@ static void aws_send_event(struct afb_ws_json1 *aws, const char *event, struct j
afb_wsj1_send_event_j(aws->wsj1, event, afb_msg_json_event(event, object));
}
+static int wsreq_subscribe(struct afb_wsreq *wsreq, struct afb_event event)
+{
+ return afb_evt_add_watch(wsreq->aws->listener, event);
+}
+
+static int wsreq_unsubscribe(struct afb_wsreq *wsreq, struct afb_event event)
+{
+ return afb_evt_remove_watch(wsreq->aws->listener, event);
+}
+
diff --git a/src/afb-ws-json1.h b/src/afb-ws-json1.h
index fedbcf6c..f714c222 100644
--- a/src/afb-ws-json1.h
+++ b/src/afb-ws-json1.h
@@ -19,6 +19,9 @@
struct afb_ws_json1;
struct afb_context;
+struct afb_req_itf;
+
+extern const struct afb_req_itf afb_ws_json1_req_itf;
extern struct afb_ws_json1 *afb_ws_json1_create(int fd, struct afb_context *context, void (*cleanup)(void*), void *closure);
diff --git a/src/session.c b/src/session.c
index 16dc8369..e0d0a8e6 100644
--- a/src/session.c
+++ b/src/session.c
@@ -38,13 +38,6 @@ struct client_value
void (*free_value)(void*);
};
-struct afb_event_listener_list
-{
- struct afb_event_listener_list *next;
- struct afb_event_listener listener;
- int refcount;
-};
-
struct AFB_clientCtx
{
unsigned refcount;
@@ -54,7 +47,6 @@ struct AFB_clientCtx
char uuid[37]; // long term authentication of remote client
char token[37]; // short term authentication of remote client
struct client_value *values;
- struct afb_event_listener_list *listeners;
};
// Session UUID are store in a simple array [for 10 sessions this should be enough]
@@ -66,7 +58,6 @@ static struct {
int timeout;
int apicount;
char initok[37];
- struct afb_event_listener_list *listeners;
} sessions;
/* generate a uuid */
@@ -289,8 +280,6 @@ void ctxClientClose (struct AFB_clientCtx *clientCtx)
if (clientCtx->uuid[0] != 0) {
clientCtx->uuid[0] = 0;
ctxUuidFreeCB (clientCtx);
- while(clientCtx->listeners != NULL)
- ctxClientEventListenerRemove(clientCtx, clientCtx->listeners->listener);
if (clientCtx->refcount == 0) {
ctxStoreDel (clientCtx);
free(clientCtx);
@@ -326,103 +315,6 @@ void ctxTokenNew (struct AFB_clientCtx *clientCtx)
clientCtx->expiration = NOW + sessions.timeout;
}
-static int add_listener(struct afb_event_listener_list **head, struct afb_event_listener listener)
-{
- struct afb_event_listener_list *iter, **prv;
-
- prv = head;
- for (;;) {
- iter = *prv;
- if (iter == NULL) {
- iter = calloc(1, sizeof *iter);
- if (iter == NULL) {
- errno = ENOMEM;
- return -1;
- }
- iter->listener = listener;
- iter->refcount = 1;
- *prv = iter;
- return 0;
- }
- if (iter->listener.itf == listener.itf && iter->listener.closure == listener.closure) {
- iter->refcount++;
- return 0;
- }
- prv = &iter->next;
- }
-}
-
-int ctxClientEventListenerAdd(struct AFB_clientCtx *clientCtx, struct afb_event_listener listener)
-{
- return add_listener(clientCtx != NULL ? &clientCtx->listeners : &sessions.listeners, listener);
-}
-
-static void remove_listener(struct afb_event_listener_list **head, struct afb_event_listener listener)
-{
- struct afb_event_listener_list *iter, **prv;
-
- prv = head;
- for (;;) {
- iter = *prv;
- if (iter == NULL)
- return;
- if (iter->listener.itf == listener.itf && iter->listener.closure == listener.closure) {
- if (!--iter->refcount) {
- *prv = iter->next;
- free(iter);
- }
- return;
- }
- prv = &iter->next;
- }
-}
-
-void ctxClientEventListenerRemove(struct AFB_clientCtx *clientCtx, struct afb_event_listener listener)
-{
- remove_listener(clientCtx != NULL ? &clientCtx->listeners : &sessions.listeners, listener);
-}
-
-static int send(struct afb_event_listener_list *head, const char *event, struct json_object *object)
-{
- struct afb_event_listener_list *iter;
- int result;
-
- result = 0;
- iter = head;
- while (iter != NULL) {
- if (iter->listener.itf->expects == NULL || iter->listener.itf->expects(iter->listener.closure, event)) {
- iter->listener.itf->send(iter->listener.closure, event, json_object_get(object));
- result++;
- }
- iter = iter->next;
- }
-
- return result;
-}
-
-int ctxClientEventSend(struct AFB_clientCtx *clientCtx, const char *event, struct json_object *object)
-{
- long idx;
- time_t now;
- int result;
-
- now = NOW;
- if (clientCtx != NULL) {
- result = ctxIsActive(clientCtx, now) ? send(clientCtx->listeners, event, object) : 0;
- } else {
- result = send(sessions.listeners, event, object);
- for (idx=0; idx < sessions.max; idx++) {
- clientCtx = ctxClientAddRef(sessions.store[idx]);
- if (clientCtx != NULL && ctxIsActive(clientCtx, now)) {
- clientCtx = ctxClientAddRef(clientCtx);
- result += send(clientCtx->listeners, event, object);
- }
- ctxClientUnref(clientCtx);
- }
- }
- return result;
-}
-
const char *ctxClientGetUuid (struct AFB_clientCtx *clientCtx)
{
assert(clientCtx != NULL);
diff --git a/src/session.h b/src/session.h
index af074100..497951af 100644
--- a/src/session.h
+++ b/src/session.h
@@ -20,18 +20,6 @@
struct json_object;
struct AFB_clientCtx;
-struct afb_event_listener_itf
-{
- void (*send)(void *closure, const char *event, struct json_object *object);
- int (*expects)(void *closure, const char *event);
-};
-
-struct afb_event_listener
-{
- const struct afb_event_listener_itf *itf;
- void *closure;
-};
-
extern void ctxStoreInit (int max_session_count, int timeout, const char *initok, int context_count);
extern struct AFB_clientCtx *ctxClientGetSession (const char *uuid, int *created);
@@ -39,10 +27,6 @@ extern struct AFB_clientCtx *ctxClientAddRef(struct AFB_clientCtx *clientCtx);
extern void ctxClientUnref(struct AFB_clientCtx *clientCtx);
extern void ctxClientClose (struct AFB_clientCtx *clientCtx);
-extern int ctxClientEventListenerAdd(struct AFB_clientCtx *clientCtx, struct afb_event_listener listener);
-extern void ctxClientEventListenerRemove(struct AFB_clientCtx *clientCtx, struct afb_event_listener listener);
-extern int ctxClientEventSend(struct AFB_clientCtx *clientCtx, const char *event, struct json_object *object);
-
extern int ctxTokenCheck (struct AFB_clientCtx *clientCtx, const char *token);
extern void ctxTokenNew (struct AFB_clientCtx *clientCtx);