summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJosé Bollo <jose.bollo@iot.bzh>2016-06-23 11:04:00 +0200
committerJosé Bollo <jose.bollo@iot.bzh>2016-06-23 11:04:23 +0200
commit39c2ebc125fcc694ac349ae196b62729c7f05037 (patch)
treed10938464f2f4bd94f28aa7c1d122504c53fd025
parentd8ef25780bffa6f91f013ef71b1ede908325e59d (diff)
api-dbus: improves events
Change-Id: I0d58bed66ebc9eaea63c0863351d03cf458e4198 Signed-off-by: José Bollo <jose.bollo@iot.bzh>
-rw-r--r--plugins/samples/HelloWorld.c151
-rw-r--r--src/afb-api-dbus.c575
2 files changed, 668 insertions, 58 deletions
diff --git a/plugins/samples/HelloWorld.c b/plugins/samples/HelloWorld.c
index 259b42f1..b6f49b78 100644
--- a/plugins/samples/HelloWorld.c
+++ b/plugins/samples/HelloWorld.c
@@ -23,6 +23,89 @@
const struct AFB_interface *interface;
+struct event
+{
+ struct event *next;
+ struct afb_event event;
+ char tag[1];
+};
+
+static struct event *events = 0;
+
+/* searchs the event of tag */
+static struct event *event_get(const char *tag)
+{
+ struct event *e = events;
+ while(e && strcmp(e->tag, tag))
+ e = e->next;
+ return e;
+}
+
+/* deletes the event of tag */
+static int event_del(const char *tag)
+{
+ struct event *e, **p;
+
+ /* check exists */
+ e = event_get(tag);
+ if (!e) return -1;
+
+ /* unlink */
+ p = &events;
+ while(*p != e) p = &(*p)->next;
+ *p = e->next;
+
+ /* destroys */
+ afb_event_drop(e->event);
+ free(e);
+ return 0;
+}
+
+/* creates the event of tag */
+static int event_add(const char *tag, const char *name)
+{
+ struct event *e;
+
+ /* check valid tag */
+ e = event_get(tag);
+ if (e) return -1;
+
+ /* creation */
+ e = malloc(strlen(tag) + sizeof *e);
+ if (!e) return -1;
+ strcpy(e->tag, tag);
+
+ /* make the event */
+ e->event = afb_daemon_make_event(interface->daemon, name);
+ if (!e->event.closure) { free(e); return -1; }
+
+ /* link */
+ e->next = events;
+ events = e;
+ return 0;
+}
+
+static int event_subscribe(struct afb_req request, const char *tag)
+{
+ struct event *e;
+ e = event_get(tag);
+ return e ? afb_req_subscribe(request, e->event) : -1;
+}
+
+static int event_unsubscribe(struct afb_req request, const char *tag)
+{
+ struct event *e;
+ e = event_get(tag);
+ return e ? afb_req_unsubscribe(request, e->event) : -1;
+}
+
+static int event_push(struct json_object *args, const char *tag)
+{
+ struct event *e;
+ e = event_get(tag);
+ return e ? afb_event_push(e->event, json_object_get(args)) : -1;
+}
+
// Sample Generic Ping Debug API
static void ping(struct afb_req request, json_object *jresp, const char *tag)
{
@@ -99,6 +182,69 @@ static void subcall (struct afb_req request)
afb_req_subcall(request, api, verb, object, subcallcb, afb_req_store(request));
}
+static void eventadd (struct afb_req request)
+{
+ const char *tag = afb_req_value(request, "tag");
+ const char *name = afb_req_value(request, "name");
+
+ if (tag == NULL || name == NULL)
+ afb_req_fail(request, "failed", "bad arguments");
+ else if (0 != event_add(tag, name))
+ afb_req_fail(request, "failed", "creation error");
+ else
+ afb_req_success(request, NULL, NULL);
+}
+
+static void eventdel (struct afb_req request)
+{
+ const char *tag = afb_req_value(request, "tag");
+
+ if (tag == NULL)
+ afb_req_fail(request, "failed", "bad arguments");
+ else if (0 != event_del(tag))
+ afb_req_fail(request, "failed", "deletion error");
+ else
+ afb_req_success(request, NULL, NULL);
+}
+
+static void eventsub (struct afb_req request)
+{
+ const char *tag = afb_req_value(request, "tag");
+
+ if (tag == NULL)
+ afb_req_fail(request, "failed", "bad arguments");
+ else if (0 != event_subscribe(request, tag))
+ afb_req_fail(request, "failed", "subscription error");
+ else
+ afb_req_success(request, NULL, NULL);
+}
+
+static void eventunsub (struct afb_req request)
+{
+ const char *tag = afb_req_value(request, "tag");
+
+ if (tag == NULL)
+ afb_req_fail(request, "failed", "bad arguments");
+ else if (0 != event_unsubscribe(request, tag))
+ afb_req_fail(request, "failed", "unsubscription error");
+ else
+ afb_req_success(request, NULL, NULL);
+}
+
+static void eventpush (struct afb_req request)
+{
+ const char *tag = afb_req_value(request, "tag");
+ const char *data = afb_req_value(request, "data");
+ json_object *object = data ? json_tokener_parse(data) : NULL;
+
+ if (tag == NULL)
+ afb_req_fail(request, "failed", "bad arguments");
+ else if (0 > event_push(object, tag))
+ afb_req_fail(request, "failed", "push error");
+ else
+ afb_req_success(request, NULL, NULL);
+}
+
// NOTE: this sample does not use session to keep test a basic as possible
// in real application most APIs should be protected with AFB_SESSION_CHECK
static const struct AFB_verb_desc_v1 verbs[]= {
@@ -109,6 +255,11 @@ static const struct AFB_verb_desc_v1 verbs[]= {
{"pingJson" , AFB_SESSION_NONE, pingJson , "Return a JSON object"},
{"pingevent", AFB_SESSION_NONE, pingEvent , "Send an event"},
{"subcall", AFB_SESSION_NONE, subcall , "Call api/verb(args)"},
+ {"eventadd", AFB_SESSION_NONE, eventadd , "adds the event of 'name' for the 'tag'"},
+ {"eventdel", AFB_SESSION_NONE, eventdel , "deletes the event of 'tag'"},
+ {"eventsub", AFB_SESSION_NONE, eventsub , "subscribes to the event of 'tag'"},
+ {"eventunsub",AFB_SESSION_NONE, eventunsub , "unsubscribes to the event of 'tag'"},
+ {"eventpush", AFB_SESSION_NONE, eventpush , "pushs the event of 'tag' with the 'data'"},
{NULL}
};
diff --git a/src/afb-api-dbus.c b/src/afb-api-dbus.c
index 054755ac..4c5d9083 100644
--- a/src/afb-api-dbus.c
+++ b/src/afb-api-dbus.c
@@ -42,6 +42,10 @@
static const char DEFAULT_PATH_PREFIX[] = "/org/agl/afb/api/";
+struct dbus_memo;
+struct dbus_event;
+struct destination;
+
/*
* The path given are of the form
* system:/org/agl/afb/api/...
@@ -51,11 +55,22 @@ static const char DEFAULT_PATH_PREFIX[] = "/org/agl/afb/api/";
struct api_dbus
{
struct sd_bus *sdbus; /* the bus */
- struct sd_bus_slot *slot; /* the slot */
char *path; /* path of the object for the API */
char *name; /* name/interface of the object */
char *api; /* api name of the interface */
- struct afb_evt_listener *listener;
+ union {
+ struct {
+ struct sd_bus_slot *slot_broadcast;
+ struct sd_bus_slot *slot_event;
+ struct dbus_event *events;
+ struct dbus_memo *memos;
+ } client;
+ struct {
+ struct sd_bus_slot *slot_call;
+ struct afb_evt_listener *listener; /* listener for broadcasted events */
+ struct destination *destinations;
+ } server;
+ };
};
#define RETOK 1
@@ -200,12 +215,23 @@ static void destroy_api_dbus(struct api_dbus *api)
* structure for recording query data
*/
struct dbus_memo {
+ struct dbus_memo *next; /* the next memo */
+ struct api_dbus *api; /* the dbus api */
struct afb_req req; /* the request handle */
struct afb_context *context; /* the context of the query */
+ uint64_t msgid; /* the message identifier */
+};
+
+struct dbus_event
+{
+ struct dbus_event *next;
+ struct afb_event event;
+ int id;
+ int refcount;
};
/* allocates and init the memorizing data */
-static struct dbus_memo *api_dbus_client_make_memo(struct afb_req req, struct afb_context *context)
+static struct dbus_memo *api_dbus_client_memo_make(struct api_dbus *api, struct afb_req req, struct afb_context *context)
{
struct dbus_memo *memo;
@@ -214,17 +240,44 @@ static struct dbus_memo *api_dbus_client_make_memo(struct afb_req req, struct af
afb_req_addref(req);
memo->req = req;
memo->context = context;
+ memo->msgid = 0;
+ memo->api = api;
+ memo->next = api->client.memos;
+ api->client.memos = memo;
}
return memo;
}
/* free and release the memorizing data */
-static void api_dbus_client_free_memo(struct dbus_memo *memo)
+static void api_dbus_client_memo_destroy(struct dbus_memo *memo)
{
+ struct dbus_memo **prv;
+
+ prv = &memo->api->client.memos;
+ while (*prv != NULL) {
+ if (*prv == memo) {
+ *prv = memo->next;
+ break;
+ }
+ prv = &(*prv)->next;
+ }
+
afb_req_unref(memo->req);
free(memo);
}
+/* search a memorized request */
+static struct dbus_memo *api_dbus_client_memo_search(struct api_dbus *api, uint64_t msgid)
+{
+ struct dbus_memo *memo;
+
+ memo = api->client.memos;
+ while (memo != NULL && memo->msgid != msgid)
+ memo = memo->next;
+
+ return memo;
+}
+
/* callback when received answer */
static int api_dbus_client_on_reply(sd_bus_message *message, void *userdata, sd_bus_error *ret_error)
{
@@ -247,10 +300,10 @@ static int api_dbus_client_on_reply(sd_bus_message *message, void *userdata, sd_
memo->context->flags = (unsigned)flags;
switch(type) {
case RETOK:
- afb_req_success(memo->req, json_tokener_parse(first), second);
+ afb_req_success(memo->req, json_tokener_parse(first), *second ? second : NULL);
break;
case RETERR:
- afb_req_fail(memo->req, first, second);
+ afb_req_fail(memo->req, first, *second ? second : NULL);
break;
case RETRAW:
afb_req_send(memo->req, first, strlen(first));
@@ -260,7 +313,7 @@ static int api_dbus_client_on_reply(sd_bus_message *message, void *userdata, sd_
break;
}
}
- api_dbus_client_free_memo(memo);
+ api_dbus_client_memo_destroy(memo);
return 1;
}
@@ -271,29 +324,44 @@ static void api_dbus_client_call(struct api_dbus *api, struct afb_req req, struc
int rc;
char *method = strndupa(verb, lenverb);
struct dbus_memo *memo;
+ struct sd_bus_message *msg;
/* create the recording data */
- memo = api_dbus_client_make_memo(req, context);
+ memo = api_dbus_client_memo_make(api, req, context);
if (memo == NULL) {
afb_req_fail(req, "error", "out of memory");
return;
}
- /* makes the call */
- rc = sd_bus_call_method_async(api->sdbus, NULL,
- api->name, api->path, api->name, method,
- api_dbus_client_on_reply, memo,
- "ssu",
+ /* creates the message */
+ msg = NULL;
+ rc = sd_bus_message_new_method_call(api->sdbus, &msg, api->name, api->path, api->name, method);
+ if (rc < 0)
+ goto error;
+
+ rc = sd_bus_message_append(msg, "ssu",
afb_req_raw(req, &size),
ctxClientGetUuid(context->session),
(uint32_t)context->flags);
+ if (rc < 0)
+ goto error;
+
+ /* makes the call */
+ rc = sd_bus_call_async(api->sdbus, NULL, msg, api_dbus_client_on_reply, memo, (uint64_t)-1);
+ if (rc < 0)
+ goto error;
+
+ rc = sd_bus_message_get_cookie(msg, &memo->msgid);
+ if (rc >= 0)
+ goto end;
+error:
/* if there was an error report it directly */
- if (rc < 0) {
- errno = -rc;
- afb_req_fail(req, "error", "dbus error");
- api_dbus_client_free_memo(memo);
- }
+ errno = -rc;
+ afb_req_fail(req, "error", "dbus error");
+ api_dbus_client_memo_destroy(memo);
+end:
+ sd_bus_message_unref(msg);
}
static int api_dbus_service_start(struct api_dbus *api, int share_session, int onneed)
@@ -307,18 +375,208 @@ static int api_dbus_service_start(struct api_dbus *api, int share_session, int o
return -1;
}
-/* receives events */
-static int api_dbus_client_on_event(sd_bus_message *m, void *userdata, sd_bus_error *ret_error)
+/* receives broadcasted events */
+static int api_dbus_client_on_broadcast_event(sd_bus_message *m, void *userdata, sd_bus_error *ret_error)
{
struct json_object *object;
const char *event, *data;
int rc = sd_bus_message_read(m, "ss", &event, &data);
if (rc < 0)
- ERROR("unreadable event");
+ ERROR("unreadable broadcasted event");
else {
object = json_tokener_parse(data);
afb_evt_broadcast(event, object);
- json_object_put(object);
+ }
+ return 1;
+}
+
+/* search the event */
+static struct dbus_event *api_dbus_client_event_search(struct api_dbus *api, int id, const char *name)
+{
+ struct dbus_event *ev;
+
+ ev = api->client.events;
+ while (ev != NULL && (ev->id != id || 0 != strcmp(afb_evt_event_name(ev->event), name)))
+ ev = ev->next;
+
+ return ev;
+}
+
+/* adds an event */
+static void api_dbus_client_event_create(struct api_dbus *api, int id, const char *name)
+{
+ struct dbus_event *ev;
+
+ /* check conflicts */
+ ev = api_dbus_client_event_search(api, id, name);
+ if (ev != NULL) {
+ ev->refcount++;
+ return;
+ }
+
+ /* no conflict, try to add it */
+ ev = malloc(sizeof *ev);
+ if (ev != NULL) {
+ ev->event = afb_evt_create_event(name);
+ if (ev->event.closure == NULL)
+ free(ev);
+ else {
+ ev->refcount = 1;
+ ev->id = id;
+ ev->next = api->client.events;
+ api->client.events = ev;
+ return;
+ }
+ }
+ ERROR("can't create event %s, out of memory", name);
+}
+
+/* removes an event */
+static void api_dbus_client_event_drop(struct api_dbus *api, int id, const char *name)
+{
+ struct dbus_event *ev, **prv;
+
+ /* retrieves the event */
+ ev = api_dbus_client_event_search(api, id, name);
+ if (ev == NULL) {
+ ERROR("event %s not found", name);
+ return;
+ }
+
+ /* decrease the reference count */
+ if (--ev->refcount)
+ return;
+
+ /* unlinks the event */
+ prv = &api->client.events;
+ while (*prv != ev)
+ prv = &(*prv)->next;
+ *prv = ev->next;
+
+ /* destroys the event */
+ afb_event_drop(ev->event);
+ free(ev);
+}
+
+/* pushs an event */
+static void api_dbus_client_event_push(struct api_dbus *api, int id, const char *name, const char *data)
+{
+ struct json_object *object;
+ struct dbus_event *ev;
+
+ /* retrieves the event */
+ ev = api_dbus_client_event_search(api, id, name);
+ if (ev == NULL) {
+ ERROR("event %s not found", name);
+ return;
+ }
+
+ /* destroys the event */
+ object = json_tokener_parse(data);
+ afb_event_push(ev->event, object);
+}
+
+/* subscribes an event */
+static void api_dbus_client_event_subscribe(struct api_dbus *api, int id, const char *name, uint64_t msgid)
+{
+ int rc;
+ struct dbus_event *ev;
+ struct dbus_memo *memo;
+
+ /* retrieves the event */
+ ev = api_dbus_client_event_search(api, id, name);
+ if (ev == NULL) {
+ ERROR("event %s not found", name);
+ return;
+ }
+
+ /* retrieves the memo */
+ memo = api_dbus_client_memo_search(api, msgid);
+ if (memo == NULL) {
+ ERROR("message not found");
+ return;
+ }
+
+ /* subscribe the request to the event */
+ rc = afb_req_subscribe(memo->req, ev->event);
+ if (rc < 0)
+ ERROR("can't subscribe: %m");
+}
+
+/* unsubscribes an event */
+static void api_dbus_client_event_unsubscribe(struct api_dbus *api, int id, const char *name, uint64_t msgid)
+{
+ int rc;
+ struct dbus_event *ev;
+ struct dbus_memo *memo;
+
+ /* retrieves the event */
+ ev = api_dbus_client_event_search(api, id, name);
+ if (ev == NULL) {
+ ERROR("event %s not found", name);
+ return;
+ }
+
+ /* retrieves the memo */
+ memo = api_dbus_client_memo_search(api, msgid);
+ if (memo == NULL) {
+ ERROR("message not found");
+ return;
+ }
+
+ /* unsubscribe the request from the event */
+ rc = afb_req_unsubscribe(memo->req, ev->event);
+ if (rc < 0)
+ ERROR("can't unsubscribe: %m");
+}
+
+/* receives calls for event */
+static int api_dbus_client_on_manage_event(sd_bus_message *m, void *userdata, sd_bus_error *ret_error)
+{
+ const char *eventname, *data;
+ int rc;
+ int32_t eventid;
+ uint8_t order;
+ struct api_dbus *api;
+ uint64_t msgid;
+
+ /* check if expected message */
+ api = userdata;
+ if (0 != strcmp(api->name, sd_bus_message_get_interface(m)))
+ return 0; /* not the expected interface */
+ if (0 != strcmp("event", sd_bus_message_get_member(m)))
+ return 0; /* not the expected member */
+ if (sd_bus_message_get_expect_reply(m))
+ return 0; /* not the expected type of message */
+
+ /* reads the message */
+ rc = sd_bus_message_read(m, "yisst", &order, &eventid, &eventname, &data, &msgid);
+ if (rc < 0) {
+ ERROR("unreadable event");
+ return 1;
+ }
+
+ /* what is the order ? */
+ switch ((char)order) {
+ case '+': /* creates the event */
+ api_dbus_client_event_create(api, eventid, eventname);
+ break;
+ case '-': /* drops the event */
+ api_dbus_client_event_drop(api, eventid, eventname);
+ break;
+ case '!': /* pushs the event */
+ api_dbus_client_event_push(api, eventid, eventname, data);
+ break;
+ case 'S': /* subscribe event for a request */
+ api_dbus_client_event_subscribe(api, eventid, eventname, msgid);
+ break;
+ case 'U': /* unsubscribe event for a request */
+ api_dbus_client_event_unsubscribe(api, eventid, eventname, msgid);
+ break;
+ default:
+ /* unexpected order */
+ ERROR("unexpected order '%c' received", (char)order);
+ break;
}
return 1;
}
@@ -336,17 +594,25 @@ int afb_api_dbus_add_client(const char *path)
if (api == NULL)
goto error;
- /* connect to events */
- rc = asprintf(&match, "type='signal',path='%s',interface='%s',member='event'", api->path, api->name);
+ /* connect to broadcasted events */
+ rc = asprintf(&match, "type='signal',path='%s',interface='%s',member='broadcast'", api->path, api->name);
if (rc < 0) {
errno = ENOMEM;
ERROR("out of memory");
goto error;
}
- rc = sd_bus_add_match(api->sdbus, &api->slot, match, api_dbus_client_on_event, api);
+ rc = sd_bus_add_match(api->sdbus, &api->client.slot_broadcast, match, api_dbus_client_on_broadcast_event, api);
free(match);
if (rc < 0) {
errno = -rc;
+ ERROR("can't add dbus match %s for %s", api->path, api->name);
+ goto error;
+ }
+
+ /* connect to event management */
+ rc = sd_bus_add_object(api->sdbus, &api->client.slot_event, api->path, api_dbus_client_on_manage_event, api);
+ if (rc < 0) {
+ errno = -rc;
ERROR("can't add dbus object %s for %s", api->path, api->name);
goto error;
}
@@ -366,6 +632,137 @@ error:
return -1;
}
+/******************* event structures for server part **********************************/
+
+static void afb_api_dbus_server_event_add(void *closure, const char *event, int eventid);
+static void afb_api_dbus_server_event_remove(void *closure, const char *event, int eventid);
+static void afb_api_dbus_server_event_push(void *closure, const char *event, int eventid, struct json_object *object);
+static void afb_api_dbus_server_event_broadcast(void *closure, const char *event, int eventid, struct json_object *object);
+
+/* the interface for events broadcasting */
+static const struct afb_evt_itf evt_broadcast_itf = {
+ .broadcast = afb_api_dbus_server_event_broadcast,
+};
+
+/* the interface for events pushing */
+static const struct afb_evt_itf evt_push_itf = {
+ .push = afb_api_dbus_server_event_push,
+ .add = afb_api_dbus_server_event_add,
+ .remove = afb_api_dbus_server_event_remove
+};
+
+/******************* destination description part for server *****************************/
+
+struct destination
+{
+ /* link to next different destination */
+ struct destination *next;
+
+ /* the server dbus-api */
+ struct api_dbus *api;
+
+ /* count of references */
+ int refcount;
+
+ /* the destination */
+ char name[1];
+};
+
+static struct destination *afb_api_dbus_server_destination_get(struct api_dbus *api, const char *sender)
+{
+ struct destination *destination;
+
+ /* searchs for an existing destination */
+ destination = api->server.destinations;
+ while (destination != NULL) {
+ if (0 == strcmp(destination->name, sender)) {
+ destination->refcount++;
+ return destination;
+ }
+ destination = destination->next;
+ }
+
+ /* not found, create it */
+ destination = malloc(strlen(sender) + sizeof *destination);
+ if (destination == NULL)
+ errno = ENOMEM;
+ else {
+ destination->api = api;
+ destination->refcount = 1;
+ strcpy(destination->name, sender);
+ destination->next = api->server.destinations;
+ api->server.destinations = destination;
+ }
+ return destination;
+}
+
+static void afb_api_dbus_server_destination_unref(struct destination *destination)
+{
+ if (!--destination->refcount) {
+ struct destination **prv;
+
+ prv = &destination->api->server.destinations;
+ while(*prv != destination)
+ prv = &(*prv)->next;
+ *prv = destination->next;
+ free(destination);
+ }
+}
+
+struct listener
+{
+ /* link to next different destination */
+ struct destination *destination;
+
+ /* the listener of events */
+ struct afb_evt_listener *listener;
+};
+
+static void afb_api_dbus_server_listener_free(struct listener *listener)
+{
+ afb_evt_listener_unref(listener->listener);
+ afb_api_dbus_server_destination_unref(listener->destination);
+ free(listener);
+}
+
+static struct listener *afb_api_dbus_server_listerner_get(struct api_dbus *api, const char *sender, struct AFB_clientCtx *session)
+{
+ int rc;
+ struct listener *listener;
+ struct destination *destination;
+
+ /* get the destination */
+ destination = afb_api_dbus_server_destination_get(api, sender);
+ if (destination == NULL)
+ return NULL;
+
+ /* retrieves the stored listener */
+ listener = ctxClientCookieGet(session, destination);
+ if (listener != NULL) {
+ /* found */
+ afb_api_dbus_server_destination_unref(destination);
+ return listener;
+ }
+
+ /* creates the listener */
+ listener = malloc(sizeof *listener);
+ if (listener == NULL)
+ errno = ENOMEM;
+ else {
+ listener->destination = destination;
+ listener->listener = afb_evt_listener_create(&evt_push_itf, destination);
+ if (listener->listener != NULL) {
+ rc = ctxClientCookieSet(session, destination, listener, (void*)afb_api_dbus_server_listener_free);
+ if (rc == 0)
+ return listener;
+ afb_evt_listener_unref(listener->listener);
+ }
+ free(listener);
+ }
+ afb_api_dbus_server_destination_unref(destination);
+ return NULL;
+}
+
/******************* dbus request part for server *****************/
/*
@@ -376,6 +773,7 @@ struct dbus_req {
sd_bus_message *message; /* the incoming request message */
const char *request; /* the readen request as string */
struct json_object *json; /* the readen request as object */
+ struct listener *listener; /* the listener for events */
int refcount; /* reference count of the request */
};
@@ -420,7 +818,7 @@ static void dbus_req_reply(struct dbus_req *dreq, uint8_t type, const char *firs
{
int rc;
rc = sd_bus_reply_method_return(dreq->message,
- "yssu", type, first, second, (uint32_t)dreq->context.flags);
+ "yssu", type, first ? : "", second ? : "", (uint32_t)dreq->context.flags);
if (rc < 0)
ERROR("sending the reply failed");
}
@@ -448,14 +846,28 @@ static void dbus_req_send(struct dbus_req *dreq, const char *buffer, size_t size
dbus_req_reply(dreq, RETRAW, buffer, "");
}
+static void afb_api_dbus_server_event_send(struct destination *destination, char order, const char *event, int eventid, const char *data, uint64_t msgid);
+
static int dbus_req_subscribe(struct dbus_req *dreq, struct afb_event event)
{
- return -1;
+ uint64_t msgid;
+ int rc;
+
+ rc = afb_evt_add_watch(dreq->listener->listener, event);
+ sd_bus_message_get_cookie(dreq->message, &msgid);
+ afb_api_dbus_server_event_send(dreq->listener->destination, 'S', afb_evt_event_name(event), afb_evt_event_id(event), "", msgid);
+ return rc;
}
static int dbus_req_unsubscribe(struct dbus_req *dreq, struct afb_event event)
{
- return -1;
+ uint64_t msgid;
+ int rc;
+
+ sd_bus_message_get_cookie(dreq->message, &msgid);
+ afb_api_dbus_server_event_send(dreq->listener->destination, 'U', afb_evt_event_name(event), afb_evt_event_id(event), "", msgid);
+ rc = afb_evt_remove_watch(dreq->listener->listener, event);
+ return rc;
}
static void dbus_req_subcall(struct dbus_req *dreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *closure);
@@ -485,6 +897,62 @@ static void dbus_req_subcall(struct dbus_req *dreq, const char *api, const char
/******************* server part **********************************/
+static void afb_api_dbus_server_event_send(struct destination *destination, char order, const char *event, int eventid, const char *data, uint64_t msgid)
+{
+ int rc;
+ struct api_dbus *api;
+ struct sd_bus_message *msg;
+
+ api = destination->api;
+ msg = NULL;
+
+ rc = sd_bus_message_new_method_call(api->sdbus, &msg, destination->name, api->path, api->name, "event");
+ if (rc < 0)
+ goto error;
+
+ rc = sd_bus_message_append(msg, "yisst", (uint8_t)order, (int32_t)eventid, event, data, msgid);
+ if (rc < 0)
+ goto error;
+
+ rc = sd_bus_send(api->sdbus, msg, NULL); /* NULL for cookie implies no expected reply */
+ if (rc >= 0)
+ goto end;
+
+error:
+ ERROR("error while send event %c%s(%d) to %s", order, event, eventid, destination->name);
+end:
+ sd_bus_message_unref(msg);
+}
+
+static void afb_api_dbus_server_event_add(void *closure, const char *event, int eventid)
+{
+ afb_api_dbus_server_event_send(closure, '+', event, eventid, "", 0);
+}
+
+static void afb_api_dbus_server_event_remove(void *closure, const char *event, int eventid)
+{
+ afb_api_dbus_server_event_send(closure, '-', event, eventid, "", 0);
+}
+
+static void afb_api_dbus_server_event_push(void *closure, const char *event, int eventid, struct json_object *object)
+{
+ const char *data = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN);
+ afb_api_dbus_server_event_send(closure, '!', event, eventid, data, 0);
+}
+
+static void afb_api_dbus_server_event_broadcast(void *closure, const char *event, int eventid, struct json_object *object)
+{
+ int rc;
+ struct api_dbus *api;
+
+ api = closure;
+ rc = sd_bus_emit_signal(api->sdbus, api->path, api->name, "broadcast",
+ "ss", event, json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN));
+ if (rc < 0)
+ ERROR("error while broadcasting event %s", event);
+ json_object_put(object);
+}
+
/* called when the object for the service is called */
static int api_dbus_server_on_object_called(sd_bus_message *message, void *userdata, sd_bus_error *ret_error)
{
@@ -495,6 +963,8 @@ static int api_dbus_server_on_object_called(sd_bus_message *message, void *userd
struct api_dbus *api = userdata;
struct afb_req areq;
uint32_t flags;
+ struct AFB_clientCtx *session;
+ struct listener *listener;
/* check the interface */
if (strcmp(sd_bus_message_get_interface(message), api->name) != 0)
@@ -505,55 +975,45 @@ static int api_dbus_server_on_object_called(sd_bus_message *message, void *userd
/* create the request */
dreq = calloc(1 , sizeof *dreq);
- if (dreq == NULL) {
- sd_bus_reply_method_errorf(message, SD_BUS_ERROR_NO_MEMORY, "out of memory");
- return 1;
- }
+ if (dreq == NULL)
+ goto out_of_memory;
/* get the data */
rc = sd_bus_message_read(message, "ssu", &dreq->request, &uuid, &flags);
if (rc < 0) {
sd_bus_reply_method_errorf(message, SD_BUS_ERROR_INVALID_SIGNATURE, "invalid signature");
- free(dreq);
- return 1;
+ goto error;
}
/* connect to the context */
- if (afb_context_connect(&dreq->context, uuid, NULL) < 0) {
- sd_bus_reply_method_errorf(message, SD_BUS_ERROR_NO_MEMORY, "out of memory");
- free(dreq);
- return 1;
- }
+ if (afb_context_connect(&dreq->context, uuid, NULL) < 0)
+ goto out_of_memory;
+ session = dreq->context.session;
+
+ /* get the listener */
+ listener = afb_api_dbus_server_listerner_get(api, sd_bus_message_get_sender(message), session);
+ if (listener == NULL)
+ goto out_of_memory;
/* fulfill the request and emit it */
dreq->context.flags = flags;
dreq->message = sd_bus_message_ref(message);
dreq->json = NULL;
+ dreq->listener = listener;
dreq->refcount = 1;
areq.itf = &afb_api_dbus_req_itf;
areq.closure = dreq;
afb_apis_call_(areq, &dreq->context, api->api, method);
dbus_req_unref(dreq);
return 1;
-}
-static void afb_api_dbus_server_send_event(struct api_dbus *api, const char *event, int eventid, struct json_object *object)
-{
- int rc;
-
- rc = sd_bus_emit_signal(api->sdbus, api->path, api->name,
- "event", "ss", event, json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN));
- if (rc < 0)
- ERROR("error while emiting event %s", event);
- json_object_put(object);
+out_of_memory:
+ sd_bus_reply_method_errorf(message, SD_BUS_ERROR_NO_MEMORY, "out of memory");
+error:
+ free(dreq);
+ return 1;
}
-/* the interface for events */
-static const struct afb_evt_itf evt_itf = {
- .broadcast = (void*)afb_api_dbus_server_send_event,
- .push = (void*)afb_api_dbus_server_send_event
-};
-
/* create the service */
int afb_api_dbus_add_server(const char *path)
{
@@ -574,7 +1034,7 @@ int afb_api_dbus_add_server(const char *path)
}
/* connect the service to the dbus object */
- rc = sd_bus_add_object(api->sdbus, &api->slot, api->path, api_dbus_server_on_object_called, api);
+ rc = sd_bus_add_object(api->sdbus, &api->server.slot_call, api->path, api_dbus_server_on_object_called, api);
if (rc < 0) {
errno = -rc;
ERROR("can't add dbus object %s for %s", api->path, api->name);
@@ -582,8 +1042,7 @@ int afb_api_dbus_add_server(const char *path)
}
INFO("afb service over dbus installed, name %s, path %s", api->name, api->path);
- api->listener = afb_evt_listener_create(&evt_itf, api);
-
+ api->server.listener = afb_evt_listener_create(&evt_broadcast_itf, api);
return 0;
error3:
sd_bus_release_name(api->sdbus, api->name);