summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeorge Kiagiadakis <george.kiagiadakis@collabora.com>2018-07-05 13:19:09 +0300
committerGeorge Kiagiadakis <george.kiagiadakis@collabora.com>2018-08-28 12:14:05 +0300
commit699e0ab7dd2ff86192e92d038ca0d008ec0bfb34 (patch)
tree50286b2a4ef25ef958a131c7a2f08a767159c020
parent4308a87133311d3bc63a0f4c21711c3c8462649a (diff)
Initial version
Change-Id: Iea9164fcf116ffb0683996830d371ca418694e6c Signed-off-by: George Kiagiadakis <george.kiagiadakis@collabora.com>
-rw-r--r--CMakeLists.txt41
-rw-r--r--m4a_afb_comm.c327
-rw-r--r--m4a_afb_comm.h43
-rw-r--r--module-4a-client.c408
4 files changed, 819 insertions, 0 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
new file mode 100644
index 0000000..22584b1
--- /dev/null
+++ b/CMakeLists.txt
@@ -0,0 +1,41 @@
+project(pulseaudio-module-4a C)
+
+cmake_minimum_required(VERSION 2.8.8)
+set(CMAKE_BUILD_TYPE Debug)
+set(CMAKE_POSITION_INDEPENDENT_CODE ON)
+
+set(PROJECT_NAME "Pulseaudio Module 4A")
+set(PROJECT_VERSION "0.1")
+
+find_package(PkgConfig)
+include(GNUInstallDirs)
+
+###########################################################################
+
+add_compile_options(-Wall -Wextra -Wconversion)
+add_compile_options(-Wno-unused-parameter -Wno-unused-variable -Wno-unused-but-set-variable)
+add_compile_options(-Wno-parentheses)
+add_compile_options(-Wno-sign-compare -Wno-sign-conversion)
+add_compile_options(-Werror=maybe-uninitialized)
+add_compile_options(-Werror=implicit-function-declaration)
+add_compile_options(-ffunction-sections -fdata-sections)
+
+###########################################################################
+
+pkg_check_modules(dependencies REQUIRED libafbwsc libsystemd pulseaudio-module-devel)
+pkg_get_variable(plugin_install_dir pulseaudio-module-devel modlibexecdir)
+
+add_definitions(${dependencies_CFLAGS})
+include_directories(${dependencies_INCLUDE_DIRS})
+string(REGEX REPLACE ";" " " dep_link_flags "${dependencies_LDFLAGS}" "")
+set(link_flags "${dep_link_flags} -Wl,-rpath=${plugin_install_dir} -Wl,--as-needed -Wl,--gc-sections")
+
+############################################################
+
+add_library(module-4a-client MODULE module-4a-client.c m4a_afb_comm.c)
+target_link_libraries(module-4a-client ${dependencies_LIBRARIES})
+set_target_properties(module-4a-client PROPERTIES PREFIX ""
+ LINK_FLAGS "${link_flags}")
+
+install(TARGETS module-4a-client
+ DESTINATION ${plugin_install_dir})
diff --git a/m4a_afb_comm.c b/m4a_afb_comm.c
new file mode 100644
index 0000000..63b8d61
--- /dev/null
+++ b/m4a_afb_comm.c
@@ -0,0 +1,327 @@
+/***
+ This file is part of PulseAudio.
+
+ Copyright 2018 Collabora Ltd.
+ Author: George Kiagiadakis <george.kiagiadakis@collabora.com>
+
+ PulseAudio is free software; you can redistribute it and/or modify
+ it under the terms of the GNU Lesser General Public License as published
+ by the Free Software Foundation; either version 2.1 of the License,
+ or (at your option) any later version.
+
+ PulseAudio is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
+***/
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include "m4a_afb_comm.h"
+
+#include <errno.h>
+#include <unistd.h>
+
+#include <pulsecore/thread.h>
+#include <pulsecore/llist.h>
+#include <pulsecore/mutex.h>
+#include <pulse/xmalloc.h>
+
+#include <systemd/sd-event.h>
+#include <afb/afb-wsj1.h>
+#include <afb/afb-ws-client.h>
+
+typedef struct _m4a_ws_comm_pipe_data m4a_ws_comm_pipe_data;
+typedef struct _m4a_afb_api_call_data m4a_afb_api_call_data;
+
+enum m4a_ws_comm_pipe_opcode {
+ OPCODE_CALL_ASYNC = 1,
+ OPCODE_EXIT
+};
+
+struct _m4a_ws_comm_pipe_data {
+ enum m4a_ws_comm_pipe_opcode opcode;
+ void *data;
+};
+
+struct _m4a_afb_comm {
+ struct afb_wsj1 *wsj1;
+
+ pa_thread *thread;
+ sd_event *loop;
+ int pipe[2];
+ sd_event_source *source;
+
+ pa_mutex *pending_calls_lock;
+ PA_LLIST_HEAD(m4a_afb_api_call_data, pending_calls);
+};
+
+struct _m4a_afb_api_call_data {
+ m4a_afb_comm *comm;
+
+ const char *api;
+ const char *verb;
+ char *object;
+
+ m4a_afb_done_cb_t done_cb;
+ void *userdata;
+
+ PA_LLIST_FIELDS(m4a_afb_api_call_data);
+};
+
+static void afb_api_call_data_free(m4a_afb_api_call_data *c) {
+ if (c->object)
+ pa_xfree(c->object);
+
+ pa_mutex_lock(c->comm->pending_calls_lock);
+ PA_LLIST_REMOVE(m4a_afb_api_call_data, c->comm->pending_calls, c);
+ pa_mutex_unlock(c->comm->pending_calls_lock);
+
+ pa_xfree(c);
+}
+
+static void afb_call_async_step3(void *cb_data, struct afb_wsj1_msg *msg) {
+ m4a_afb_api_call_data *c = cb_data;
+ pa_json_object *j;
+ const pa_json_object *jr = NULL;
+
+ j = pa_json_parse(afb_wsj1_msg_object_s(msg));
+ if (pa_json_object_get_type(j) != PA_JSON_TYPE_OBJECT) {
+ pa_log_warn("4a reply is not a json object");
+ } else if (afb_wsj1_msg_is_reply_ok(msg)) {
+ jr = pa_json_object_get_object_member(j, "response");
+ } else {
+ jr = pa_json_object_get_object_member(j, "request");
+ }
+
+ if (afb_wsj1_msg_is_reply_ok(msg)) {
+ pa_log_debug("Got OK reply from 4a in call to %s/%s", c->api, c->verb);
+ c->done_cb(M4A_AFB_REPLY_OK, jr, c->userdata);
+ } else if (jr && pa_json_object_get_type(jr) == PA_JSON_TYPE_OBJECT) {
+ const pa_json_object *status, *info;
+ const char *status_str, *info_str;
+
+ status = pa_json_object_get_object_member(jr, "status");
+ info = pa_json_object_get_object_member(jr, "info");
+
+ if (status && pa_json_object_get_type(status) == PA_JSON_TYPE_STRING)
+ status_str = pa_json_object_get_string(status);
+ else
+ status_str = "(null)";
+
+ if (info && pa_json_object_get_type(info) == PA_JSON_TYPE_STRING)
+ info_str = pa_json_object_get_string(info);
+ else
+ info_str = "(null)";
+
+ pa_log("Got error reply from 4a, status: '%s', info: '%s'", status_str, info_str);
+ c->done_cb(M4A_AFB_REPLY_ERROR, jr, c->userdata);
+ } else {
+ pa_log("Got error reply from 4a");
+ c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata);
+ }
+
+ pa_json_object_free(j);
+ afb_api_call_data_free(c);
+}
+
+static void afb_call_async_step2(m4a_afb_api_call_data *c) {
+ pa_log_debug("calling afb: %s/%s", c->api, c->verb);
+
+ if (afb_wsj1_call_s(c->comm->wsj1, c->api, c->verb, c->object, afb_call_async_step3, c) < 0) {
+ pa_log("afb_wsj1_call_s: failed to call %s/%s: %s", c->api, c->verb, strerror(errno));
+ c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata);
+ afb_api_call_data_free(c);
+ }
+}
+
+static void discard_pending_calls(m4a_afb_comm *comm) {
+ m4a_afb_api_call_data *c;
+
+ pa_mutex_lock(comm->pending_calls_lock);
+ while ((c = comm->pending_calls)) {
+ c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata);
+ afb_api_call_data_free(c);
+ }
+ pa_mutex_unlock(comm->pending_calls_lock);
+}
+
+static void on_wsj1_hangup(void *closure, struct afb_wsj1 *wsj1) {
+ m4a_afb_comm *comm = closure;
+
+ pa_log_warn("afb closed the communication websocket!");
+ discard_pending_calls(comm);
+
+ //TODO: attempt to re-connect
+}
+
+static void on_wsj1_call(void *closure, const char *api, const char *verb, struct afb_wsj1_msg *msg) {
+ /* we don't implement any method calls */
+ afb_wsj1_reply_error_s(msg, "\"unimplemented\"", NULL);
+}
+
+static void on_wsj1_event(void *closure, const char *event, struct afb_wsj1_msg *msg) {
+ //TODO dispatch events
+}
+
+static struct afb_wsj1_itf wsj1_itf = {
+ .on_hangup = on_wsj1_hangup,
+ .on_call = on_wsj1_call,
+ .on_event = on_wsj1_event
+};
+
+static int ws_comm_thread_event(sd_event_source *s, int fd, uint32_t revents, m4a_afb_comm *comm) {
+ ssize_t r;
+ m4a_ws_comm_pipe_data d = { 0 };
+
+ if ((r = read(fd, &d, sizeof(m4a_ws_comm_pipe_data))) < (ssize_t) sizeof(m4a_ws_comm_pipe_data)) {
+ if (errno == EINTR)
+ return 0;
+
+ pa_log("read failed: %s", strerror(errno));
+ return -1;
+ }
+
+ switch(d.opcode) {
+ case OPCODE_CALL_ASYNC: {
+ m4a_afb_api_call_data *c = d.data;
+ afb_call_async_step2(c);
+ break;
+ }
+ case OPCODE_EXIT:
+ sd_event_exit(comm->loop, 0);
+ break;
+ default:
+ break;
+ };
+
+ return 0;
+}
+
+static void ws_comm_thread(m4a_afb_comm *comm) {
+ pa_log_debug("websocket thread starting...");
+ sd_event_loop(comm->loop);
+ pa_log_debug("websocket thread exiting...");
+}
+
+static bool ws_comm_thread_send(int fd, char opcode, void *data) {
+ m4a_ws_comm_pipe_data d;
+ ssize_t ret;
+
+ d.opcode = opcode;
+ d.data = data;
+
+ do {
+ ret = write(fd, &d, sizeof(m4a_ws_comm_pipe_data));
+ } while (ret < 0 && errno == EINTR);
+
+ if (ret < (ssize_t) sizeof(m4a_ws_comm_pipe_data)) {
+ pa_log("write: %s", strerror(errno));
+ return false;
+ }
+
+ return true;
+}
+
+bool m4a_afb_call_async(m4a_afb_comm *comm, const char *api, const char *verb,
+ char *object, m4a_afb_done_cb_t done_cb, void *userdata) {
+ m4a_afb_api_call_data *c;
+
+ c = pa_xnew0(m4a_afb_api_call_data, 1);
+ c->comm = comm;
+ c->api = api;
+ c->verb = verb;
+ c->object = object;
+ c->done_cb = done_cb;
+ c->userdata = userdata;
+
+ pa_mutex_lock(comm->pending_calls_lock);
+ PA_LLIST_PREPEND(m4a_afb_api_call_data, comm->pending_calls, c);
+ pa_mutex_unlock(comm->pending_calls_lock);
+
+ if (!ws_comm_thread_send(comm->pipe[1], OPCODE_CALL_ASYNC, c)) {
+ c->done_cb(M4A_AFB_REPLY_ERROR, NULL, c->userdata);
+ afb_api_call_data_free(c);
+ return false;
+ }
+ return true;
+}
+
+m4a_afb_comm *m4a_afb_comm_new(const char *uri) {
+ int ret;
+ m4a_afb_comm *comm;
+
+ comm = pa_xnew0(m4a_afb_comm, 1);
+
+ if ((ret = sd_event_new(&comm->loop)) < 0) {
+ pa_log("Failed to create systemd event loop: %s", strerror(-ret));
+ goto fail;
+ }
+
+ if (!(comm->wsj1 = afb_ws_client_connect_wsj1(comm->loop, uri, &wsj1_itf, comm))) {
+ pa_log("Connection to %s failed: %s", uri, strerror(errno));
+ goto fail;
+ }
+
+ if (pipe(comm->pipe) < 0) {
+ pa_log("pipe2 failed: %s", strerror(errno));
+ goto fail;
+ }
+
+ if ((ret = sd_event_add_io(comm->loop, &comm->source, comm->pipe[0], EPOLLIN,
+ (sd_event_io_handler_t) ws_comm_thread_event, comm)) < 0) {
+ pa_log("sd_event_add_io failed: %s", strerror(-ret));
+ goto fail;
+ }
+
+ if (!(comm->thread = pa_thread_new("afb_comm_loop", (pa_thread_func_t) ws_comm_thread, comm))) {
+ pa_log("Failed to start websocket communication thread");
+ goto fail;
+ }
+
+ comm->pending_calls_lock = pa_mutex_new(true, false);
+ comm->pending_calls = NULL;
+
+ pa_log_debug("afb comm started");
+
+ return comm;
+
+fail:
+ m4a_afb_comm_free(comm);
+ return NULL;
+}
+
+void m4a_afb_comm_free(m4a_afb_comm *comm) {
+ if (comm->thread) {
+ if (!ws_comm_thread_send(comm->pipe[1], OPCODE_EXIT, NULL)) {
+ pa_log("failed to shutdown thread gracefully");
+ pa_thread_free_nojoin(comm->thread);
+ } else {
+ pa_thread_free(comm->thread);
+ }
+ }
+ if (comm->source)
+ sd_event_source_unref(comm->source);
+ if (comm->loop)
+ sd_event_unref(comm->loop);
+
+ if (comm->wsj1)
+ afb_wsj1_unref(comm->wsj1);
+
+ if (comm->pending_calls_lock) {
+ discard_pending_calls(comm);
+ pa_mutex_free(comm->pending_calls_lock);
+ }
+
+ if (comm->pipe[0] > 0)
+ close(comm->pipe[0]);
+ if (comm->pipe[1] > 0)
+ close(comm->pipe[1]);
+
+ pa_xfree(comm);
+}
diff --git a/m4a_afb_comm.h b/m4a_afb_comm.h
new file mode 100644
index 0000000..7760bf9
--- /dev/null
+++ b/m4a_afb_comm.h
@@ -0,0 +1,43 @@
+/***
+ This file is part of PulseAudio.
+
+ Copyright 2018 Collabora Ltd.
+ Author: George Kiagiadakis <george.kiagiadakis@collabora.com>
+
+ PulseAudio is free software; you can redistribute it and/or modify
+ it under the terms of the GNU Lesser General Public License as published
+ by the Free Software Foundation; either version 2.1 of the License,
+ or (at your option) any later version.
+
+ PulseAudio is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
+***/
+
+#ifndef __M4A_AFB_COMM_H__
+#define __M4A_AFB_COMM_H__
+
+#include <pulse/json.h>
+
+typedef struct _m4a_afb_comm m4a_afb_comm;
+
+enum m4a_afb_reply {
+ M4A_AFB_REPLY_OK,
+ M4A_AFB_REPLY_ERROR
+};
+
+typedef void (*m4a_afb_done_cb_t)(enum m4a_afb_reply r,
+ const pa_json_object *response,
+ void *userdata);
+
+bool m4a_afb_call_async(m4a_afb_comm *comm, const char *api, const char *verb,
+ char *object, m4a_afb_done_cb_t done_cb, void *userdata);
+
+m4a_afb_comm *m4a_afb_comm_new(const char *uri);
+void m4a_afb_comm_free(m4a_afb_comm *comm);
+
+#endif
diff --git a/module-4a-client.c b/module-4a-client.c
new file mode 100644
index 0000000..64502c8
--- /dev/null
+++ b/module-4a-client.c
@@ -0,0 +1,408 @@
+/***
+ This file is part of PulseAudio.
+
+ Copyright 2018 Collabora Ltd.
+ Author: George Kiagiadakis <george.kiagiadakis@collabora.com>
+
+ PulseAudio is free software; you can redistribute it and/or modify
+ it under the terms of the GNU Lesser General Public License as published
+ by the Free Software Foundation; either version 2.1 of the License,
+ or (at your option) any later version.
+
+ PulseAudio is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
+***/
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <stdbool.h>
+
+#include <pulsecore/core-util.h>
+#include <pulsecore/llist.h>
+#include <pulsecore/module.h>
+#include <pulsecore/mutex.h>
+#include <pulsecore/modargs.h>
+#include <pulsecore/dynarray.h>
+#include <pulsecore/semaphore.h>
+
+#include "m4a_afb_comm.h"
+
+#define DEFAULT_URI "ws://localhost:1234/api?token="
+#define AHL_4A_API "ahl-4a"
+
+PA_MODULE_AUTHOR("George Kiagiadakis");
+PA_MODULE_DESCRIPTION("Makes PulseAudio work as a client of the AGL Advanced Audio Architecture");
+PA_MODULE_LOAD_ONCE(true);
+PA_MODULE_VERSION(PACKAGE_VERSION);
+PA_MODULE_USAGE(
+ "uri=<afb websocket uri>"
+ "default_role=<role>");
+
+static const char* const valid_modargs[] = {
+ "uri",
+ "default_role",
+ NULL
+};
+
+typedef struct _m4a_stream {
+ pa_module *self;
+
+ union {
+ pa_sink_input *sink_input;
+ pa_source_output *source_output;
+ };
+
+ char *role;
+ char *device_uri;
+
+ bool entity_is_sink;
+ bool entity_loaded;
+ uint32_t entity_id;
+ uint32_t entity_module_id;
+
+ pa_semaphore *semaphore;
+
+ PA_LLIST_FIELDS(struct _m4a_stream);
+} m4a_stream;
+
+typedef struct {
+ /* configuration data */
+ pa_modargs *ma;
+
+ /* operational data */
+ pa_mutex *lock;
+ bool null_sink_loaded;
+ uint32_t null_sink_id;
+ uint32_t null_sink_module_id;
+ pa_dynarray *roles; /* element-type: char* */
+ PA_LLIST_HEAD(m4a_stream, streams);
+ pa_hook_slot
+ *sink_input_new_slot,
+ *sink_input_put_slot,
+ *sink_input_unlink_post_slot;
+ m4a_afb_comm *comm;
+} m4a_data;
+
+static bool role_exists(m4a_data *d, const char *role) {
+ char *r;
+ int idx;
+
+ PA_DYNARRAY_FOREACH(r, d->roles, idx) {
+ if (!strcmp(role, r))
+ return true;
+ }
+ return false;
+}
+
+static char *decide_role(m4a_data *d, pa_proplist *p) {
+ const char *role;
+
+ role = pa_proplist_gets(p, PA_PROP_MEDIA_ROLE);
+ if (role && role_exists(d, role)) {
+ return pa_xstrdup(role);
+ }
+
+ /* FIXME we might want to do some mapping here between the standard
+ * pulseaudio roles and whatever roles might be available in 4a */
+
+ /* try the default role specified in the module arguments, or fall back
+ * to "multimedia", which we can just hope that it exists */
+ role = pa_modargs_get_value(d->ma, "default_role", "multimedia");
+ if (role_exists(d, role)) {
+ return pa_xstrdup(role);
+ }
+
+ return NULL;
+}
+
+static pa_sink *load_sink_module(pa_core *core, const char *module, char *params) {
+ pa_module *m = NULL;
+ pa_sink *target = NULL;
+ uint32_t idx;
+
+ if (pa_module_load(&m, core, module, params) < 0) {
+ pa_log("Failed to load module %s %s", module, params);
+ return NULL;
+ }
+ pa_xfree(params);
+ PA_IDXSET_FOREACH(target, core->sinks, idx) {
+ if (target->module == m)
+ break;
+ }
+ if (target->module != m) {
+ pa_log("%s didn't create any sinks", module);
+ pa_module_unload(m, false);
+ return NULL;
+ }
+ return target;
+}
+
+/* invoked in the afb communication thread */
+static void got_roles_cb(enum m4a_afb_reply r,
+ const pa_json_object *response,
+ pa_module *self) {
+ m4a_data *d = self->userdata;
+ int length, i;
+ const pa_json_object *jr;
+ char *role;
+
+ pa_mutex_lock(d->lock);
+
+ if (r == M4A_AFB_REPLY_OK && pa_json_object_get_type(response) == PA_JSON_TYPE_ARRAY) {
+ length = pa_json_object_get_array_length(response);
+ for (i = 0; i < length; i++) {
+ jr = pa_json_object_get_array_member(response, i);
+ if (pa_json_object_get_type(jr) == PA_JSON_TYPE_STRING) {
+ role = pa_xstrdup(pa_json_object_get_string(jr));
+ pa_log_debug("Found 4a role: %s", role);
+ pa_dynarray_append(d->roles, role);
+ }
+ }
+ }
+
+ pa_mutex_unlock(d->lock);
+}
+
+static m4a_stream *m4a_stream_new(pa_module *self, bool sink) {
+ m4a_data *d = self->userdata;
+ m4a_stream *stream;
+
+ stream = pa_xnew0(m4a_stream, 1);
+ stream->self = self;
+ stream->entity_is_sink = sink;
+ stream->semaphore = pa_semaphore_new(0);
+ pa_mutex_lock(d->lock);
+ PA_LLIST_PREPEND(m4a_stream, d->streams, stream);
+ pa_mutex_unlock(d->lock);
+ return stream;
+}
+
+static void m4a_stream_free(m4a_stream *stream) {
+ m4a_data *d = stream->self->userdata;
+
+ if (stream->role)
+ pa_xfree(stream->role);
+ if (stream->device_uri)
+ pa_xfree(stream->device_uri);
+ if (stream->entity_loaded)
+ pa_module_unload_request_by_index(stream->self->core,
+ stream->entity_module_id, false);
+ pa_semaphore_free(stream->semaphore);
+ pa_mutex_lock(d->lock);
+ PA_LLIST_REMOVE(m4a_stream, d->streams, stream);
+ pa_mutex_unlock(d->lock);
+ pa_xfree(stream);
+}
+
+static pa_hook_result_t sink_input_new_cb(pa_core *core,
+ pa_sink_input *i,
+ pa_module *self) {
+ m4a_data *d = self->userdata;
+ pa_sink *sink;
+
+ pa_core_assert_ref(core);
+
+ /* first, forcibly set the sink to be our null sink */
+ sink = pa_idxset_get_by_index(core->sinks, d->null_sink_id);
+ if (sink) {
+ i->sink = sink;
+ i->sink_requested_by_application = false;
+ }
+
+ return PA_HOOK_OK;
+}
+
+/* invoked in the afb communication thread */
+static void device_open_cb(enum m4a_afb_reply r,
+ const pa_json_object *response,
+ m4a_stream *stream) {
+ const pa_json_object *jdu;
+ const char *device_uri = NULL;
+
+ if (r == M4A_AFB_REPLY_OK && pa_json_object_get_type(response) == PA_JSON_TYPE_OBJECT) {
+ jdu = pa_json_object_get_object_member(response, "device_uri");
+ if (jdu && pa_json_object_get_type(jdu) == PA_JSON_TYPE_STRING) {
+ device_uri = pa_json_object_get_string(jdu);
+ }
+ }
+
+ pa_log_debug("4A replied: %s, device_uri: %s",
+ (r == M4A_AFB_REPLY_OK) ? "OK" : "ERROR",
+ device_uri ? device_uri : "(null)");
+
+ stream->device_uri = pa_xstrdup(device_uri);
+ pa_semaphore_post(stream->semaphore);
+}
+
+static pa_hook_result_t sink_input_put_cb(pa_core *core,
+ pa_sink_input *i,
+ pa_module *self) {
+ m4a_data *d = self->userdata;
+ char *role;
+
+ pa_mutex_lock(d->lock);
+ role = decide_role(d, i->proplist);
+ pa_mutex_unlock(d->lock);
+
+ pa_log_info("New sink_input with role: %s", role ? role : "(null)");
+
+ if (role) {
+ m4a_stream *stream = m4a_stream_new(self, true);
+ stream->role = role;
+ stream->sink_input = i;
+
+ pa_log_debug("Calling 4A to open the device");
+
+ m4a_afb_call_async(d->comm, AHL_4A_API, role,
+ pa_xstrdup("{\"action\":\"open\"}"),
+ (m4a_afb_done_cb_t) device_open_cb, stream);
+
+ pa_log_debug("waiting for 4A to reply");
+ pa_semaphore_wait(stream->semaphore);
+
+ if (stream->device_uri) {
+ pa_sink *s;
+
+ if ((s = load_sink_module(self->core, "module-alsa-sink",
+ pa_sprintf_malloc("device=%s", stream->device_uri)))) {
+ stream->entity_id = s->index;
+ stream->entity_module_id = s->module->index;
+ stream->entity_loaded = true;
+
+ pa_log_info("moving sink_input to alsa sink");
+ pa_sink_input_move_to(i, s, false);
+ } else {
+ pa_xfree(stream->device_uri);
+ stream->device_uri = NULL;
+ }
+ }
+
+ if (!stream->device_uri) {
+ pa_log_info("sink_input is not authorized to connect to 4A");
+ //TODO maybe queue and play this stream when 4A allows it
+
+ m4a_stream_free(stream);
+ return PA_HOOK_CANCEL;
+ }
+ }
+
+ return PA_HOOK_OK;
+}
+
+/* invoked in the afb communication thread */
+static void device_close_cb(enum m4a_afb_reply r,
+ const pa_json_object *response,
+ m4a_stream *stream) {
+ pa_log_debug("4A replied: %s",
+ (r == M4A_AFB_REPLY_OK) ? "OK" : "ERROR");
+ m4a_stream_free(stream);
+}
+
+static pa_hook_result_t sink_input_unlink_post_cb(pa_core *core,
+ pa_sink_input *i,
+ pa_module *self) {
+ m4a_data *d = self->userdata;
+ m4a_stream *stream = NULL;
+
+ PA_LLIST_FOREACH(stream, d->streams) {
+ if (stream->sink_input == i)
+ break;
+ }
+
+ if (stream && stream->sink_input == i) {
+ pa_log_debug("unloading module-alsa-sink (%s)", stream->role);
+
+ pa_module_unload_by_index(stream->self->core,
+ stream->entity_module_id, false);
+ stream->entity_loaded = false;
+
+ m4a_afb_call_async(d->comm, AHL_4A_API, stream->role,
+ pa_xstrdup("{\"action\":\"close\"}"),
+ (m4a_afb_done_cb_t) device_close_cb, stream);
+ }
+
+ return PA_HOOK_OK;
+}
+
+int pa__init(pa_module *self) {
+ m4a_data *d;
+ const char *uri;
+ pa_sink *null_sink;
+
+ pa_assert(self);
+
+ self->userdata = d = pa_xnew0(m4a_data, 1);
+ d->roles = pa_dynarray_new(pa_xfree);
+ d->lock = pa_mutex_new(false, false);
+
+ if (!(d->ma = pa_modargs_new(self->argument, valid_modargs))) {
+ pa_log("Failed to parse module arguments");
+ goto fail;
+ }
+
+ uri = pa_modargs_get_value(d->ma, "uri", DEFAULT_URI);
+ if (!(d->comm = m4a_afb_comm_new(uri))) {
+ goto fail;
+ }
+
+ if (!m4a_afb_call_async(d->comm, AHL_4A_API, "get_roles", NULL,
+ (m4a_afb_done_cb_t) got_roles_cb, self)) {
+ goto fail;
+ }
+
+ null_sink = load_sink_module(self->core, "module-null-sink", pa_sprintf_malloc(
+ "sink_name=aaaa_null_sink sink_properties='device.description=\"%s\"'",
+ _("4A Null Output")));
+ d->null_sink_id = null_sink->index;
+ d->null_sink_module_id = null_sink->module->index;
+ d->null_sink_loaded = true;
+
+ d->sink_input_new_slot = pa_hook_connect(&self->core->hooks[PA_CORE_HOOK_SINK_INPUT_NEW],
+ PA_HOOK_NORMAL, (pa_hook_cb_t) sink_input_new_cb, self);
+ d->sink_input_put_slot = pa_hook_connect(&self->core->hooks[PA_CORE_HOOK_SINK_INPUT_PUT],
+ PA_HOOK_NORMAL, (pa_hook_cb_t) sink_input_put_cb, self);
+ d->sink_input_unlink_post_slot = pa_hook_connect(&self->core->hooks[PA_CORE_HOOK_SINK_INPUT_UNLINK_POST],
+ PA_HOOK_NORMAL, (pa_hook_cb_t) sink_input_unlink_post_cb, self);
+ return 0;
+
+fail:
+ pa__done(self);
+ return -1;
+}
+
+void pa__done(pa_module *self) {
+ m4a_data *d;
+
+ pa_assert(self);
+
+ d = self->userdata;
+
+ while (d->streams)
+ m4a_stream_free(d->streams);
+
+ pa_dynarray_free(d->roles);
+ pa_mutex_free(d->lock);
+
+ if (d->ma)
+ pa_modargs_free(d->ma);
+
+ if (d->sink_input_new_slot)
+ pa_hook_slot_free(d->sink_input_new_slot);
+ if (d->sink_input_put_slot)
+ pa_hook_slot_free(d->sink_input_put_slot);
+ if (d->sink_input_unlink_post_slot)
+ pa_hook_slot_free(d->sink_input_unlink_post_slot);
+
+ if (d->null_sink_loaded)
+ pa_module_unload_request_by_index(self->core, d->null_sink_module_id, false);
+
+ if (d->comm)
+ m4a_afb_comm_free(d->comm);
+}