/*
 * Copyright (C) 2018 Konsulko Group
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "fileplayer.h"

// NOTE(review): the header names of the six #include lines that follow
// "fileplayer.h" were missing from the copy of this file that was reviewed.
// The list below is reconstructed from the names this file uses - confirm
// against the build before merging.
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <string>
#include <json-c/json.h>
#include <systemd/sd-event.h>

#undef DEBUG

// State shared between set_role_state() (the waiter) and on_reply() (the
// signaller) for a single ahl-4a role call.
struct set_role_data {
    bool state;                     // true: role is being opened, false: closed
    std::string output;             // "device_uri" from the reply, if any
    std::condition_variable cv;     // signalled once the reply was handled
    std::mutex lock;                // guards output and done
    bool done = false;              // set by on_reply(); wait predicate
};

// Websocket hangup callback - intentionally a no-op.
static void on_hangup(void *closure, struct afb_wsj1 *wsj)
{
}

// Websocket incoming-call callback - intentionally a no-op.
static void on_call(void *closure, const char *api, const char *verb, struct afb_wsj1_msg *msg)
{
}

// Websocket event callback - intentionally a no-op.
static void on_event(void* closure, const char* event, struct afb_wsj1_msg *msg)
{
}

// Reply callback for the ahl-4a role call issued by set_role_state().
//
// closure is the set_role_data owned by the waiting set_role_state() call.
// When the role was opened, the "device_uri" string found under "response"
// in the reply is stored as the output.  The waiter is always woken, whether
// or not an output was found.
static void on_reply(void *closure, struct afb_wsj1_msg *msg)
{
    set_role_data *data = static_cast<set_role_data*>(closure);
    if(!data)
        return; // nobody is waiting, so there is nothing to signal

    std::string device;
    if(data->state) {
        // We opened the role, return the output
        struct json_object *reply = afb_wsj1_msg_object_j(msg);
        if(reply) {
#ifdef DEBUG
            std::cerr << __FUNCTION__ << ": reply = "
                      << json_object_to_json_string_ext(reply, JSON_C_TO_STRING_SPACED | JSON_C_TO_STRING_PRETTY)
                      << std::endl;
#endif
            struct json_object *response = nullptr;
            if(json_object_object_get_ex(reply, "response", &response)) {
                struct json_object *val = nullptr;
                if(json_object_object_get_ex(response, "device_uri", &val) &&
                   json_object_get_string_len(val)) {
                    device = json_object_get_string(val);
#ifdef DEBUG
                    std::cerr << __FUNCTION__ << ": output = " << device << std::endl;
#endif
                }
            }
        }
    }

    // Signal reply is done.  The notification is issued while the lock is
    // held: the waiter owns *data on its stack and may destroy it as soon as
    // it observes done == true, which it cannot do before the lock is
    // released.
    std::lock_guard<std::mutex> guard(data->lock);
    data->output = device;
    data->done = true;
    data->cv.notify_one();
}

// Thread body: runs the sd_event loop that services the websocket.
// Each sd_event_run() call is given a timeout of 30000000 (microseconds per
// the sd_event_run() documentation).  The loop ends only if sd_event_run()
// reports an error, so a broken event loop does not turn into a busy spin.
static void *afb_loop_thread(struct sd_event* loop)
{
    for(;;) {
        const int rc = sd_event_run(loop, 30000000);
        if(rc < 0) {
            std::cerr << __FUNCTION__ << ": sd_event_run failed: " << rc << std::endl;
            break;
        }
    }
    return nullptr;
}

// Thread body: runs the GLib main loop that dispatches GStreamer bus messages.
static void *gst_loop_thread(GMainLoop *loop)
{
    if(loop)
        g_main_loop_run(loop);
    return nullptr;
}

// Seeks the pipeline back to position 0 and logs if the seek is refused.
static void rewind_to_start(GstElement *pipeline)
{
    if(!gst_element_seek(pipeline, 1.0, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH,
                         GST_SEEK_TYPE_SET, 0,
                         GST_SEEK_TYPE_NONE, GST_CLOCK_TIME_NONE)) {
        std::cerr << "Seek failed!" << std::endl;
    }
}

// Connects to the local application framework websocket and builds the
// GStreamer playbin/alsasink pair used for playback.
//
// port   TCP port of the websocket on localhost
// token  value of the "token" query parameter of the websocket URI
// path   path of the file to play (turned into a file:// URI in play())
// role   verb called on the "ahl-4a" API to open/close the audio role
//
// m_valid is set to true only when every step succeeded.
//
// NOTE(review): the failure paths assume the pointer members are
// null-initialised in the class declaration - confirm in fileplayer.h.
FilePlayer::FilePlayer(const int port, const std::string &token, const std::string &path, const std::string &role) :
    m_path(path),
    m_role(role)
{
    if(sd_event_new(&m_afb_loop) < 0) {
        std::cerr << __FUNCTION__ << ": Failed to create event loop" << std::endl;
        return;
    }

    // Releases whatever was acquired so far.  No thread has been started when
    // this runs, so nothing else can still be using these objects.
    // NOTE(review): an already connected m_ws is not released here (the
    // original did not release it either).
    auto release_all = [this]() {
        if(m_playbin) {
            gst_object_unref(m_playbin);
            m_playbin = nullptr;
        }
        if(m_alsa_sink) {
            gst_object_unref(m_alsa_sink);
            m_alsa_sink = nullptr;
        }
        if(m_bus) {
            gst_object_unref(m_bus);
            m_bus = nullptr;
        }
        if(m_gst_loop) {
            // The loop was never run at this point, so drop it rather than
            // asking it to quit.
            g_main_loop_unref(m_gst_loop);
            m_gst_loop = nullptr;
        }
        if(m_afb_loop) {
            sd_event_unref(m_afb_loop);
            m_afb_loop = nullptr;
        }
    };

    // Initialize interface for websocket
    m_itf.on_hangup = on_hangup;
    m_itf.on_call = on_call;
    m_itf.on_event = on_event;

    const std::string uri = "ws://localhost:" + std::to_string(port) + "/api?token=" + token;
#ifdef DEBUG
    std::cerr << "Using URI: " << uri << std::endl;
#endif
    m_ws = afb_ws_client_connect_wsj1(m_afb_loop, uri.c_str(), &m_itf, NULL);
    if(!m_ws) {
        std::cerr << __FUNCTION__ << ": Failed to create websocket connection" << std::endl;
        release_all();
        return;
    }

    // Initialize GStreamer
    gst_init(NULL, NULL);

    // Create elements we need
    m_playbin = gst_element_factory_make("playbin", "play");
    m_alsa_sink = gst_element_factory_make("alsasink", NULL);
    if(!(m_playbin && m_alsa_sink)) {
        release_all();
        return;
    }

    // Set up bus callback
    m_bus = gst_pipeline_get_bus(GST_PIPELINE(m_playbin));
    if(!m_bus) {
        release_all();
        return;
    }
    m_gst_loop = g_main_loop_new(NULL, FALSE);
    if(!m_gst_loop) {
        release_all();
        return;
    }
    gst_bus_add_watch(m_bus, gstreamer_bus_callback, this);

    // Start the worker threads only once nothing can fail any more, so a
    // failure above never leaves a running thread behind with its event loop
    // already released.
    m_afb_thread = std::thread(afb_loop_thread, m_afb_loop);
    // Start thread to run glib main loop for gstreamer bus
    m_gst_thread = std::thread(gst_loop_thread, m_gst_loop);

    m_valid = true;
}

// Stops the pipeline and drops the references taken by the constructor.
// Members may be null when construction failed, so each release is guarded.
//
// NOTE(review): m_afb_thread and m_gst_thread are neither stopped nor joined
// here (unchanged from the original).  Destroying a successfully constructed
// FilePlayer therefore destroys joinable std::thread members; a proper
// shutdown handshake needs support from the class declaration.
FilePlayer::~FilePlayer(void)
{
    if(m_playbin) {
        gst_element_set_state(m_playbin, GST_STATE_NULL);
        gst_object_unref(m_playbin);
    }
    if(m_alsa_sink)
        gst_object_unref(m_alsa_sink);
    if(m_bus)
        gst_object_unref(m_bus);
    if(m_afb_loop)
        sd_event_unref(m_afb_loop);
}

// Opens the audio role and starts playing the file on the device it returns.
//
// loop  when true, playback restarts from the beginning on end-of-stream
//
// Does nothing if the player is invalid, already playing, the role could not
// be opened, or the reply carried no output device.
void FilePlayer::play(bool loop)
{
    if(!m_valid || m_playing)
        return;

    std::string output;
    if(set_role_state(true, &output) != 0)
        return;
    if(output.empty())
        return;

    m_playing = true;
    m_looping = loop;

    g_object_set(m_alsa_sink, "device", output.c_str(), NULL);
    g_object_set(m_playbin, "audio-sink", m_alsa_sink, NULL);
    const std::string uri = "file://" + m_path;
    g_object_set(m_playbin, "uri", uri.c_str(), NULL);

    // Start playback
    if(gst_element_set_state(m_playbin, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
        // The PLAYING -> PAUSED transition that normally resets m_playing and
        // closes the role will never happen, so undo both here.
        std::cerr << __FUNCTION__ << ": Failed to start playback of " << uri << std::endl;
        m_playing = false;
        set_role_state(false);
    }
}

// Requests the pipeline to pause; the role is closed and the stream rewound
// from bus_callback() when the PLAYING -> PAUSED state change is reported.
void FilePlayer::stop(void)
{
    if(!(m_valid && m_playing))
        return;

    // Stop playback
    gst_element_set_state(m_playbin, GST_STATE_PAUSED);
}

// Static bus-watch trampoline: forwards to bus_callback() of the FilePlayer
// passed as data.  Returns TRUE so the watch stays installed.
gboolean FilePlayer::gstreamer_bus_callback(GstBus *bus, GstMessage *msg, gpointer data)
{
    if(!data)
        return TRUE;

    return static_cast<FilePlayer*>(data)->bus_callback(bus, msg);
}

// Calls the role verb on the "ahl-4a" API with action "open" or "close" and
// blocks until on_reply() has handled the response.
//
// state   true to open the role, false to close it
// output  when opening and non-null, receives the device reported by the
//         reply (left empty if the reply carried none)
//
// Returns -1 if the player is invalid, otherwise the result of
// afb_wsj1_call_j(); a negative result is logged and no reply is awaited.
// There is deliberately no timeout on the wait: the reply callback writes to
// this call's stack frame, so returning early would leave it with a dangling
// pointer.
int FilePlayer::set_role_state(bool state, std::string *output)
{
    if(!m_valid)
        return -1;

    set_role_data data;
    data.state = state;
    json_object *jsonData = json_object_new_object();
    json_object_object_add(jsonData, "action", json_object_new_string(state ? "open" : "close"));
    const int rc = afb_wsj1_call_j(m_ws, "ahl-4a", m_role.c_str(), jsonData, on_reply, static_cast<void*>(&data));
    if(rc >= 0) {
        // Wait for response.  The predicate covers both a reply that arrived
        // before we started waiting and spurious wakeups.
        std::unique_lock<std::mutex> lk(data.lock);
        data.cv.wait(lk, [&data]() { return data.done; });
        if(state && output)
            *output = data.output;
    } else {
        std::cerr << __FUNCTION__ << ": Failed to call ahl-4a/" << m_role.c_str() << std::endl;
    }
    return rc;
}

// Handles messages from the playbin's bus.
//
// - End of stream: rewinds when looping, otherwise requests PAUSED.
// - Playbin PLAYING -> PAUSED: closes the role, rewinds, clears m_playing.
// - Playbin READY -> NULL: quits the GLib main loop.
//
// Always returns TRUE so the bus watch stays installed.
gboolean FilePlayer::bus_callback(GstBus *bus, GstMessage *msg)
{
    switch (GST_MESSAGE_TYPE(msg)) {
    case GST_MESSAGE_EOS:
#ifdef DEBUG
        std::cerr << __FUNCTION__ << ": GST_MESSAGE_EOS" << std::endl;
#endif
        if(m_looping) {
            // restart playback if at end
            rewind_to_start(m_playbin);
        } else {
            // Move stream to paused state since we're done
            gst_element_set_state(m_playbin, GST_STATE_PAUSED);
        }
        break;
    case GST_MESSAGE_STATE_CHANGED: {
        // Only state changes of the playbin itself are of interest
        if(GST_MESSAGE_SRC(msg) != GST_OBJECT_CAST(m_playbin))
            break;

        GstState old_state, new_state;
        gst_message_parse_state_changed(msg, &old_state, &new_state, NULL);
#ifdef DEBUG
        std::cerr << __FUNCTION__ << ": GST_MESSAGE_STATE_CHANGE: " << (int) old_state << " to " << (int) new_state << std::endl;
#endif
        if(old_state == GST_STATE_PLAYING && new_state == GST_STATE_PAUSED) {
            set_role_state(false);

            // Seek back to beginning so any subsequent play starts there
            rewind_to_start(m_playbin);
            m_playing = false;
        } else if(old_state == GST_STATE_READY && new_state == GST_STATE_NULL) {
            // clean up
            g_main_loop_quit(m_gst_loop);
        }
        break;
    }
    default:
        break;
    }
    return TRUE;
}