diff options
Diffstat (limited to 'm4a_afb_comm.c')
-rw-r--r-- | m4a_afb_comm.c | 274 |
1 files changed, 119 insertions, 155 deletions
diff --git a/m4a_afb_comm.c b/m4a_afb_comm.c index 63b8d61..30e1521 100644 --- a/m4a_afb_comm.c +++ b/m4a_afb_comm.c @@ -24,6 +24,8 @@ #include "m4a_afb_comm.h" +#define _GNU_SOURCE +#include <stdio.h> #include <errno.h> #include <unistd.h> @@ -33,40 +35,30 @@ #include <pulse/xmalloc.h> #include <systemd/sd-event.h> -#include <afb/afb-wsj1.h> +#include <afb/afb-proto-ws.h> #include <afb/afb-ws-client.h> -typedef struct _m4a_ws_comm_pipe_data m4a_ws_comm_pipe_data; -typedef struct _m4a_afb_api_call_data m4a_afb_api_call_data; - -enum m4a_ws_comm_pipe_opcode { +enum m4a_internal_pipe_opcode { OPCODE_CALL_ASYNC = 1, OPCODE_EXIT }; -struct _m4a_ws_comm_pipe_data { - enum m4a_ws_comm_pipe_opcode opcode; - void *data; -}; - -struct _m4a_afb_comm { - struct afb_wsj1 *wsj1; - - pa_thread *thread; - sd_event *loop; - int pipe[2]; - sd_event_source *source; +typedef struct _m4a_internal_pipe_data m4a_internal_pipe_data; +typedef struct _m4a_afb_api_call_data m4a_afb_api_call_data; - pa_mutex *pending_calls_lock; - PA_LLIST_HEAD(m4a_afb_api_call_data, pending_calls); +/* structure sent over the internal pipe + * for communication between the two threads */ +struct _m4a_internal_pipe_data { + enum m4a_internal_pipe_opcode opcode; + void *data; }; +/* structure holding data related to an ongoing AFB API call */ struct _m4a_afb_api_call_data { m4a_afb_comm *comm; - const char *api; const char *verb; - char *object; + json_object *object; m4a_afb_done_cb_t done_cb; void *userdata; @@ -74,9 +66,30 @@ struct _m4a_afb_api_call_data { PA_LLIST_FIELDS(m4a_afb_api_call_data); }; -static void afb_api_call_data_free(m4a_afb_api_call_data *c) { +/* the main structure of this class */ +struct _m4a_afb_comm { + struct afb_proto_ws *afbhandle; + + char *sessid; + + /* the thread that runs the loop below */ + pa_thread *thread; + /* our internal event loop */ + sd_event *loop; + /* the pipe fd pair used to send commands from + * the pulse threads to our internal thread */ + int tx_pipe[2]; + /* the source for listening to the above pipe from our event loop */ + sd_event_source *tx_source; + + /* a list of pending API calls, protected with a mutex */ + pa_mutex *pending_calls_lock; + PA_LLIST_HEAD(m4a_afb_api_call_data, pending_calls); +}; + +static void m4a_afb_api_call_data_free(m4a_afb_api_call_data *c) { if (c->object) - pa_xfree(c->object); + json_object_put(c->object); pa_mutex_lock(c->comm->pending_calls_lock); PA_LLIST_REMOVE(m4a_afb_api_call_data, c->comm->pending_calls, c); @@ -85,101 +98,30 @@ static void afb_api_call_data_free(m4a_afb_api_call_data *c) { pa_xfree(c); } -static void afb_call_async_step3(void *cb_data, struct afb_wsj1_msg *msg) { - m4a_afb_api_call_data *c = cb_data; - pa_json_object *j; - const pa_json_object *jr = NULL; - - j = pa_json_parse(afb_wsj1_msg_object_s(msg)); - if (pa_json_object_get_type(j) != PA_JSON_TYPE_OBJECT) { - pa_log_warn("4a reply is not a json object"); - } else if (afb_wsj1_msg_is_reply_ok(msg)) { - jr = pa_json_object_get_object_member(j, "response"); - } else { - jr = pa_json_object_get_object_member(j, "request"); - } - - if (afb_wsj1_msg_is_reply_ok(msg)) { - pa_log_debug("Got OK reply from 4a in call to %s/%s", c->api, c->verb); - c->done_cb(M4A_AFB_REPLY_OK, jr, c->userdata); - } else if (jr && pa_json_object_get_type(jr) == PA_JSON_TYPE_OBJECT) { - const pa_json_object *status, *info; - const char *status_str, *info_str; +static void m4a_afb_on_reply(void *ctx, void *handle, json_object *response, + const char *error, const char *info) { + m4a_afb_api_call_data *c = handle; - status = pa_json_object_get_object_member(jr, "status"); - info = pa_json_object_get_object_member(jr, "info"); - - if (status && pa_json_object_get_type(status) == PA_JSON_TYPE_STRING) - status_str = pa_json_object_get_string(status); - else - status_str = "(null)"; - - if (info && pa_json_object_get_type(info) == PA_JSON_TYPE_STRING) - info_str = pa_json_object_get_string(info); - else - info_str = "(null)"; - - pa_log("Got error reply from 4a, status: '%s', info: '%s'", status_str, info_str); - c->done_cb(M4A_AFB_REPLY_ERROR, jr, c->userdata); - } else { - pa_log("Got error reply from 4a"); - c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata); - } - - pa_json_object_free(j); - afb_api_call_data_free(c); -} - -static void afb_call_async_step2(m4a_afb_api_call_data *c) { - pa_log_debug("calling afb: %s/%s", c->api, c->verb); - - if (afb_wsj1_call_s(c->comm->wsj1, c->api, c->verb, c->object, afb_call_async_step3, c) < 0) { - pa_log("afb_wsj1_call_s: failed to call %s/%s: %s", c->api, c->verb, strerror(errno)); - c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata); - afb_api_call_data_free(c); - } -} - -static void discard_pending_calls(m4a_afb_comm *comm) { - m4a_afb_api_call_data *c; - - pa_mutex_lock(comm->pending_calls_lock); - while ((c = comm->pending_calls)) { + if (error) { + pa_log("Got error reply from 4a: %s", error); c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata); - afb_api_call_data_free(c); + } else { + pa_log_debug("Got OK reply from 4a in call to %s", c->verb); + c->done_cb(M4A_AFB_REPLY_OK, response, c->userdata); } - pa_mutex_unlock(comm->pending_calls_lock); -} - -static void on_wsj1_hangup(void *closure, struct afb_wsj1 *wsj1) { - m4a_afb_comm *comm = closure; - - pa_log_warn("afb closed the communication websocket!"); - discard_pending_calls(comm); - - //TODO: attempt to re-connect -} -static void on_wsj1_call(void *closure, const char *api, const char *verb, struct afb_wsj1_msg *msg) { - /* we don't implement any method calls */ - afb_wsj1_reply_error_s(msg, "\"unimplemented\"", NULL); + m4a_afb_api_call_data_free(c); } -static void on_wsj1_event(void *closure, const char *event, struct afb_wsj1_msg *msg) { - //TODO dispatch events -} - -static struct afb_wsj1_itf wsj1_itf = { - .on_hangup = on_wsj1_hangup, - .on_call = on_wsj1_call, - .on_event = on_wsj1_event +static struct afb_proto_ws_client_itf itf = { + .on_reply = m4a_afb_on_reply }; -static int ws_comm_thread_event(sd_event_source *s, int fd, uint32_t revents, m4a_afb_comm *comm) { +static int internal_thread_event(sd_event_source *s, int fd, uint32_t revents, m4a_afb_comm *comm) { ssize_t r; - m4a_ws_comm_pipe_data d = { 0 }; + m4a_internal_pipe_data d = { 0 }; - if ((r = read(fd, &d, sizeof(m4a_ws_comm_pipe_data))) < (ssize_t) sizeof(m4a_ws_comm_pipe_data)) { + if ((r = read(fd, &d, sizeof(m4a_internal_pipe_data))) < (ssize_t) sizeof(m4a_internal_pipe_data)) { if (errno == EINTR) return 0; @@ -190,7 +132,14 @@ static int ws_comm_thread_event(sd_event_source *s, int fd, uint32_t revents, m4 switch(d.opcode) { case OPCODE_CALL_ASYNC: { m4a_afb_api_call_data *c = d.data; - afb_call_async_step2(c); + + if (afb_proto_ws_client_call(c->comm->afbhandle, c->verb, c->object, + c->comm->sessid, c, NULL) < 0) { + pa_log("afb_proto_ws_client_call: failed to call %s: %s", + c->verb, strerror(errno)); + c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata); + m4a_afb_api_call_data_free(c); + } break; } case OPCODE_EXIT: @@ -203,24 +152,24 @@ static int ws_comm_thread_event(sd_event_source *s, int fd, uint32_t revents, m4 return 0; } -static void ws_comm_thread(m4a_afb_comm *comm) { - pa_log_debug("websocket thread starting..."); +static void internal_thread(m4a_afb_comm *comm) { + pa_log_debug("afb comm thread starting..."); sd_event_loop(comm->loop); - pa_log_debug("websocket thread exiting..."); + pa_log_debug("afb comm thread exiting..."); } -static bool ws_comm_thread_send(int fd, char opcode, void *data) { - m4a_ws_comm_pipe_data d; +static bool internal_thread_send(m4a_afb_comm *comm, char opcode, void *data) { + m4a_internal_pipe_data d; ssize_t ret; d.opcode = opcode; d.data = data; do { - ret = write(fd, &d, sizeof(m4a_ws_comm_pipe_data)); + ret = write(comm->tx_pipe[1], &d, sizeof(m4a_internal_pipe_data)); } while (ret < 0 && errno == EINTR); - if (ret < (ssize_t) sizeof(m4a_ws_comm_pipe_data)) { + if (ret < (ssize_t) sizeof(m4a_internal_pipe_data)) { pa_log("write: %s", strerror(errno)); return false; } @@ -228,58 +177,36 @@ static bool ws_comm_thread_send(int fd, char opcode, void *data) { return true; } -bool m4a_afb_call_async(m4a_afb_comm *comm, const char *api, const char *verb, - char *object, m4a_afb_done_cb_t done_cb, void *userdata) { - m4a_afb_api_call_data *c; - - c = pa_xnew0(m4a_afb_api_call_data, 1); - c->comm = comm; - c->api = api; - c->verb = verb; - c->object = object; - c->done_cb = done_cb; - c->userdata = userdata; - - pa_mutex_lock(comm->pending_calls_lock); - PA_LLIST_PREPEND(m4a_afb_api_call_data, comm->pending_calls, c); - pa_mutex_unlock(comm->pending_calls_lock); - - if (!ws_comm_thread_send(comm->pipe[1], OPCODE_CALL_ASYNC, c)) { - c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata); - afb_api_call_data_free(c); - return false; - } - return true; -} - m4a_afb_comm *m4a_afb_comm_new(const char *uri) { int ret; m4a_afb_comm *comm; comm = pa_xnew0(m4a_afb_comm, 1); + asprintf(&comm->sessid, "pulseaudio:%d", getpid()); if ((ret = sd_event_new(&comm->loop)) < 0) { pa_log("Failed to create systemd event loop: %s", strerror(-ret)); goto fail; } - if (!(comm->wsj1 = afb_ws_client_connect_wsj1(comm->loop, uri, &wsj1_itf, comm))) { + if (!(comm->afbhandle = afb_ws_client_connect_api(comm->loop, uri, &itf, comm))) { pa_log("Connection to %s failed: %s", uri, strerror(errno)); goto fail; } - if (pipe(comm->pipe) < 0) { - pa_log("pipe2 failed: %s", strerror(errno)); + if (pipe(comm->tx_pipe) < 0) { + pa_log("pipe failed: %s", strerror(errno)); goto fail; } - if ((ret = sd_event_add_io(comm->loop, &comm->source, comm->pipe[0], EPOLLIN, - (sd_event_io_handler_t) ws_comm_thread_event, comm)) < 0) { + if ((ret = sd_event_add_io(comm->loop, &comm->tx_source, comm->tx_pipe[0], EPOLLIN, + (sd_event_io_handler_t) internal_thread_event, comm)) < 0) { pa_log("sd_event_add_io failed: %s", strerror(-ret)); goto fail; } - if (!(comm->thread = pa_thread_new("afb_comm_loop", (pa_thread_func_t) ws_comm_thread, comm))) { + if (!(comm->thread = pa_thread_new("m4a_afb_comm_internal_thread", + (pa_thread_func_t) internal_thread, comm))) { pa_log("Failed to start websocket communication thread"); goto fail; } @@ -296,32 +223,69 @@ fail: return NULL; } + +static void discard_pending_calls(m4a_afb_comm *comm) { + m4a_afb_api_call_data *c; + + pa_mutex_lock(comm->pending_calls_lock); + while ((c = comm->pending_calls)) { + c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata); + m4a_afb_api_call_data_free(c); + } + pa_mutex_unlock(comm->pending_calls_lock); +} + void m4a_afb_comm_free(m4a_afb_comm *comm) { if (comm->thread) { - if (!ws_comm_thread_send(comm->pipe[1], OPCODE_EXIT, NULL)) { + if (!internal_thread_send(comm, OPCODE_EXIT, NULL)) { pa_log("failed to shutdown thread gracefully"); pa_thread_free_nojoin(comm->thread); } else { pa_thread_free(comm->thread); } } - if (comm->source) - sd_event_source_unref(comm->source); + if (comm->tx_source) + sd_event_source_unref(comm->tx_source); if (comm->loop) sd_event_unref(comm->loop); - if (comm->wsj1) - afb_wsj1_unref(comm->wsj1); + if (comm->afbhandle) + afb_proto_ws_unref(comm->afbhandle); if (comm->pending_calls_lock) { discard_pending_calls(comm); pa_mutex_free(comm->pending_calls_lock); } - if (comm->pipe[0] > 0) - close(comm->pipe[0]); - if (comm->pipe[1] > 0) - close(comm->pipe[1]); + if (comm->tx_pipe[0] > 0) + close(comm->tx_pipe[0]); + if (comm->tx_pipe[1] > 0) + close(comm->tx_pipe[1]); + free(comm->sessid); pa_xfree(comm); } + +bool m4a_afb_call_async(m4a_afb_comm *comm, const char *verb, + json_object *object, m4a_afb_done_cb_t done_cb, + void *userdata) { + m4a_afb_api_call_data *c; + + c = pa_xnew0(m4a_afb_api_call_data, 1); + c->comm = comm; + c->verb = verb; + c->object = object; + c->done_cb = done_cb; + c->userdata = userdata; + + pa_mutex_lock(comm->pending_calls_lock); + PA_LLIST_PREPEND(m4a_afb_api_call_data, comm->pending_calls, c); + pa_mutex_unlock(comm->pending_calls_lock); + + if (!internal_thread_send(comm, OPCODE_CALL_ASYNC, c)) { + c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata); + m4a_afb_api_call_data_free(c); + return false; + } + return true; +} |