summaryrefslogtreecommitdiffstats
path: root/m4a_afb_comm.c
diff options
context:
space:
mode:
Diffstat (limited to 'm4a_afb_comm.c')
-rw-r--r--m4a_afb_comm.c274
1 files changed, 119 insertions, 155 deletions
diff --git a/m4a_afb_comm.c b/m4a_afb_comm.c
index 63b8d61..30e1521 100644
--- a/m4a_afb_comm.c
+++ b/m4a_afb_comm.c
@@ -24,6 +24,8 @@
#include "m4a_afb_comm.h"
+#define _GNU_SOURCE
+#include <stdio.h>
#include <errno.h>
#include <unistd.h>
@@ -33,40 +35,30 @@
#include <pulse/xmalloc.h>
#include <systemd/sd-event.h>
-#include <afb/afb-wsj1.h>
+#include <afb/afb-proto-ws.h>
#include <afb/afb-ws-client.h>
-typedef struct _m4a_ws_comm_pipe_data m4a_ws_comm_pipe_data;
-typedef struct _m4a_afb_api_call_data m4a_afb_api_call_data;
-
-enum m4a_ws_comm_pipe_opcode {
+enum m4a_internal_pipe_opcode {
OPCODE_CALL_ASYNC = 1,
OPCODE_EXIT
};
-struct _m4a_ws_comm_pipe_data {
- enum m4a_ws_comm_pipe_opcode opcode;
- void *data;
-};
-
-struct _m4a_afb_comm {
- struct afb_wsj1 *wsj1;
-
- pa_thread *thread;
- sd_event *loop;
- int pipe[2];
- sd_event_source *source;
+typedef struct _m4a_internal_pipe_data m4a_internal_pipe_data;
+typedef struct _m4a_afb_api_call_data m4a_afb_api_call_data;
- pa_mutex *pending_calls_lock;
- PA_LLIST_HEAD(m4a_afb_api_call_data, pending_calls);
+/* structure sent over the internal pipe
+ * for communication between the two threads */
+struct _m4a_internal_pipe_data {
+ enum m4a_internal_pipe_opcode opcode;
+ void *data;
};
+/* structure holding data related to an ongoing AFB API call */
struct _m4a_afb_api_call_data {
m4a_afb_comm *comm;
- const char *api;
const char *verb;
- char *object;
+ json_object *object;
m4a_afb_done_cb_t done_cb;
void *userdata;
@@ -74,9 +66,30 @@ struct _m4a_afb_api_call_data {
PA_LLIST_FIELDS(m4a_afb_api_call_data);
};
-static void afb_api_call_data_free(m4a_afb_api_call_data *c) {
+/* the main structure of this class */
+struct _m4a_afb_comm {
+ struct afb_proto_ws *afbhandle;
+
+ char *sessid;
+
+ /* the thread that runs the loop below */
+ pa_thread *thread;
+ /* our internal event loop */
+ sd_event *loop;
+ /* the pipe fd pair used to send commands from
+ * the pulse threads to our internal thread */
+ int tx_pipe[2];
+ /* the source for listening to the above pipe from our event loop */
+ sd_event_source *tx_source;
+
+ /* a list of pending API calls, protected with a mutex */
+ pa_mutex *pending_calls_lock;
+ PA_LLIST_HEAD(m4a_afb_api_call_data, pending_calls);
+};
+
+static void m4a_afb_api_call_data_free(m4a_afb_api_call_data *c) {
if (c->object)
- pa_xfree(c->object);
+ json_object_put(c->object);
pa_mutex_lock(c->comm->pending_calls_lock);
PA_LLIST_REMOVE(m4a_afb_api_call_data, c->comm->pending_calls, c);
@@ -85,101 +98,30 @@ static void afb_api_call_data_free(m4a_afb_api_call_data *c) {
pa_xfree(c);
}
-static void afb_call_async_step3(void *cb_data, struct afb_wsj1_msg *msg) {
- m4a_afb_api_call_data *c = cb_data;
- pa_json_object *j;
- const pa_json_object *jr = NULL;
-
- j = pa_json_parse(afb_wsj1_msg_object_s(msg));
- if (pa_json_object_get_type(j) != PA_JSON_TYPE_OBJECT) {
- pa_log_warn("4a reply is not a json object");
- } else if (afb_wsj1_msg_is_reply_ok(msg)) {
- jr = pa_json_object_get_object_member(j, "response");
- } else {
- jr = pa_json_object_get_object_member(j, "request");
- }
-
- if (afb_wsj1_msg_is_reply_ok(msg)) {
- pa_log_debug("Got OK reply from 4a in call to %s/%s", c->api, c->verb);
- c->done_cb(M4A_AFB_REPLY_OK, jr, c->userdata);
- } else if (jr && pa_json_object_get_type(jr) == PA_JSON_TYPE_OBJECT) {
- const pa_json_object *status, *info;
- const char *status_str, *info_str;
+static void m4a_afb_on_reply(void *ctx, void *handle, json_object *response,
+ const char *error, const char *info) {
+ m4a_afb_api_call_data *c = handle;
- status = pa_json_object_get_object_member(jr, "status");
- info = pa_json_object_get_object_member(jr, "info");
-
- if (status && pa_json_object_get_type(status) == PA_JSON_TYPE_STRING)
- status_str = pa_json_object_get_string(status);
- else
- status_str = "(null)";
-
- if (info && pa_json_object_get_type(info) == PA_JSON_TYPE_STRING)
- info_str = pa_json_object_get_string(info);
- else
- info_str = "(null)";
-
- pa_log("Got error reply from 4a, status: '%s', info: '%s'", status_str, info_str);
- c->done_cb(M4A_AFB_REPLY_ERROR, jr, c->userdata);
- } else {
- pa_log("Got error reply from 4a");
- c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata);
- }
-
- pa_json_object_free(j);
- afb_api_call_data_free(c);
-}
-
-static void afb_call_async_step2(m4a_afb_api_call_data *c) {
- pa_log_debug("calling afb: %s/%s", c->api, c->verb);
-
- if (afb_wsj1_call_s(c->comm->wsj1, c->api, c->verb, c->object, afb_call_async_step3, c) < 0) {
- pa_log("afb_wsj1_call_s: failed to call %s/%s: %s", c->api, c->verb, strerror(errno));
- c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata);
- afb_api_call_data_free(c);
- }
-}
-
-static void discard_pending_calls(m4a_afb_comm *comm) {
- m4a_afb_api_call_data *c;
-
- pa_mutex_lock(comm->pending_calls_lock);
- while ((c = comm->pending_calls)) {
+ if (error) {
+ pa_log("Got error reply from 4a: %s", error);
c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata);
- afb_api_call_data_free(c);
+ } else {
+ pa_log_debug("Got OK reply from 4a in call to %s", c->verb);
+ c->done_cb(M4A_AFB_REPLY_OK, response, c->userdata);
}
- pa_mutex_unlock(comm->pending_calls_lock);
-}
-
-static void on_wsj1_hangup(void *closure, struct afb_wsj1 *wsj1) {
- m4a_afb_comm *comm = closure;
-
- pa_log_warn("afb closed the communication websocket!");
- discard_pending_calls(comm);
-
- //TODO: attempt to re-connect
-}
-static void on_wsj1_call(void *closure, const char *api, const char *verb, struct afb_wsj1_msg *msg) {
- /* we don't implement any method calls */
- afb_wsj1_reply_error_s(msg, "\"unimplemented\"", NULL);
+ m4a_afb_api_call_data_free(c);
}
-static void on_wsj1_event(void *closure, const char *event, struct afb_wsj1_msg *msg) {
- //TODO dispatch events
-}
-
-static struct afb_wsj1_itf wsj1_itf = {
- .on_hangup = on_wsj1_hangup,
- .on_call = on_wsj1_call,
- .on_event = on_wsj1_event
+static struct afb_proto_ws_client_itf itf = {
+ .on_reply = m4a_afb_on_reply
};
-static int ws_comm_thread_event(sd_event_source *s, int fd, uint32_t revents, m4a_afb_comm *comm) {
+static int internal_thread_event(sd_event_source *s, int fd, uint32_t revents, m4a_afb_comm *comm) {
ssize_t r;
- m4a_ws_comm_pipe_data d = { 0 };
+ m4a_internal_pipe_data d = { 0 };
- if ((r = read(fd, &d, sizeof(m4a_ws_comm_pipe_data))) < (ssize_t) sizeof(m4a_ws_comm_pipe_data)) {
+ if ((r = read(fd, &d, sizeof(m4a_internal_pipe_data))) < (ssize_t) sizeof(m4a_internal_pipe_data)) {
if (errno == EINTR)
return 0;
@@ -190,7 +132,14 @@ static int ws_comm_thread_event(sd_event_source *s, int fd, uint32_t revents, m4
switch(d.opcode) {
case OPCODE_CALL_ASYNC: {
m4a_afb_api_call_data *c = d.data;
- afb_call_async_step2(c);
+
+ if (afb_proto_ws_client_call(c->comm->afbhandle, c->verb, c->object,
+ c->comm->sessid, c, NULL) < 0) {
+ pa_log("afb_proto_ws_client_call: failed to call %s: %s",
+ c->verb, strerror(errno));
+ c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata);
+ m4a_afb_api_call_data_free(c);
+ }
break;
}
case OPCODE_EXIT:
@@ -203,24 +152,24 @@ static int ws_comm_thread_event(sd_event_source *s, int fd, uint32_t revents, m4
return 0;
}
-static void ws_comm_thread(m4a_afb_comm *comm) {
- pa_log_debug("websocket thread starting...");
+static void internal_thread(m4a_afb_comm *comm) {
+ pa_log_debug("afb comm thread starting...");
sd_event_loop(comm->loop);
- pa_log_debug("websocket thread exiting...");
+ pa_log_debug("afb comm thread exiting...");
}
-static bool ws_comm_thread_send(int fd, char opcode, void *data) {
- m4a_ws_comm_pipe_data d;
+static bool internal_thread_send(m4a_afb_comm *comm, char opcode, void *data) {
+ m4a_internal_pipe_data d;
ssize_t ret;
d.opcode = opcode;
d.data = data;
do {
- ret = write(fd, &d, sizeof(m4a_ws_comm_pipe_data));
+ ret = write(comm->tx_pipe[1], &d, sizeof(m4a_internal_pipe_data));
} while (ret < 0 && errno == EINTR);
- if (ret < (ssize_t) sizeof(m4a_ws_comm_pipe_data)) {
+ if (ret < (ssize_t) sizeof(m4a_internal_pipe_data)) {
pa_log("write: %s", strerror(errno));
return false;
}
@@ -228,58 +177,36 @@ static bool ws_comm_thread_send(int fd, char opcode, void *data) {
return true;
}
-bool m4a_afb_call_async(m4a_afb_comm *comm, const char *api, const char *verb,
- char *object, m4a_afb_done_cb_t done_cb, void *userdata) {
- m4a_afb_api_call_data *c;
-
- c = pa_xnew0(m4a_afb_api_call_data, 1);
- c->comm = comm;
- c->api = api;
- c->verb = verb;
- c->object = object;
- c->done_cb = done_cb;
- c->userdata = userdata;
-
- pa_mutex_lock(comm->pending_calls_lock);
- PA_LLIST_PREPEND(m4a_afb_api_call_data, comm->pending_calls, c);
- pa_mutex_unlock(comm->pending_calls_lock);
-
- if (!ws_comm_thread_send(comm->pipe[1], OPCODE_CALL_ASYNC, c)) {
- c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata);
- afb_api_call_data_free(c);
- return false;
- }
- return true;
-}
-
m4a_afb_comm *m4a_afb_comm_new(const char *uri) {
int ret;
m4a_afb_comm *comm;
comm = pa_xnew0(m4a_afb_comm, 1);
+ asprintf(&comm->sessid, "pulseaudio:%d", getpid());
if ((ret = sd_event_new(&comm->loop)) < 0) {
pa_log("Failed to create systemd event loop: %s", strerror(-ret));
goto fail;
}
- if (!(comm->wsj1 = afb_ws_client_connect_wsj1(comm->loop, uri, &wsj1_itf, comm))) {
+ if (!(comm->afbhandle = afb_ws_client_connect_api(comm->loop, uri, &itf, comm))) {
pa_log("Connection to %s failed: %s", uri, strerror(errno));
goto fail;
}
- if (pipe(comm->pipe) < 0) {
- pa_log("pipe2 failed: %s", strerror(errno));
+ if (pipe(comm->tx_pipe) < 0) {
+ pa_log("pipe failed: %s", strerror(errno));
goto fail;
}
- if ((ret = sd_event_add_io(comm->loop, &comm->source, comm->pipe[0], EPOLLIN,
- (sd_event_io_handler_t) ws_comm_thread_event, comm)) < 0) {
+ if ((ret = sd_event_add_io(comm->loop, &comm->tx_source, comm->tx_pipe[0], EPOLLIN,
+ (sd_event_io_handler_t) internal_thread_event, comm)) < 0) {
pa_log("sd_event_add_io failed: %s", strerror(-ret));
goto fail;
}
- if (!(comm->thread = pa_thread_new("afb_comm_loop", (pa_thread_func_t) ws_comm_thread, comm))) {
+ if (!(comm->thread = pa_thread_new("m4a_afb_comm_internal_thread",
+ (pa_thread_func_t) internal_thread, comm))) {
pa_log("Failed to start websocket communication thread");
goto fail;
}
@@ -296,32 +223,69 @@ fail:
return NULL;
}
+
+static void discard_pending_calls(m4a_afb_comm *comm) {
+ m4a_afb_api_call_data *c;
+
+ pa_mutex_lock(comm->pending_calls_lock);
+ while ((c = comm->pending_calls)) {
+ c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata);
+ m4a_afb_api_call_data_free(c);
+ }
+ pa_mutex_unlock(comm->pending_calls_lock);
+}
+
void m4a_afb_comm_free(m4a_afb_comm *comm) {
if (comm->thread) {
- if (!ws_comm_thread_send(comm->pipe[1], OPCODE_EXIT, NULL)) {
+ if (!internal_thread_send(comm, OPCODE_EXIT, NULL)) {
pa_log("failed to shutdown thread gracefully");
pa_thread_free_nojoin(comm->thread);
} else {
pa_thread_free(comm->thread);
}
}
- if (comm->source)
- sd_event_source_unref(comm->source);
+ if (comm->tx_source)
+ sd_event_source_unref(comm->tx_source);
if (comm->loop)
sd_event_unref(comm->loop);
- if (comm->wsj1)
- afb_wsj1_unref(comm->wsj1);
+ if (comm->afbhandle)
+ afb_proto_ws_unref(comm->afbhandle);
if (comm->pending_calls_lock) {
discard_pending_calls(comm);
pa_mutex_free(comm->pending_calls_lock);
}
- if (comm->pipe[0] > 0)
- close(comm->pipe[0]);
- if (comm->pipe[1] > 0)
- close(comm->pipe[1]);
+ if (comm->tx_pipe[0] > 0)
+ close(comm->tx_pipe[0]);
+ if (comm->tx_pipe[1] > 0)
+ close(comm->tx_pipe[1]);
+ free(comm->sessid);
pa_xfree(comm);
}
+
+bool m4a_afb_call_async(m4a_afb_comm *comm, const char *verb,
+ json_object *object, m4a_afb_done_cb_t done_cb,
+ void *userdata) {
+ m4a_afb_api_call_data *c;
+
+ c = pa_xnew0(m4a_afb_api_call_data, 1);
+ c->comm = comm;
+ c->verb = verb;
+ c->object = object;
+ c->done_cb = done_cb;
+ c->userdata = userdata;
+
+ pa_mutex_lock(comm->pending_calls_lock);
+ PA_LLIST_PREPEND(m4a_afb_api_call_data, comm->pending_calls, c);
+ pa_mutex_unlock(comm->pending_calls_lock);
+
+ if (!internal_thread_send(comm, OPCODE_CALL_ASYNC, c)) {
+ c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata);
+ m4a_afb_api_call_data_free(c);
+ return false;
+ }
+ return true;
+}