diff options
Diffstat (limited to 'm4a_afb_comm.c')
-rw-r--r-- | m4a_afb_comm.c | 327 |
1 files changed, 327 insertions, 0 deletions
diff --git a/m4a_afb_comm.c b/m4a_afb_comm.c new file mode 100644 index 0000000..63b8d61 --- /dev/null +++ b/m4a_afb_comm.c @@ -0,0 +1,327 @@ +/*** + This file is part of PulseAudio. + + Copyright 2018 Collabora Ltd. + Author: George Kiagiadakis <george.kiagiadakis@collabora.com> + + PulseAudio is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published + by the Free Software Foundation; either version 2.1 of the License, + or (at your option) any later version. + + PulseAudio is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. +***/ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + +#include "m4a_afb_comm.h" + +#include <errno.h> +#include <unistd.h> + +#include <pulsecore/thread.h> +#include <pulsecore/llist.h> +#include <pulsecore/mutex.h> +#include <pulse/xmalloc.h> + +#include <systemd/sd-event.h> +#include <afb/afb-wsj1.h> +#include <afb/afb-ws-client.h> + +typedef struct _m4a_ws_comm_pipe_data m4a_ws_comm_pipe_data; +typedef struct _m4a_afb_api_call_data m4a_afb_api_call_data; + +enum m4a_ws_comm_pipe_opcode { + OPCODE_CALL_ASYNC = 1, + OPCODE_EXIT +}; + +struct _m4a_ws_comm_pipe_data { + enum m4a_ws_comm_pipe_opcode opcode; + void *data; +}; + +struct _m4a_afb_comm { + struct afb_wsj1 *wsj1; + + pa_thread *thread; + sd_event *loop; + int pipe[2]; + sd_event_source *source; + + pa_mutex *pending_calls_lock; + PA_LLIST_HEAD(m4a_afb_api_call_data, pending_calls); +}; + +struct _m4a_afb_api_call_data { + m4a_afb_comm *comm; + + const char *api; + const char *verb; + char *object; + + m4a_afb_done_cb_t done_cb; + void *userdata; + + PA_LLIST_FIELDS(m4a_afb_api_call_data); +}; + +static void afb_api_call_data_free(m4a_afb_api_call_data *c) { + if (c->object) + pa_xfree(c->object); + + pa_mutex_lock(c->comm->pending_calls_lock); + PA_LLIST_REMOVE(m4a_afb_api_call_data, c->comm->pending_calls, c); + pa_mutex_unlock(c->comm->pending_calls_lock); + + pa_xfree(c); +} + +static void afb_call_async_step3(void *cb_data, struct afb_wsj1_msg *msg) { + m4a_afb_api_call_data *c = cb_data; + pa_json_object *j; + const pa_json_object *jr = NULL; + + j = pa_json_parse(afb_wsj1_msg_object_s(msg)); + if (pa_json_object_get_type(j) != PA_JSON_TYPE_OBJECT) { + pa_log_warn("4a reply is not a json object"); + } else if (afb_wsj1_msg_is_reply_ok(msg)) { + jr = pa_json_object_get_object_member(j, "response"); + } else { + jr = pa_json_object_get_object_member(j, "request"); + } + + if (afb_wsj1_msg_is_reply_ok(msg)) { + pa_log_debug("Got OK reply from 4a in call to %s/%s", c->api, c->verb); + c->done_cb(M4A_AFB_REPLY_OK, jr, c->userdata); + } else if (jr && pa_json_object_get_type(jr) == PA_JSON_TYPE_OBJECT) { + const pa_json_object *status, *info; + const char *status_str, *info_str; + + status = pa_json_object_get_object_member(jr, "status"); + info = pa_json_object_get_object_member(jr, "info"); + + if (status && pa_json_object_get_type(status) == PA_JSON_TYPE_STRING) + status_str = pa_json_object_get_string(status); + else + status_str = "(null)"; + + if (info && pa_json_object_get_type(info) == PA_JSON_TYPE_STRING) + info_str = pa_json_object_get_string(info); + else + info_str = "(null)"; + + pa_log("Got error reply from 4a, status: '%s', info: '%s'", status_str, info_str); + c->done_cb(M4A_AFB_REPLY_ERROR, jr, c->userdata); + } else { + pa_log("Got error reply from 4a"); + c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata); + } + + pa_json_object_free(j); + afb_api_call_data_free(c); +} + +static void afb_call_async_step2(m4a_afb_api_call_data *c) { + pa_log_debug("calling afb: %s/%s", c->api, c->verb); + + if (afb_wsj1_call_s(c->comm->wsj1, c->api, c->verb, c->object, afb_call_async_step3, c) < 0) { + pa_log("afb_wsj1_call_s: failed to call %s/%s: %s", c->api, c->verb, strerror(errno)); + c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata); + afb_api_call_data_free(c); + } +} + +static void discard_pending_calls(m4a_afb_comm *comm) { + m4a_afb_api_call_data *c; + + pa_mutex_lock(comm->pending_calls_lock); + while ((c = comm->pending_calls)) { + c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata); + afb_api_call_data_free(c); + } + pa_mutex_unlock(comm->pending_calls_lock); +} + +static void on_wsj1_hangup(void *closure, struct afb_wsj1 *wsj1) { + m4a_afb_comm *comm = closure; + + pa_log_warn("afb closed the communication websocket!"); + discard_pending_calls(comm); + + //TODO: attempt to re-connect +} + +static void on_wsj1_call(void *closure, const char *api, const char *verb, struct afb_wsj1_msg *msg) { + /* we don't implement any method calls */ + afb_wsj1_reply_error_s(msg, "\"unimplemented\"", NULL); +} + +static void on_wsj1_event(void *closure, const char *event, struct afb_wsj1_msg *msg) { + //TODO dispatch events +} + +static struct afb_wsj1_itf wsj1_itf = { + .on_hangup = on_wsj1_hangup, + .on_call = on_wsj1_call, + .on_event = on_wsj1_event +}; + +static int ws_comm_thread_event(sd_event_source *s, int fd, uint32_t revents, m4a_afb_comm *comm) { + ssize_t r; + m4a_ws_comm_pipe_data d = { 0 }; + + if ((r = read(fd, &d, sizeof(m4a_ws_comm_pipe_data))) < (ssize_t) sizeof(m4a_ws_comm_pipe_data)) { + if (errno == EINTR) + return 0; + + pa_log("read failed: %s", strerror(errno)); + return -1; + } + + switch(d.opcode) { + case OPCODE_CALL_ASYNC: { + m4a_afb_api_call_data *c = d.data; + afb_call_async_step2(c); + break; + } + case OPCODE_EXIT: + sd_event_exit(comm->loop, 0); + break; + default: + break; + }; + + return 0; +} + +static void ws_comm_thread(m4a_afb_comm *comm) { + pa_log_debug("websocket thread starting..."); + sd_event_loop(comm->loop); + pa_log_debug("websocket thread exiting..."); +} + +static bool ws_comm_thread_send(int fd, char opcode, void *data) { + m4a_ws_comm_pipe_data d; + ssize_t ret; + + d.opcode = opcode; + d.data = data; + + do { + ret = write(fd, &d, sizeof(m4a_ws_comm_pipe_data)); + } while (ret < 0 && errno == EINTR); + + if (ret < (ssize_t) sizeof(m4a_ws_comm_pipe_data)) { + pa_log("write: %s", strerror(errno)); + return false; + } + + return true; +} + +bool m4a_afb_call_async(m4a_afb_comm *comm, const char *api, const char *verb, + char *object, m4a_afb_done_cb_t done_cb, void *userdata) { + m4a_afb_api_call_data *c; + + c = pa_xnew0(m4a_afb_api_call_data, 1); + c->comm = comm; + c->api = api; + c->verb = verb; + c->object = object; + c->done_cb = done_cb; + c->userdata = userdata; + + pa_mutex_lock(comm->pending_calls_lock); + PA_LLIST_PREPEND(m4a_afb_api_call_data, comm->pending_calls, c); + pa_mutex_unlock(comm->pending_calls_lock); + + if (!ws_comm_thread_send(comm->pipe[1], OPCODE_CALL_ASYNC, c)) { + c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata); + afb_api_call_data_free(c); + return false; + } + return true; +} + +m4a_afb_comm *m4a_afb_comm_new(const char *uri) { + int ret; + m4a_afb_comm *comm; + + comm = pa_xnew0(m4a_afb_comm, 1); + + if ((ret = sd_event_new(&comm->loop)) < 0) { + pa_log("Failed to create systemd event loop: %s", strerror(-ret)); + goto fail; + } + + if (!(comm->wsj1 = afb_ws_client_connect_wsj1(comm->loop, uri, &wsj1_itf, comm))) { + pa_log("Connection to %s failed: %s", uri, strerror(errno)); + goto fail; + } + + if (pipe(comm->pipe) < 0) { + pa_log("pipe2 failed: %s", strerror(errno)); + goto fail; + } + + if ((ret = sd_event_add_io(comm->loop, &comm->source, comm->pipe[0], EPOLLIN, + (sd_event_io_handler_t) ws_comm_thread_event, comm)) < 0) { + pa_log("sd_event_add_io failed: %s", strerror(-ret)); + goto fail; + } + + if (!(comm->thread = pa_thread_new("afb_comm_loop", (pa_thread_func_t) ws_comm_thread, comm))) { + pa_log("Failed to start websocket communication thread"); + goto fail; + } + + comm->pending_calls_lock = pa_mutex_new(true, false); + comm->pending_calls = NULL; + + pa_log_debug("afb comm started"); + + return comm; + +fail: + m4a_afb_comm_free(comm); + return NULL; +} + +void m4a_afb_comm_free(m4a_afb_comm *comm) { + if (comm->thread) { + if (!ws_comm_thread_send(comm->pipe[1], OPCODE_EXIT, NULL)) { + pa_log("failed to shutdown thread gracefully"); + pa_thread_free_nojoin(comm->thread); + } else { + pa_thread_free(comm->thread); + } + } + if (comm->source) + sd_event_source_unref(comm->source); + if (comm->loop) + sd_event_unref(comm->loop); + + if (comm->wsj1) + afb_wsj1_unref(comm->wsj1); + + if (comm->pending_calls_lock) { + discard_pending_calls(comm); + pa_mutex_free(comm->pending_calls_lock); + } + + if (comm->pipe[0] > 0) + close(comm->pipe[0]); + if (comm->pipe[1] > 0) + close(comm->pipe[1]); + + pa_xfree(comm); +} |