/*** This file is part of PulseAudio. Copyright 2018 Collabora Ltd. Author: George Kiagiadakis PulseAudio is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version. PulseAudio is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with PulseAudio; if not, see . ***/ #ifdef HAVE_CONFIG_H #include #endif #include "m4a_afb_comm.h" #include #include #include #include #include #include #include #include #include typedef struct _m4a_ws_comm_pipe_data m4a_ws_comm_pipe_data; typedef struct _m4a_afb_api_call_data m4a_afb_api_call_data; enum m4a_ws_comm_pipe_opcode { OPCODE_CALL_ASYNC = 1, OPCODE_EXIT }; struct _m4a_ws_comm_pipe_data { enum m4a_ws_comm_pipe_opcode opcode; void *data; }; struct _m4a_afb_comm { struct afb_wsj1 *wsj1; pa_thread *thread; sd_event *loop; int pipe[2]; sd_event_source *source; pa_mutex *pending_calls_lock; PA_LLIST_HEAD(m4a_afb_api_call_data, pending_calls); }; struct _m4a_afb_api_call_data { m4a_afb_comm *comm; const char *api; const char *verb; char *object; m4a_afb_done_cb_t done_cb; void *userdata; PA_LLIST_FIELDS(m4a_afb_api_call_data); }; static void afb_api_call_data_free(m4a_afb_api_call_data *c) { if (c->object) pa_xfree(c->object); pa_mutex_lock(c->comm->pending_calls_lock); PA_LLIST_REMOVE(m4a_afb_api_call_data, c->comm->pending_calls, c); pa_mutex_unlock(c->comm->pending_calls_lock); pa_xfree(c); } static void afb_call_async_step3(void *cb_data, struct afb_wsj1_msg *msg) { m4a_afb_api_call_data *c = cb_data; pa_json_object *j; const pa_json_object *jr = NULL; j = pa_json_parse(afb_wsj1_msg_object_s(msg)); if (pa_json_object_get_type(j) != PA_JSON_TYPE_OBJECT) { pa_log_warn("4a reply is not a json object"); } else if (afb_wsj1_msg_is_reply_ok(msg)) { jr = pa_json_object_get_object_member(j, "response"); } else { jr = pa_json_object_get_object_member(j, "request"); } if (afb_wsj1_msg_is_reply_ok(msg)) { pa_log_debug("Got OK reply from 4a in call to %s/%s", c->api, c->verb); c->done_cb(M4A_AFB_REPLY_OK, jr, c->userdata); } else if (jr && pa_json_object_get_type(jr) == PA_JSON_TYPE_OBJECT) { const pa_json_object *status, *info; const char *status_str, *info_str; status = pa_json_object_get_object_member(jr, "status"); info = pa_json_object_get_object_member(jr, "info"); if (status && pa_json_object_get_type(status) == PA_JSON_TYPE_STRING) status_str = pa_json_object_get_string(status); else status_str = "(null)"; if (info && pa_json_object_get_type(info) == PA_JSON_TYPE_STRING) info_str = pa_json_object_get_string(info); else info_str = "(null)"; pa_log("Got error reply from 4a, status: '%s', info: '%s'", status_str, info_str); c->done_cb(M4A_AFB_REPLY_ERROR, jr, c->userdata); } else { pa_log("Got error reply from 4a"); c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata); } pa_json_object_free(j); afb_api_call_data_free(c); } static void afb_call_async_step2(m4a_afb_api_call_data *c) { pa_log_debug("calling afb: %s/%s", c->api, c->verb); if (afb_wsj1_call_s(c->comm->wsj1, c->api, c->verb, c->object, afb_call_async_step3, c) < 0) { pa_log("afb_wsj1_call_s: failed to call %s/%s: %s", c->api, c->verb, strerror(errno)); c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata); afb_api_call_data_free(c); } } static void discard_pending_calls(m4a_afb_comm *comm) { m4a_afb_api_call_data *c; pa_mutex_lock(comm->pending_calls_lock); while ((c = comm->pending_calls)) { c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata); afb_api_call_data_free(c); } pa_mutex_unlock(comm->pending_calls_lock); } static void on_wsj1_hangup(void *closure, struct afb_wsj1 *wsj1) { m4a_afb_comm *comm = closure; pa_log_warn("afb closed the communication websocket!"); discard_pending_calls(comm); //TODO: attempt to re-connect } static void on_wsj1_call(void *closure, const char *api, const char *verb, struct afb_wsj1_msg *msg) { /* we don't implement any method calls */ afb_wsj1_reply_error_s(msg, "\"unimplemented\"", NULL); } static void on_wsj1_event(void *closure, const char *event, struct afb_wsj1_msg *msg) { //TODO dispatch events } static struct afb_wsj1_itf wsj1_itf = { .on_hangup = on_wsj1_hangup, .on_call = on_wsj1_call, .on_event = on_wsj1_event }; static int ws_comm_thread_event(sd_event_source *s, int fd, uint32_t revents, m4a_afb_comm *comm) { ssize_t r; m4a_ws_comm_pipe_data d = { 0 }; if ((r = read(fd, &d, sizeof(m4a_ws_comm_pipe_data))) < (ssize_t) sizeof(m4a_ws_comm_pipe_data)) { if (errno == EINTR) return 0; pa_log("read failed: %s", strerror(errno)); return -1; } switch(d.opcode) { case OPCODE_CALL_ASYNC: { m4a_afb_api_call_data *c = d.data; afb_call_async_step2(c); break; } case OPCODE_EXIT: sd_event_exit(comm->loop, 0); break; default: break; }; return 0; } static void ws_comm_thread(m4a_afb_comm *comm) { pa_log_debug("websocket thread starting..."); sd_event_loop(comm->loop); pa_log_debug("websocket thread exiting..."); } static bool ws_comm_thread_send(int fd, char opcode, void *data) { m4a_ws_comm_pipe_data d; ssize_t ret; d.opcode = opcode; d.data = data; do { ret = write(fd, &d, sizeof(m4a_ws_comm_pipe_data)); } while (ret < 0 && errno == EINTR); if (ret < (ssize_t) sizeof(m4a_ws_comm_pipe_data)) { pa_log("write: %s", strerror(errno)); return false; } return true; } bool m4a_afb_call_async(m4a_afb_comm *comm, const char *api, const char *verb, char *object, m4a_afb_done_cb_t done_cb, void *userdata) { m4a_afb_api_call_data *c; c = pa_xnew0(m4a_afb_api_call_data, 1); c->comm = comm; c->api = api; c->verb = verb; c->object = object; c->done_cb = done_cb; c->userdata = userdata; pa_mutex_lock(comm->pending_calls_lock); PA_LLIST_PREPEND(m4a_afb_api_call_data, comm->pending_calls, c); pa_mutex_unlock(comm->pending_calls_lock); if (!ws_comm_thread_send(comm->pipe[1], OPCODE_CALL_ASYNC, c)) { c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata); afb_api_call_data_free(c); return false; } return true; } m4a_afb_comm *m4a_afb_comm_new(const char *uri) { int ret; m4a_afb_comm *comm; comm = pa_xnew0(m4a_afb_comm, 1); if ((ret = sd_event_new(&comm->loop)) < 0) { pa_log("Failed to create systemd event loop: %s", strerror(-ret)); goto fail; } if (!(comm->wsj1 = afb_ws_client_connect_wsj1(comm->loop, uri, &wsj1_itf, comm))) { pa_log("Connection to %s failed: %s", uri, strerror(errno)); goto fail; } if (pipe(comm->pipe) < 0) { pa_log("pipe2 failed: %s", strerror(errno)); goto fail; } if ((ret = sd_event_add_io(comm->loop, &comm->source, comm->pipe[0], EPOLLIN, (sd_event_io_handler_t) ws_comm_thread_event, comm)) < 0) { pa_log("sd_event_add_io failed: %s", strerror(-ret)); goto fail; } if (!(comm->thread = pa_thread_new("afb_comm_loop", (pa_thread_func_t) ws_comm_thread, comm))) { pa_log("Failed to start websocket communication thread"); goto fail; } comm->pending_calls_lock = pa_mutex_new(true, false); comm->pending_calls = NULL; pa_log_debug("afb comm started"); return comm; fail: m4a_afb_comm_free(comm); return NULL; } void m4a_afb_comm_free(m4a_afb_comm *comm) { if (comm->thread) { if (!ws_comm_thread_send(comm->pipe[1], OPCODE_EXIT, NULL)) { pa_log("failed to shutdown thread gracefully"); pa_thread_free_nojoin(comm->thread); } else { pa_thread_free(comm->thread); } } if (comm->source) sd_event_source_unref(comm->source); if (comm->loop) sd_event_unref(comm->loop); if (comm->wsj1) afb_wsj1_unref(comm->wsj1); if (comm->pending_calls_lock) { discard_pending_calls(comm); pa_mutex_free(comm->pending_calls_lock); } if (comm->pipe[0] > 0) close(comm->pipe[0]); if (comm->pipe[1] > 0) close(comm->pipe[1]); pa_xfree(comm); }