/*** This file is part of PulseAudio. Copyright 2018 Collabora Ltd. Author: George Kiagiadakis PulseAudio is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version. PulseAudio is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with PulseAudio; if not, see . ***/ #include #include "m4a_afb_comm.h" #include #include #include #include #include #include #include #include #include #include enum m4a_internal_pipe_opcode { OPCODE_CALL_ASYNC = 1, OPCODE_EXIT }; typedef struct _m4a_internal_pipe_data m4a_internal_pipe_data; typedef struct _m4a_afb_api_call_data m4a_afb_api_call_data; /* structure sent over the internal pipe * for communication between the two threads */ struct _m4a_internal_pipe_data { enum m4a_internal_pipe_opcode opcode; void *data; }; /* structure holding data related to an ongoing AFB API call */ struct _m4a_afb_api_call_data { m4a_afb_comm *comm; const char *verb; json_object *object; m4a_afb_done_cb_t done_cb; void *userdata; PA_LLIST_FIELDS(m4a_afb_api_call_data); }; /* the main structure of this class */ struct _m4a_afb_comm { struct afb_proto_ws *afbhandle; char *sessid; /* the thread that runs the loop below */ pa_thread *thread; /* our internal event loop */ sd_event *loop; /* the pipe fd pair used to send commands from * the pulse threads to our internal thread */ int tx_pipe[2]; /* the source for listening to the above pipe from our event loop */ sd_event_source *tx_source; /* a list of pending API calls, protected with a mutex */ pa_mutex *pending_calls_lock; PA_LLIST_HEAD(m4a_afb_api_call_data, pending_calls); }; static void m4a_afb_api_call_data_free(m4a_afb_api_call_data *c) { if (c->object) json_object_put(c->object); pa_mutex_lock(c->comm->pending_calls_lock); PA_LLIST_REMOVE(m4a_afb_api_call_data, c->comm->pending_calls, c); pa_mutex_unlock(c->comm->pending_calls_lock); pa_xfree(c); } static void m4a_afb_on_reply(void *ctx, void *handle, json_object *response, const char *error, const char *info) { m4a_afb_api_call_data *c = handle; if (error) { pa_log("Got error reply from 4a: %s", error); c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata); } else { pa_log_debug("Got OK reply from 4a in call to %s", c->verb); c->done_cb(M4A_AFB_REPLY_OK, response, c->userdata); } m4a_afb_api_call_data_free(c); } static struct afb_proto_ws_client_itf itf = { .on_reply = m4a_afb_on_reply }; static int internal_thread_event(sd_event_source *s, int fd, uint32_t revents, m4a_afb_comm *comm) { ssize_t r; m4a_internal_pipe_data d = { 0 }; if ((r = read(fd, &d, sizeof(m4a_internal_pipe_data))) < (ssize_t) sizeof(m4a_internal_pipe_data)) { if (errno == EINTR) return 0; pa_log("read failed: %s", strerror(errno)); return -1; } switch(d.opcode) { case OPCODE_CALL_ASYNC: { m4a_afb_api_call_data *c = d.data; if (afb_proto_ws_client_call(c->comm->afbhandle, c->verb, c->object, c->comm->sessid, c, NULL) < 0) { pa_log("afb_proto_ws_client_call: failed to call %s: %s", c->verb, strerror(errno)); c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata); m4a_afb_api_call_data_free(c); } break; } case OPCODE_EXIT: sd_event_exit(comm->loop, 0); break; default: break; }; return 0; } static void internal_thread(m4a_afb_comm *comm) { pa_log_debug("afb comm thread starting..."); sd_event_loop(comm->loop); pa_log_debug("afb comm thread exiting..."); } static bool internal_thread_send(m4a_afb_comm *comm, char opcode, void *data) { m4a_internal_pipe_data d; ssize_t ret; d.opcode = opcode; d.data = data; do { ret = write(comm->tx_pipe[1], &d, sizeof(m4a_internal_pipe_data)); } while (ret < 0 && errno == EINTR); if (ret < (ssize_t) sizeof(m4a_internal_pipe_data)) { pa_log("write: %s", strerror(errno)); return false; } return true; } m4a_afb_comm *m4a_afb_comm_new(const char *uri) { int ret; m4a_afb_comm *comm; comm = pa_xnew0(m4a_afb_comm, 1); if (asprintf(&comm->sessid, "pulseaudio:%d", getpid()) < 0) { pa_log("asprintf failed"); comm->sessid = NULL; goto fail; } if ((ret = sd_event_new(&comm->loop)) < 0) { pa_log("Failed to create systemd event loop: %s", strerror(-ret)); goto fail; } if (!(comm->afbhandle = afb_ws_client_connect_api(comm->loop, uri, &itf, comm))) { pa_log("Connection to %s failed: %s", uri, strerror(errno)); goto fail; } if (pipe(comm->tx_pipe) < 0) { pa_log("pipe failed: %s", strerror(errno)); goto fail; } if ((ret = sd_event_add_io(comm->loop, &comm->tx_source, comm->tx_pipe[0], EPOLLIN, (sd_event_io_handler_t) internal_thread_event, comm)) < 0) { pa_log("sd_event_add_io failed: %s", strerror(-ret)); goto fail; } if (!(comm->thread = pa_thread_new("m4a_afb_comm_internal_thread", (pa_thread_func_t) internal_thread, comm))) { pa_log("Failed to start websocket communication thread"); goto fail; } comm->pending_calls_lock = pa_mutex_new(true, false); comm->pending_calls = NULL; pa_log_debug("afb comm started"); return comm; fail: m4a_afb_comm_free(comm); return NULL; } static void discard_pending_calls(m4a_afb_comm *comm) { m4a_afb_api_call_data *c; pa_mutex_lock(comm->pending_calls_lock); while ((c = comm->pending_calls)) { c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata); m4a_afb_api_call_data_free(c); } pa_mutex_unlock(comm->pending_calls_lock); } void m4a_afb_comm_free(m4a_afb_comm *comm) { if (comm->thread) { if (!internal_thread_send(comm, OPCODE_EXIT, NULL)) { pa_log("failed to shutdown thread gracefully"); pa_thread_free_nojoin(comm->thread); } else { pa_thread_free(comm->thread); } } if (comm->tx_source) sd_event_source_unref(comm->tx_source); if (comm->loop) sd_event_unref(comm->loop); if (comm->afbhandle) afb_proto_ws_unref(comm->afbhandle); if (comm->pending_calls_lock) { discard_pending_calls(comm); pa_mutex_free(comm->pending_calls_lock); } if (comm->tx_pipe[0] > 0) close(comm->tx_pipe[0]); if (comm->tx_pipe[1] > 0) close(comm->tx_pipe[1]); free(comm->sessid); pa_xfree(comm); } bool m4a_afb_call_async(m4a_afb_comm *comm, const char *verb, json_object *object, m4a_afb_done_cb_t done_cb, void *userdata) { m4a_afb_api_call_data *c; c = pa_xnew0(m4a_afb_api_call_data, 1); c->comm = comm; c->verb = verb; c->object = object; c->done_cb = done_cb; c->userdata = userdata; pa_mutex_lock(comm->pending_calls_lock); PA_LLIST_PREPEND(m4a_afb_api_call_data, comm->pending_calls, c); pa_mutex_unlock(comm->pending_calls_lock); if (!internal_thread_send(comm, OPCODE_CALL_ASYNC, c)) { c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata); m4a_afb_api_call_data_free(c); return false; } return true; }