From 82c1c0ab04219f9453f1b3a14a9754068e360583 Mon Sep 17 00:00:00 2001 From: Scott Murray Date: Thu, 24 Aug 2023 15:43:08 -0400 Subject: Rework to switch to using KUKSA.val databroker Rework to use the "VAL" gRPC API from the KUKSA.val databroker instead of the older server's WebSocket interface. Some source files have been renamed to match the class naming to provide a bit more consistency. Bug-AGL: SPEC-4762 Signed-off-by: Scott Murray Change-Id: I5ded74cfbd6987cd045b7b142fd9f38971aaef66 --- meson.build | 2 - meson_options.txt | 1 + src/AudiomixerService.cpp | 278 ++++++++++++++++++++++++ src/AudiomixerService.h | 72 +++++++ src/KuksaClient.cpp | 373 ++++++++++++++++++++++++++++++++ src/KuksaClient.h | 79 +++++++ src/KuksaConfig.cpp | 147 +++++++++++++ src/KuksaConfig.h | 43 ++++ src/audiomixer-service.cpp | 129 ----------- src/audiomixer-service.hpp | 44 ---- src/main.cpp | 62 ++++-- src/meson.build | 59 ++++- src/vis-config.cpp | 167 --------------- src/vis-config.hpp | 43 ---- src/vis-session.cpp | 380 --------------------------------- src/vis-session.hpp | 78 ------- systemd/agl-service-audiomixer.service | 4 +- systemd/meson.build | 2 + 18 files changed, 1088 insertions(+), 875 deletions(-) create mode 100644 meson_options.txt create mode 100644 src/AudiomixerService.cpp create mode 100644 src/AudiomixerService.h create mode 100644 src/KuksaClient.cpp create mode 100644 src/KuksaClient.h create mode 100644 src/KuksaConfig.cpp create mode 100644 src/KuksaConfig.h delete mode 100644 src/audiomixer-service.cpp delete mode 100644 src/audiomixer-service.hpp delete mode 100644 src/vis-config.cpp delete mode 100644 src/vis-config.hpp delete mode 100644 src/vis-session.cpp delete mode 100644 src/vis-session.hpp diff --git a/meson.build b/meson.build index 47e88a4..2a35b2b 100644 --- a/meson.build +++ b/meson.build @@ -3,8 +3,6 @@ project('agl-service-audiomixer', license : 'Apache-2.0', default_options : ['c_std=c17', 'cpp_std=c++17']) -systemd_dep = dependency('systemd') - subdir('src') subdir('systemd') diff --git a/meson_options.txt b/meson_options.txt new file mode 100644 index 0000000..1fe219e --- /dev/null +++ b/meson_options.txt @@ -0,0 +1 @@ +option('protos', type : 'string', value : '/usr/include', description : 'Include directory for .proto files') diff --git a/src/AudiomixerService.cpp b/src/AudiomixerService.cpp new file mode 100644 index 0000000..f2445b8 --- /dev/null +++ b/src/AudiomixerService.cpp @@ -0,0 +1,278 @@ +/* + * Copyright (C) 2022,2023 Konsulko Group + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include +#include +#include +#include + +#include "AudiomixerService.h" + +AudiomixerService::AudiomixerService(const KuksaConfig &config, GMainLoop *loop) : + m_loop(loop), + m_config(config) +{ + m_audiomixer = audiomixer_new(); + if (m_audiomixer) { + // Set up callbacks for WirePlumber events + m_audiomixer_events.controls_changed = mixer_control_change_cb; + m_audiomixer_events.value_changed = mixer_value_change_cb; + audiomixer_add_event_listener(m_audiomixer, &m_audiomixer_events, this); + + // Drive connecting to PipeWire core and refreshing controls list + audiomixer_lock(m_audiomixer); + audiomixer_ensure_controls(m_audiomixer, 3); + audiomixer_unlock(m_audiomixer); + } else { + std::cerr << "Could not create WirePlumber connection" << std::endl; + } + + // Set initial volume + if (m_audiomixer) { + audiomixer_lock(m_audiomixer); + const struct mixer_control *ctl = audiomixer_find_control(m_audiomixer, "Master Playback"); + if (ctl) { + audiomixer_change_volume(m_audiomixer, ctl, 0.5); + } + audiomixer_unlock(m_audiomixer); + } + + // Create gRPC channel + std::string host = m_config.hostname(); + host += ":"; + std::stringstream ss; + ss << m_config.port(); + host += ss.str(); + + std::shared_ptr channel; + if (!m_config.caCert().empty()) { + grpc::SslCredentialsOptions options; + options.pem_root_certs = m_config.caCert(); + if (!m_config.tlsServerName().empty()) { + grpc::ChannelArguments args; + auto target = m_config.tlsServerName(); + std::cout << "Overriding TLS target name with " << target << std::endl; + args.SetString(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG, target); + channel = grpc::CreateCustomChannel(host, grpc::SslCredentials(options), args); + } else { + channel = grpc::CreateChannel(host, grpc::SslCredentials(options)); + } + } else { + channel = grpc::CreateChannel(host, grpc::InsecureChannelCredentials()); + } + + // Wait for the channel to be ready + std::cout << "Waiting for Databroker gRPC channel" << std::endl; + while (!channel->WaitForConnected(std::chrono::system_clock::now() + + std::chrono::milliseconds(500))) ; + std::cout << "Databroker gRPC channel ready" << std::endl; + + m_broker = new KuksaClient(channel, m_config); + if (m_broker) { + // Listen to actuator target updates for the volume signal and + // values for the steering wheel volume switch signals. + std::map signals; + signals["Vehicle.Cabin.Infotainment.Media.Volume"] = true; + signals["Vehicle.Cabin.SteeringWheel.Switches.VolumeUp"] = false; + signals["Vehicle.Cabin.SteeringWheel.Switches.VolumeDown"] = false; + signals["Vehicle.Cabin.SteeringWheel.Switches.VolumeMute"] = false; + m_broker->subscribe(signals, + [this](const std::string &path, const Datapoint &dp) { + HandleSignalChange(path, dp); + }, + [this](const SubscribeRequest *request, const Status &s) { + HandleSubscribeDone(request, s); + }); + + // Set initial volume in VSS + // Push out the default value of 50 which matches the default in the + // homescreen app. Ideally there would be some form of persistence + // scheme to restore the last value on restart. + m_broker->set("Vehicle.Cabin.Infotainment.Media.Volume", + 50U, + [this](const std::string &path, const Error &error) { + HandleSignalSetError(path, error); + }); + } else { + std::cerr << "Could not create Kuksa API client" << std::endl; + } +} + +AudiomixerService::~AudiomixerService() +{ + delete m_broker; + + audiomixer_free(m_audiomixer); +} + +// Private + +void AudiomixerService::HandleSignalChange(const std::string &path, const Datapoint &dp) +{ + if (m_config.verbose() > 1) + std::cout << "AudiomixerService::HandleSignalChange: Value received for " << path << std::endl; + + if (!m_audiomixer) + return; + + audiomixer_lock(m_audiomixer); + + const struct mixer_control *ctl = audiomixer_find_control(m_audiomixer, "Master Playback"); + if (!ctl) { + audiomixer_unlock(m_audiomixer); + return; + } + if (path == "Vehicle.Cabin.Infotainment.Media.Volume") { + if (dp.has_uint32()) { + uint32_t volume = dp.uint32(); + if (volume >= 0 && volume <= 100) { + double v = (double) volume / 100.0; + if (m_config.verbose() > 1) + std::cout << "Setting volume to " << v << std::endl; + audiomixer_change_volume(m_audiomixer, ctl, v); + + // Push out new value + m_broker->set("Vehicle.Cabin.Infotainment.Media.Volume", + volume, + [this](const std::string &path, const Error &error) { + HandleSignalSetError(path, error); + }); + } + } + } else if (path == "Vehicle.Cabin.SteeringWheel.Switches.VolumeUp") { + if (dp.has_bool_() && dp.bool_()) { + double volume = ctl->volume; + volume += 0.05; // up 5% + if (volume > 1.0) + volume = 1.0; // clamp to 100% + if (m_config.verbose() > 1) + std::cout << "Increasing volume to " << volume << std::endl; + audiomixer_change_volume(m_audiomixer, ctl, volume); + + // Push out new value + m_broker->set("Vehicle.Cabin.Infotainment.Media.Volume", + (unsigned) (volume * 100), + [this](const std::string &path, const Error &error) { + HandleSignalSetError(path, error); + }); + } + } else if (path == "Vehicle.Cabin.SteeringWheel.Switches.VolumeDown") { + if (dp.has_bool_() && dp.bool_()) { + double volume = ctl->volume; + volume -= 0.05; // down 5% + if (volume < 0.0) + volume = 0.0; // clamp to 0% + if (m_config.verbose() > 1) + std::cout << "Decreasing volume to " << volume << std::endl; + audiomixer_change_volume(m_audiomixer, ctl, volume); + + // Push out new value + m_broker->set("Vehicle.Cabin.Infotainment.Media.Volume", + (unsigned) (volume * 100), + [this](const std::string &path, const Error &error) { + HandleSignalSetError(path, error); + }); + } + } else if (path == "Vehicle.Cabin.SteeringWheel.Switches.VolumeMute") { + if (dp.has_bool_() && dp.bool_()) { + if (m_config.verbose() > 1) { + if (ctl->mute) + std::cout << "Unmuting" << std::endl; + else + std::cout << "Muting" << std::endl; + } + audiomixer_change_mute(m_audiomixer, ctl, !ctl->mute); + } + } + // else ignore + + audiomixer_unlock(m_audiomixer); +} + +void AudiomixerService::HandleSignalSetError(const std::string &path, const Error &error) +{ + std::cerr << "Error setting " << path << ": " << error.code() << " - " << error.reason() << std::endl; +} + +void AudiomixerService::HandleSubscribeDone(const SubscribeRequest *request, const Status &status) +{ + if (m_config.verbose()) + std::cout << "Subscribe status = " << status.error_code() << + " (" << status.error_message() << ")" << std::endl; + + if (status.error_code() == grpc::CANCELLED) { + if (m_config.verbose()) + std::cerr << "Subscribe canceled, assuming shutdown" << std::endl; + return; + } + + // Queue up a resubcribe via the GLib event loop callback + struct resubscribe_data *data = new (struct resubscribe_data); + if (!data) { + std::cerr << "Could not create resubcribe_data" << std::endl; + exit(1); + } + data->self = this; + // Need to copy request since the one we have been handed is from the + // finished subscribe and will be going away. + data->request = new SubscribeRequest(*request); + if (!data->request) { + std::cerr << "Could not create resubscribe SubscribeRequest" << std::endl; + exit(1); + } + + // NOTE: Waiting 100 milliseconds for now; it is possible that some + // randomization and/or back-off may need to be added if many + // subscribes are active, or switching to some other resubscribe + // scheme altogether (e.g. post subscribes to a thread that waits + // for the channel to become connected again). + g_timeout_add_full(G_PRIORITY_DEFAULT, + 100, + resubscribe_cb, + data, + NULL); +} + +void AudiomixerService::Resubscribe(const SubscribeRequest *request) +{ + if (!(m_broker && request)) + return; + + m_broker->subscribe(request, + [this](const std::string &path, const Datapoint &dp) { + HandleSignalChange(path, dp); + }, + [this](const SubscribeRequest *request, const Status &s) { + HandleSubscribeDone(request, s); + }); +} + +void AudiomixerService::HandleMixerControlChange(void) +{ + // Ignore for now +} + +void AudiomixerService::HandleMixerValueChange(unsigned int change_mask, const struct mixer_control *control) +{ + if (!control) + return; + + if (change_mask & MIXER_CONTROL_CHANGE_FLAG_VOLUME) { + if (std::string(control->name) == "Master Playback") { + // Push out new value + unsigned value = control->volume * 100; + m_broker->set("Vehicle.Cabin.Infotainment.Media.Volume", + value, + [this](const std::string &path, const Error &error) { + HandleSignalSetError(path, error); + }); + } + } else if (change_mask & MIXER_CONTROL_CHANGE_FLAG_MUTE) { + // For now, do nothing, new state is in control->mute + } +} diff --git a/src/AudiomixerService.h b/src/AudiomixerService.h new file mode 100644 index 0000000..3cd87c6 --- /dev/null +++ b/src/AudiomixerService.h @@ -0,0 +1,72 @@ +/* + * Copyright (C) 2022,2023 Konsulko Group + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef _AUDIOMIXER_SERVICE_H +#define _AUDIOMIXER_SERVICE_H + +#include + +#include "KuksaConfig.h" +#include "KuksaClient.h" +#include "audiomixer.h" + +class AudiomixerService +{ +public: + AudiomixerService(const KuksaConfig &config, GMainLoop *loop = NULL); + + ~AudiomixerService(); + + // Callbacks for WirePlumber API + + static void mixer_control_change_cb(void *data) { + if (data) + ((AudiomixerService*) data)->HandleMixerControlChange(); + }; + + static void mixer_value_change_cb(void *data, + unsigned int change_mask, + const struct mixer_control *control) { + if (data) + ((AudiomixerService*) data)->HandleMixerValueChange(change_mask, control); + } + + // Callback for KuksaClient subscribe API reconnect + + static gboolean resubscribe_cb(gpointer data) { + struct resubscribe_data *d = (struct resubscribe_data*) data; + if (d && d->self) { + ((AudiomixerService*) d->self)->Resubscribe(d->request); + } + return FALSE; + } + +private: + struct resubscribe_data { + AudiomixerService *self; + const SubscribeRequest *request; + }; + + GMainLoop *m_loop; + KuksaConfig m_config; + KuksaClient *m_broker; + struct audiomixer *m_audiomixer; + struct audiomixer_events m_audiomixer_events; + + void HandleSignalChange(const std::string &path, const Datapoint &dp); + + void HandleSignalSetError(const std::string &path, const Error &error); + + void HandleSubscribeDone(const SubscribeRequest *request, const Status &status); + + void Resubscribe(const SubscribeRequest *request); + + void HandleMixerControlChange(void); + + void HandleMixerValueChange(unsigned int change_mask, const struct mixer_control *control); +}; + +#endif // _AUDIOMIXER_SERVICE_H diff --git a/src/KuksaClient.cpp b/src/KuksaClient.cpp new file mode 100644 index 0000000..844ce0e --- /dev/null +++ b/src/KuksaClient.cpp @@ -0,0 +1,373 @@ +/* + * Copyright (C) 2023 Konsulko Group + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include +#include + +#include "KuksaClient.h" + +using grpc::Channel; +using grpc::ClientContext; +using grpc::ClientReader; +using grpc::Status; + +KuksaClient::KuksaClient(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const KuksaConfig &config) : + m_config(config) +{ + m_stub = VAL::NewStub(channel); +} + +void KuksaClient::get(const std::string &path, GetResponseCallback cb, const bool actuator) +{ + ClientContext *context = new ClientContext(); + if (!context) { + handleCriticalFailure("Could not create ClientContext"); + return; + } + std::string token = m_config.authToken(); + if (!token.empty()) { + token.insert(0, std::string("Bearer ")); + context->AddMetadata(std::string("authorization"), token); + } + + GetRequest request; + auto entry = request.add_entries(); + entry->set_path(path); + entry->add_fields(Field::FIELD_PATH); + if (actuator) + entry->add_fields(Field::FIELD_ACTUATOR_TARGET); + else + entry->add_fields(Field::FIELD_VALUE); + + GetResponse *response = new GetResponse(); + if (!response) { + handleCriticalFailure("Could not create GetResponse"); + return; + } + + // NOTE: Using ClientUnaryReactor instead of the shortcut method + // would allow getting detailed errors. + m_stub->async()->Get(context, &request, response, + [this, cb, context, response](Status s) { + if (s.ok()) + handleGetResponse(response, cb); + delete response; + delete context; + }); +} + +// Since a set request needs a Datapoint with the appropriate type value, +// checking the signal metadata to get the type would be a requirement for +// a generic set call that takes a string as argument. For now, assume +// that set with a string is specifically for a signal of string type. + +void KuksaClient::set(const std::string &path, const std::string &value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_string(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const int8_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_int32(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const int16_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_int32(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const int32_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_int32(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const int64_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_int64(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const uint8_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_uint32(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const uint16_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_uint32(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const uint32_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_uint32(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const uint64_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_uint64(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const float value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_float_(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const double value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_double_(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::subscribe(const std::string &path, + SubscribeResponseCallback cb, + const bool actuator, + SubscribeDoneCallback done_cb) +{ + SubscribeRequest *request = new SubscribeRequest(); + if (!request) { + handleCriticalFailure("Could not create SubscribeRequest"); + return; + } + + auto entry = request->add_entries(); + entry->set_path(path); + entry->add_fields(Field::FIELD_PATH); + if (actuator) + entry->add_fields(Field::FIELD_ACTUATOR_TARGET); + else + entry->add_fields(Field::FIELD_VALUE); + + subscribe(request, cb, done_cb); +} + +void KuksaClient::subscribe(const std::map signals, + SubscribeResponseCallback cb, + SubscribeDoneCallback done_cb) +{ + SubscribeRequest *request = new SubscribeRequest(); + if (!request) { + handleCriticalFailure("Could not create SubscribeRequest"); + return; + } + + for(auto it = signals.cbegin(); it != signals.cend(); ++it) { + auto entry = request->add_entries(); + entry->set_path(it->first); + entry->add_fields(Field::FIELD_PATH); + if (it->second) + entry->add_fields(Field::FIELD_ACTUATOR_TARGET); + else + entry->add_fields(Field::FIELD_VALUE); + } + + subscribe(request, cb, done_cb); +} + +void KuksaClient::subscribe(const SubscribeRequest *request, + SubscribeResponseCallback cb, + SubscribeDoneCallback done_cb) +{ + if (!(request && cb)) + return; + + class Reader : public grpc::ClientReadReactor { + public: + Reader(VAL::Stub *stub, + KuksaClient *client, + KuksaConfig &config, + const SubscribeRequest *request, + SubscribeResponseCallback cb, + SubscribeDoneCallback done_cb): + client_(client), + config_(config), + request_(request), + cb_(cb), + done_cb_(done_cb) { + std::string token = config_.authToken(); + if (!token.empty()) { + token.insert(0, std::string("Bearer ")); + context_.AddMetadata(std::string("authorization"), token); + } + stub->async()->Subscribe(&context_, request, this); + StartRead(&response_); + StartCall(); + } + void OnReadDone(bool ok) override { + std::unique_lock lock(mutex_); + if (ok) { + if (client_) + client_->handleSubscribeResponse(&response_, cb_); + StartRead(&response_); + } + } + void OnDone(const Status& s) override { + status_ = s; + if (client_) { + if (config_.verbose() > 1) + std::cerr << "KuksaClient::subscribe::Reader done" << std::endl; + client_->handleSubscribeDone(request_, status_, done_cb_); + } + + // gRPC engine is done with us, safe to self-delete + delete request_; + delete this; + } + + private: + KuksaClient *client_; + KuksaConfig config_; + const SubscribeRequest *request_; + SubscribeResponseCallback cb_; + SubscribeDoneCallback done_cb_; + + ClientContext context_; + SubscribeResponse response_; + std::mutex mutex_; + Status status_; + }; + Reader *reader = new Reader(m_stub.get(), this, m_config, request, cb, done_cb); + if (!reader) + handleCriticalFailure("Could not create Subscribe reader"); +} + +// Private + +void KuksaClient::set(const std::string &path, const Datapoint &dp, SetResponseCallback cb, const bool actuator) +{ + ClientContext *context = new ClientContext(); + if (!context) { + handleCriticalFailure("Could not create ClientContext"); + return; + } + std::string token = m_config.authToken(); + if (!token.empty()) { + token.insert(0, std::string("Bearer ")); + context->AddMetadata(std::string("authorization"), token); + } + + SetRequest request; + auto update = request.add_updates(); + auto entry = update->mutable_entry(); + entry->set_path(path); + if (actuator) { + auto target = entry->mutable_actuator_target(); + *target = dp; + update->add_fields(Field::FIELD_ACTUATOR_TARGET); + } else { + auto value = entry->mutable_value(); + *value = dp; + update->add_fields(Field::FIELD_VALUE); + } + + SetResponse *response = new SetResponse(); + if (!response) { + handleCriticalFailure("Could not create SetResponse"); + delete context; + return; + } + + // NOTE: Using ClientUnaryReactor instead of the shortcut method + // would allow getting detailed errors. + m_stub->async()->Set(context, &request, response, + [this, cb, context, response](Status s) { + if (s.ok()) + handleSetResponse(response, cb); + delete response; + delete context; + }); +} + +void KuksaClient::handleGetResponse(const GetResponse *response, GetResponseCallback cb) +{ + if (!(response && response->entries_size() && cb)) + return; + + for (auto it = response->entries().begin(); it != response->entries().end(); ++it) { + // We expect paths in the response entries + if (!it->path().size()) + continue; + + Datapoint dp; + if (it->has_actuator_target()) + dp = it->actuator_target(); + else + dp = it->value(); + cb(it->path(), dp); + } +} + +void KuksaClient::handleSetResponse(const SetResponse *response, SetResponseCallback cb) +{ + if (!(response && response->errors_size() && cb)) + return; + + for (auto it = response->errors().begin(); it != response->errors().end(); ++it) { + cb(it->path(), it->error()); + } + +} + +void KuksaClient::handleSubscribeResponse(const SubscribeResponse *response, SubscribeResponseCallback cb) +{ + if (!(response && response->updates_size() && cb)) + return; + + for (auto it = response->updates().begin(); it != response->updates().end(); ++it) { + // We expect entries that have paths in the response + if (!(it->has_entry() && it->entry().path().size())) + continue; + + auto entry = it->entry(); + if (m_config.verbose()) + std::cout << "KuksaClient::handleSubscribeResponse: got value for " << entry.path() << std::endl; + + Datapoint dp; + if (entry.has_actuator_target()) + dp = entry.actuator_target(); + else + dp = entry.value(); + + cb(entry.path(), dp); + } +} + +void KuksaClient::handleSubscribeDone(const SubscribeRequest *request, + const Status &status, + SubscribeDoneCallback cb) +{ + if (cb) + cb(request, status); +} + +void KuksaClient::handleCriticalFailure(const std::string &error) +{ + if (error.size()) + std::cerr << error << std::endl; + exit(1); +} + diff --git a/src/KuksaClient.h b/src/KuksaClient.h new file mode 100644 index 0000000..fd9a9e0 --- /dev/null +++ b/src/KuksaClient.h @@ -0,0 +1,79 @@ +/* + * Copyright (C) 2023 Konsulko Group + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef KUKSA_CLIENT_H +#define KUKSA_CLIENT_H + +#include +#include +#include +#include "kuksa/val/v1/val.grpc.pb.h" + +// Just pull in the whole namespace since Datapoint contains a lot of +// definitions that may potentially be needed. +using namespace kuksa::val::v1; + +using grpc::Status; + +#include "KuksaConfig.h" + +// API response callback types +typedef std::function GetResponseCallback; +typedef std::function SetResponseCallback; +typedef std::function SubscribeResponseCallback; +typedef std::function SubscribeDoneCallback; + +// KUKSA.val databroker "VAL" gRPC API client class + +class KuksaClient +{ +public: + explicit KuksaClient(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const KuksaConfig &config); + + void get(const std::string &path, GetResponseCallback cb, const bool actuator = false); + + void set(const std::string &path, const std::string &value, SetResponseCallback cb, const bool actuator = false); + void set(const std::string &path, const int8_t value, SetResponseCallback cb, const bool actuator = false); + void set(const std::string &path, const int16_t value, SetResponseCallback cb, const bool actuator = false); + void set(const std::string &path, const int32_t value, SetResponseCallback cb, const bool actuator = false); + void set(const std::string &path, const int64_t value, SetResponseCallback cb, const bool actuator = false); + void set(const std::string &path, const uint8_t value, SetResponseCallback cb, const bool actuator = false); + void set(const std::string &path, const uint16_t value, SetResponseCallback cb, const bool actuator = false); + void set(const std::string &path, const uint32_t value, SetResponseCallback cb, const bool actuator = false); + void set(const std::string &path, const uint64_t value, SetResponseCallback cb, const bool actuator = false); + void set(const std::string &path, const float value, SetResponseCallback cb, const bool actuator = false); + void set(const std::string &path, const double value, SetResponseCallback cb, const bool actuator = false); + + void subscribe(const std::string &path, + SubscribeResponseCallback cb, + const bool actuator = false, + SubscribeDoneCallback done_cb = nullptr); + void subscribe(const std::map signals, + SubscribeResponseCallback cb, + SubscribeDoneCallback done_cb = nullptr); + void subscribe(const SubscribeRequest *request, + SubscribeResponseCallback cb, + SubscribeDoneCallback done_cb = nullptr); + +private: + KuksaConfig m_config; + std::shared_ptr m_stub; + + void set(const std::string &path, const Datapoint &dp, SetResponseCallback cb, const bool actuator); + + void handleGetResponse(const GetResponse *response, GetResponseCallback cb); + + void handleSetResponse(const SetResponse *response, SetResponseCallback cb); + + void handleSubscribeResponse(const SubscribeResponse *response, SubscribeResponseCallback cb); + + void handleSubscribeDone(const SubscribeRequest *request, const Status &status, SubscribeDoneCallback cb); + + void handleCriticalFailure(const std::string &error); + void handleCriticalFailure(const char *error) { handleCriticalFailure(std::string(error)); }; +}; + +#endif // KUKSA_CLIENT_H diff --git a/src/KuksaConfig.cpp b/src/KuksaConfig.cpp new file mode 100644 index 0000000..8fe09f6 --- /dev/null +++ b/src/KuksaConfig.cpp @@ -0,0 +1,147 @@ +/* + * Copyright (C) 2022,2023 Konsulko Group + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include +#include +#include +#include +#include +#include "KuksaConfig.h" + +namespace property_tree = boost::property_tree; +namespace filesystem = boost::filesystem; + +#define DEFAULT_CA_CERT_FILE "/etc/kuksa-val/CA.pem" + +inline +void load_string_file(const filesystem::path& p, std::string& str) +{ + std::ifstream file; + file.exceptions(std::ifstream::failbit | std::ifstream::badbit); + file.open(p, std::ios_base::binary); + if (file.good()) { + std::size_t sz = static_cast(filesystem::file_size(p)); + str.resize(sz, '\0'); + file.read(&str[0], sz); + } else { + str.clear(); + } +} + +KuksaConfig::KuksaConfig(const std::string &hostname, + const unsigned port, + const std::string &caCert, + const std::string &tlsServerName, + const std::string &authToken) : + m_hostname(hostname), + m_port(port), + m_caCert(caCert), + m_tlsServerName(tlsServerName), + m_authToken(authToken), + m_verbose(0), + m_valid(true) +{ + // Potentially could do some certificate validation here... +} + +KuksaConfig::KuksaConfig(const std::string &appname) : + m_valid(false) +{ + std::string config("/etc/xdg/AGL/"); + config += appname; + config += ".conf"; + char *home = getenv("XDG_CONFIG_HOME"); + if (home) { + config = home; + config += "/AGL/"; + config += appname; + config += ".conf"; + } + + std::cout << "Using configuration " << config << std::endl; + property_tree::ptree pt; + try { + property_tree::ini_parser::read_ini(config, pt); + } + catch (std::exception &ex) { + std::cerr << "Could not read " << config << std::endl; + return; + } + const property_tree::ptree &settings = + pt.get_child("kuksa-client", property_tree::ptree()); + + m_hostname = settings.get("server", "localhost"); + std::stringstream ss; + ss << m_hostname; + ss >> std::quoted(m_hostname); + if (m_hostname.empty()) { + std::cerr << "Invalid server hostname" << std::endl; + return; + } + + m_port = settings.get("port", 55555); + if (m_port == 0) { + std::cerr << "Invalid server port" << std::endl; + return; + } + + std::string caCertFileName = settings.get("ca-certificate", DEFAULT_CA_CERT_FILE); + std::stringstream().swap(ss); + ss << caCertFileName; + ss >> std::quoted(caCertFileName); + if (caCertFileName.empty()) { + std::cerr << "Invalid CA certificate filename" << std::endl; + return; + } + readFile(caCertFileName, m_caCert); + if (m_caCert.empty()) { + std::cerr << "Invalid CA certificate file" << std::endl; + return; + } + + m_tlsServerName = settings.get("tls-server-name", ""); + + std::string authTokenFileName = settings.get("authorization", ""); + std::stringstream().swap(ss); + ss << authTokenFileName; + ss >> std::quoted(authTokenFileName); + if (authTokenFileName.empty()) { + std::cerr << "Invalid authorization token filename" << std::endl; + return; + } + readFile(authTokenFileName, m_authToken); + if (m_authToken.empty()) { + std::cerr << "Invalid authorization token file" << std::endl; + return; + } + + m_verbose = 0; + std::string verbose = settings.get("verbose", ""); + std::stringstream().swap(ss); + ss << verbose; + ss >> std::quoted(verbose); + if (!verbose.empty()) { + if (verbose == "true" || verbose == "1") + m_verbose = 1; + if (verbose == "2") + m_verbose = 2; + } + + m_valid = true; +} + +// Private + +void KuksaConfig::readFile(const std::string &filename, std::string &data) +{ + try { + load_string_file(filename, data); + } catch (const std::exception &e) { + data.clear(); + } +} diff --git a/src/KuksaConfig.h b/src/KuksaConfig.h new file mode 100644 index 0000000..e70385f --- /dev/null +++ b/src/KuksaConfig.h @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2022,2023 Konsulko Group + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef _KUKSA_CONFIG_H +#define _KUKSA_CONFIG_H + +#include + +class KuksaConfig +{ +public: + explicit KuksaConfig(const std::string &hostname, + const unsigned port, + const std::string &caCert, + const std::string &tlsServerName, + const std::string &authToken); + explicit KuksaConfig(const std::string &appname); + ~KuksaConfig() {}; + + std::string hostname() { return m_hostname; }; + unsigned port() { return m_port; }; + std::string caCert() { return m_caCert; }; + std::string tlsServerName() { return m_tlsServerName; }; + std::string authToken() { return m_authToken; }; + bool valid() { return m_valid; }; + unsigned verbose() { return m_verbose; }; + +private: + std::string m_hostname; + unsigned m_port; + std::string m_caCert; + std::string m_tlsServerName; + std::string m_authToken; + unsigned m_verbose; + bool m_valid; + + void readFile(const std::string &filename, std::string &data); +}; + +#endif // _KUKSA_CONFIG_H diff --git a/src/audiomixer-service.cpp b/src/audiomixer-service.cpp deleted file mode 100644 index 5787153..0000000 --- a/src/audiomixer-service.cpp +++ /dev/null @@ -1,129 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -#include "audiomixer-service.hpp" -#include -#include - - -AudiomixerService::AudiomixerService(const VisConfig &config, net::io_context& ioc, ssl::context& ctx) : - VisSession(config, ioc, ctx) -{ - m_audiomixer = audiomixer_new(); - if (m_audiomixer) { - // Set up callbacks for WirePlumber events - m_audiomixer_events.controls_changed = audiomixer_control_change_cb; - m_audiomixer_events.value_changed = audiomixer_value_change_cb; - audiomixer_add_event_listener(m_audiomixer, &m_audiomixer_events, this); - - // Drive connecting to PipeWire core and refreshing controls list - audiomixer_lock(m_audiomixer); - audiomixer_ensure_controls(m_audiomixer, 3); - audiomixer_unlock(m_audiomixer); - } else { - std::cerr << "Could not create WirePlumber connection" << std::endl; - } -} - -AudiomixerService::~AudiomixerService() -{ - audiomixer_free(m_audiomixer); -} - -void AudiomixerService::handle_authorized_response(void) -{ - subscribe("Vehicle.Cabin.Infotainment.Media.Volume"); - subscribe("Vehicle.Cabin.SteeringWheel.Switches.VolumeUp"); - subscribe("Vehicle.Cabin.SteeringWheel.Switches.VolumeDown"); - subscribe("Vehicle.Cabin.SteeringWheel.Switches.VolumeMute"); - - // Set initial volume in VSS - // For now a value of 50 matches the default in the homescreen app. - // Ideally there would be some form of persistence scheme to restore - // the last value on restart. - set("Vehicle.Cabin.Infotainment.Media.Volume", "50"); -} - -void AudiomixerService::handle_get_response(std::string &path, std::string &value, std::string ×tamp) -{ - // Placeholder since no gets are performed ATM -} - -void AudiomixerService::handle_notification(std::string &path, std::string &value, std::string ×tamp) -{ - if (!m_audiomixer) { - return; - } - - audiomixer_lock(m_audiomixer); - - const struct mixer_control *ctl = audiomixer_find_control(m_audiomixer, "Master Playback"); - if (!ctl) { - audiomixer_unlock(m_audiomixer); - return; - } - - if (path == "Vehicle.Cabin.Infotainment.Media.Volume") { - try { - int volume = std::stoi(value); - if (volume >= 0 && volume <= 100) { - double v = (double) volume / 100.0; - if (m_config.verbose() > 1) - std::cout << "Setting volume to " << v << std::endl; - audiomixer_change_volume(m_audiomixer, ctl, v); - } - } - catch (std::exception ex) { - // ignore bad value - } - } else if (path == "Vehicle.Cabin.SteeringWheel.Switches.VolumeUp" && value == "true") { - double volume = ctl->volume; - volume += 0.05; // up 5% - if (volume > 1.0) - volume = 1.0; // clamp to 100% - if (m_config.verbose() > 1) - std::cout << "Increasing volume to " << volume << std::endl; - audiomixer_change_volume(m_audiomixer, ctl, volume); - - } else if (path == "Vehicle.Cabin.SteeringWheel.Switches.VolumeDown" && value == "true") { - double volume = ctl->volume; - volume -= 0.05; // down 5% - if (volume < 0.0) - volume = 0.0; // clamp to 0% - if (m_config.verbose() > 1) - std::cout << "Decreasing volume to " << volume << std::endl; - audiomixer_change_volume(m_audiomixer, ctl, volume); - - } else if (path == "Vehicle.Cabin.SteeringWheel.Switches.VolumeMute" && value == "true") { - if (m_config.verbose() > 1) { - if (ctl->mute) - std::cout << "Unmuting" << std::endl; - else - std::cout << "Muting" << std::endl; - } - audiomixer_change_mute(m_audiomixer, ctl, !ctl->mute); - } - // else ignore - - audiomixer_unlock(m_audiomixer); -} - -void AudiomixerService::handle_control_change(void) -{ - // Ignore for now -} - -void AudiomixerService::handle_value_change(unsigned int change_mask, const struct mixer_control *control) -{ - if (!control) - return; - - if (change_mask & MIXER_CONTROL_CHANGE_FLAG_VOLUME) { - if (std::string(control->name) == "Master Playback") { - // Push change into VIS - std::string value = std::to_string((int) (control->volume * 100.0)); - set("Vehicle.Cabin.Infotainment.Media.Volume", value); - } - } else if (change_mask & MIXER_CONTROL_CHANGE_FLAG_MUTE) { - // For now, do nothing, new state is in control->mute - } -} diff --git a/src/audiomixer-service.hpp b/src/audiomixer-service.hpp deleted file mode 100644 index cb00584..0000000 --- a/src/audiomixer-service.hpp +++ /dev/null @@ -1,44 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -#ifndef _AUDIOMIXER_SERVICE_HPP -#define _AUDIOMIXER_SERVICE_HPP - -#include "vis-session.hpp" -#include "audiomixer.h" - -class AudiomixerService : public VisSession -{ - struct audiomixer *m_audiomixer; - -public: - AudiomixerService(const VisConfig &config, net::io_context& ioc, ssl::context& ctx); - - ~AudiomixerService(); - - static void audiomixer_control_change_cb(void *data) { - if (data) - ((AudiomixerService*) data)->handle_control_change(); - }; - - static void audiomixer_value_change_cb(void *data, - unsigned int change_mask, - const struct mixer_control *control) { - if (data) - ((AudiomixerService*) data)->handle_value_change(change_mask, control); - } - -protected: - struct audiomixer_events m_audiomixer_events; - - virtual void handle_authorized_response(void) override; - - virtual void handle_get_response(std::string &path, std::string &value, std::string ×tamp) override; - - virtual void handle_notification(std::string &path, std::string &value, std::string ×tamp) override; - - virtual void handle_control_change(void); - - virtual void handle_value_change(unsigned int change_mask, const struct mixer_control *control); -}; - -#endif // _AUDIOMIXER_SERVICE_HPP diff --git a/src/main.cpp b/src/main.cpp index 0960f1b..6776472 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,34 +1,54 @@ -// SPDX-License-Identifier: Apache-2.0 +/* + * Copyright (C) 2022,2023 Konsulko Group + * + * SPDX-License-Identifier: Apache-2.0 + */ -#include -#include -#include -#include -#include "audiomixer-service.hpp" +//#include +//#include +#include +#include +#include -using work_guard_type = boost::asio::executor_work_guard; +#include "AudiomixerService.h" + +static gboolean quit_cb(gpointer user_data) +{ + GMainLoop *loop = (GMainLoop*) user_data; + + g_info("Quitting..."); + + if (loop) + g_idle_add(G_SOURCE_FUNC(g_main_loop_quit), loop); + else + exit(0); + + return G_SOURCE_REMOVE; +} int main(int argc, char** argv) { - // The io_context is required for all I/O - net::io_context ioc; + GMainLoop *loop = NULL; + + loop = g_main_loop_new(NULL, FALSE); + if (!loop) { + std::cerr << "Could not create GLib event loop" << std::endl; + exit(1); + } + + KuksaConfig config("agl-service-audiomixer"); - // Register to stop I/O context on SIGINT and SIGTERM - net::signal_set signals(ioc, SIGINT, SIGTERM); - signals.async_wait(boost::bind(&net::io_context::stop, &ioc)); + g_unix_signal_add(SIGTERM, quit_cb, (gpointer) loop); + g_unix_signal_add(SIGINT, quit_cb, (gpointer) loop); - // The SSL context is required, and holds certificates - ssl::context ctx{ssl::context::tlsv12_client}; + AudiomixerService service(config, loop); - // Launch the asynchronous operation - VisConfig config("agl-service-audiomixer"); - std::make_shared(config, ioc, ctx)->run(); + sd_notify(0, "READY=1"); - // Ensure I/O context continues running even if there's no work - work_guard_type work_guard(ioc.get_executor()); + g_main_loop_run(loop); - // Run the I/O context - ioc.run(); + // Clean up + g_main_loop_unref(loop); return 0; } diff --git a/src/meson.build b/src/meson.build index b1603fc..6c50419 100644 --- a/src/meson.build +++ b/src/meson.build @@ -1,19 +1,60 @@ boost_dep = dependency('boost', version : '>=1.72', modules : [ 'thread', 'filesystem', 'program_options', 'log', 'system' ]) -openssl_dep = dependency('openssl') -thread_dep = dependency('threads') -wp_dep = dependency('wireplumber-0.4') -src = [ 'vis-config.cpp', - 'vis-session.cpp', - 'audiomixer-service.cpp', - 'audiomixer.c', - 'main.cpp' +cpp = meson.get_compiler('cpp') +grpcpp_reflection_dep = cpp.find_library('grpc++_reflection') + +service_dep = [ + boost_dep, + dependency('openssl'), + dependency('threads'), + dependency('libsystemd'), + dependency('wireplumber-0.4'), + dependency('protobuf'), + dependency('grpc'), + dependency('grpc++'), + grpcpp_reflection_dep ] + +protoc = find_program('protoc') +grpc_cpp = find_program('grpc_cpp_plugin') + +protos_base_dir = get_option('protos') +protos_dir = protos_base_dir / 'kuksa/val/v1' +protoc_gen = generator(protoc, \ + output : ['@BASENAME@.pb.cc', '@BASENAME@.pb.h'], + arguments : ['-I=' + protos_base_dir, + '--cpp_out=@BUILD_DIR@', + '@INPUT@']) +generated_protoc_sources = [ \ + protoc_gen.process(protos_dir / 'types.proto', preserve_path_from : protos_base_dir), + protoc_gen.process(protos_dir / 'val.proto', preserve_path_from : protos_base_dir), +] + +grpc_gen = generator(protoc, \ + output : ['@BASENAME@.grpc.pb.cc', '@BASENAME@.grpc.pb.h'], + arguments : ['-I=' + protos_base_dir, + '--grpc_out=@BUILD_DIR@', + '--plugin=protoc-gen-grpc=' + grpc_cpp.path(), + '@INPUT@']) +generated_grpc_sources = [ \ + grpc_gen.process(protos_dir / 'val.proto', preserve_path_from : protos_base_dir), +] + +src = [ + 'KuksaConfig.cpp', + 'KuksaClient.cpp', + 'AudiomixerService.cpp', + 'audiomixer.c', + 'main.cpp', + generated_protoc_sources, + generated_grpc_sources, +] + executable('agl-service-audiomixer', src, - dependencies: [boost_dep, openssl_dep, thread_dep, systemd_dep, wp_dep], + dependencies: service_dep, install: true, c_args : [ '-D_XOPEN_SOURCE=700', diff --git a/src/vis-config.cpp b/src/vis-config.cpp deleted file mode 100644 index a55e235..0000000 --- a/src/vis-config.cpp +++ /dev/null @@ -1,167 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -#include "vis-config.hpp" -#include -#include -#include -#include -#include -#include - -namespace property_tree = boost::property_tree; -namespace filesystem = boost::filesystem; - -#define DEFAULT_CLIENT_KEY_FILE "/etc/kuksa-val/Client.key" -#define DEFAULT_CLIENT_CERT_FILE "/etc/kuksa-val/Client.pem" -#define DEFAULT_CA_CERT_FILE "/etc/kuksa-val/CA.pem" - -inline -void load_string_file(const filesystem::path& p, std::string& str) -{ - std::ifstream file; - file.exceptions(std::ifstream::failbit | std::ifstream::badbit); - file.open(p, std::ios_base::binary); - std::size_t sz = static_cast(filesystem::file_size(p)); - str.resize(sz, '\0'); - file.read(&str[0], sz); -} - -VisConfig::VisConfig(const std::string &hostname, - const unsigned port, - const std::string &clientKey, - const std::string &clientCert, - const std::string &caCert, - const std::string &authToken, - bool verifyPeer) : - m_hostname(hostname), - m_port(port), - m_clientKey(clientKey), - m_clientCert(clientCert), - m_caCert(caCert), - m_authToken(authToken), - m_verifyPeer(verifyPeer), - m_verbose(0), - m_valid(true) -{ - // Potentially could do some certificate validation here... -} - -VisConfig::VisConfig(const std::string &appname) : - m_valid(false) -{ - std::string config("/etc/xdg/AGL/"); - config += appname; - config += ".conf"; - char *home = getenv("XDG_CONFIG_HOME"); - if (home) { - config = home; - config += "/AGL/"; - config += appname; - config += ".conf"; - } - - std::cout << "Using configuration " << config << std::endl; - property_tree::ptree pt; - try { - property_tree::ini_parser::read_ini(config, pt); - } - catch (std::exception &ex) { - std::cerr << "Could not read " << config << std::endl; - return; - } - const property_tree::ptree &settings = - pt.get_child("vis-client", property_tree::ptree()); - - m_hostname = settings.get("server", "localhost"); - std::stringstream ss; - ss << m_hostname; - ss >> std::quoted(m_hostname); - if (m_hostname.empty()) { - std::cerr << "Invalid server hostname" << std::endl; - return; - } - - m_port = settings.get("port", 8090); - if (m_port == 0) { - std::cerr << "Invalid server port" << std::endl; - return; - } - - // Default to disabling peer verification for now to be able - // to use the default upstream KUKSA.val certificates for - // testing. Wrangling server and CA certificate generation - // and management to be able to verify will require further - // investigation. - m_verifyPeer = settings.get("verify-server", false); - - std::string keyFileName = settings.get("key", DEFAULT_CLIENT_KEY_FILE); - std::stringstream().swap(ss); - ss << keyFileName; - ss >> std::quoted(keyFileName); - ss.str(""); - if (keyFileName.empty()) { - std::cerr << "Invalid client key filename" << std::endl; - return; - } - load_string_file(keyFileName, m_clientKey); - if (m_clientKey.empty()) { - std::cerr << "Invalid client key file" << std::endl; - return; - } - - std::string certFileName = settings.get("certificate", DEFAULT_CLIENT_CERT_FILE); - std::stringstream().swap(ss); - ss << certFileName; - ss >> std::quoted(certFileName); - if (certFileName.empty()) { - std::cerr << "Invalid client certificate filename" << std::endl; - return; - } - load_string_file(certFileName, m_clientCert); - if (m_clientCert.empty()) { - std::cerr << "Invalid client certificate file" << std::endl; - return; - } - - std::string caCertFileName = settings.get("ca-certificate", DEFAULT_CA_CERT_FILE); - std::stringstream().swap(ss); - ss << caCertFileName; - ss >> std::quoted(caCertFileName); - if (caCertFileName.empty()) { - std::cerr << "Invalid CA certificate filename" << std::endl; - return; - } - load_string_file(caCertFileName, m_caCert); - if (m_caCert.empty()) { - std::cerr << "Invalid CA certificate file" << std::endl; - return; - } - - std::string authTokenFileName = settings.get("authorization", ""); - std::stringstream().swap(ss); - ss << authTokenFileName; - ss >> std::quoted(authTokenFileName); - if (authTokenFileName.empty()) { - std::cerr << "Invalid authorization token filename" << std::endl; - return; - } - load_string_file(authTokenFileName, m_authToken); - if (m_authToken.empty()) { - std::cerr << "Invalid authorization token file" << std::endl; - return; - } - - m_verbose = 0; - std::string verbose = settings.get("verbose", ""); - std::stringstream().swap(ss); - ss << verbose; - ss >> std::quoted(verbose); - if (!verbose.empty()) { - if (verbose == "true" || verbose == "1") - m_verbose = 1; - if (verbose == "2") - m_verbose = 2; - } - - m_valid = true; -} diff --git a/src/vis-config.hpp b/src/vis-config.hpp deleted file mode 100644 index b0f72f9..0000000 --- a/src/vis-config.hpp +++ /dev/null @@ -1,43 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -#ifndef _VIS_CONFIG_HPP -#define _VIS_CONFIG_HPP - -#include - -class VisConfig -{ -public: - explicit VisConfig(const std::string &hostname, - const unsigned port, - const std::string &clientKey, - const std::string &clientCert, - const std::string &caCert, - const std::string &authToken, - bool verifyPeer = true); - explicit VisConfig(const std::string &appname); - ~VisConfig() {}; - - std::string hostname() { return m_hostname; }; - unsigned port() { return m_port; }; - std::string clientKey() { return m_clientKey; }; - std::string clientCert() { return m_clientCert; }; - std::string caCert() { return m_caCert; }; - std::string authToken() { return m_authToken; }; - bool verifyPeer() { return m_verifyPeer; }; - bool valid() { return m_valid; }; - unsigned verbose() { return m_verbose; }; - -private: - std::string m_hostname; - unsigned m_port; - std::string m_clientKey; - std::string m_clientCert; - std::string m_caCert; - std::string m_authToken; - bool m_verifyPeer; - unsigned m_verbose; - bool m_valid; -}; - -#endif // _VIS_CONFIG_HPP diff --git a/src/vis-session.cpp b/src/vis-session.cpp deleted file mode 100644 index 53a554c..0000000 --- a/src/vis-session.cpp +++ /dev/null @@ -1,380 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -#include "vis-session.hpp" -#include -#include -#include - - -// Logging helper -static void log_error(beast::error_code error, char const* what) -{ - std::cerr << what << " error: " << error.message() << std::endl; -} - - -// Resolver and socket require an io_context -VisSession::VisSession(const VisConfig &config, net::io_context& ioc, ssl::context& ctx) : - m_config(config), - m_resolver(net::make_strand(ioc)), - m_ws(net::make_strand(ioc), ctx) -{ -} - -// Start the asynchronous operation -void VisSession::run() -{ - if (!m_config.valid()) { - return; - } - - // Start by resolving hostname - m_resolver.async_resolve(m_config.hostname(), - std::to_string(m_config.port()), - beast::bind_front_handler(&VisSession::on_resolve, - shared_from_this())); -} - -void VisSession::on_resolve(beast::error_code error, - tcp::resolver::results_type results) -{ - if(error) { - log_error(error, "resolve"); - return; - } - - // Set a timeout on the connect operation - beast::get_lowest_layer(m_ws).expires_after(std::chrono::seconds(30)); - - // Connect to resolved address - if (m_config.verbose()) - std::cout << "Connecting" << std::endl; - m_results = results; - connect(); -} - -void VisSession::connect() -{ - beast::get_lowest_layer(m_ws).async_connect(m_results, - beast::bind_front_handler(&VisSession::on_connect, - shared_from_this())); -} - -void VisSession::on_connect(beast::error_code error, - tcp::resolver::results_type::endpoint_type endpoint) -{ - if(error) { - // The server can take a while to be ready to accept connections, - // so keep retrying until we hit the timeout. - if (error == net::error::timed_out) { - log_error(error, "connect"); - return; - } - - // Delay 500 ms before retrying - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - - if (m_config.verbose()) - std::cout << "Connecting" << std::endl; - - connect(); - return; - } - - if (m_config.verbose()) - std::cout << "Connected" << std::endl; - - // Set handshake timeout - beast::get_lowest_layer(m_ws).expires_after(std::chrono::seconds(30)); - - // Set SNI Hostname (many hosts need this to handshake successfully) - if(!SSL_set_tlsext_host_name(m_ws.next_layer().native_handle(), - m_config.hostname().c_str())) - { - error = beast::error_code(static_cast(::ERR_get_error()), - net::error::get_ssl_category()); - log_error(error, "connect"); - return; - } - - // Update the hostname. This will provide the value of the - // Host HTTP header during the WebSocket handshake. - // See https://tools.ietf.org/html/rfc7230#section-5.4 - m_hostname = m_config.hostname() + ':' + std::to_string(endpoint.port()); - - if (m_config.verbose()) - std::cout << "Negotiating SSL handshake" << std::endl; - - // Perform the SSL handshake - m_ws.next_layer().async_handshake(ssl::stream_base::client, - beast::bind_front_handler(&VisSession::on_ssl_handshake, - shared_from_this())); -} - -void VisSession::on_ssl_handshake(beast::error_code error) -{ - if(error) { - log_error(error, "SSL handshake"); - return; - } - - // Turn off the timeout on the tcp_stream, because - // the websocket stream has its own timeout system. - beast::get_lowest_layer(m_ws).expires_never(); - - // NOTE: Explicitly not setting websocket stream timeout here, - // as the client is long-running. - - if (m_config.verbose()) - std::cout << "Negotiating WSS handshake" << std::endl; - - // Perform handshake - m_ws.async_handshake(m_hostname, - "/", - beast::bind_front_handler(&VisSession::on_handshake, - shared_from_this())); -} - -void VisSession::on_handshake(beast::error_code error) -{ - if(error) { - log_error(error, "WSS handshake"); - return; - } - - if (m_config.verbose()) - std::cout << "Authorizing" << std::endl; - - // Authorize - json req; - req["requestId"] = std::to_string(m_requestid++); - req["action"]= "authorize"; - req["tokens"] = m_config.authToken(); - - m_ws.async_write(net::buffer(req.dump(4)), - beast::bind_front_handler(&VisSession::on_authorize, - shared_from_this())); -} - -void VisSession::on_authorize(beast::error_code error, std::size_t bytes_transferred) -{ - boost::ignore_unused(bytes_transferred); - - if(error) { - log_error(error, "authorize"); - return; - } - - // Read response - m_ws.async_read(m_buffer, - beast::bind_front_handler(&VisSession::on_read, - shared_from_this())); -} - -// NOTE: Placeholder for now -void VisSession::on_write(beast::error_code error, std::size_t bytes_transferred) -{ - boost::ignore_unused(bytes_transferred); - - if(error) { - log_error(error, "write"); - return; - } - - // Do nothing... -} - -void VisSession::on_read(beast::error_code error, std::size_t bytes_transferred) -{ - boost::ignore_unused(bytes_transferred); - - if(error) { - log_error(error, "read"); - return; - } - - // Handle message - std::string s = beast::buffers_to_string(m_buffer.data()); - json response = json::parse(s, nullptr, false); - if (!response.is_discarded()) { - handle_message(response); - } else { - std::cerr << "json::parse failed? got " << s << std::endl; - } - m_buffer.consume(m_buffer.size()); - - // Read next message - m_ws.async_read(m_buffer, - beast::bind_front_handler(&VisSession::on_read, - shared_from_this())); -} - -void VisSession::get(const std::string &path) -{ - if (!m_config.valid()) { - return; - } - - json req; - req["requestId"] = std::to_string(m_requestid++); - req["action"] = "get"; - req["path"] = path; - req["tokens"] = m_config.authToken(); - - m_ws.write(net::buffer(req.dump(4))); -} - -void VisSession::set(const std::string &path, const std::string &value) -{ - if (!m_config.valid()) { - return; - } - - json req; - req["requestId"] = std::to_string(m_requestid++); - req["action"] = "set"; - req["path"] = path; - req["value"] = value; - req["tokens"] = m_config.authToken(); - - m_ws.write(net::buffer(req.dump(4))); -} - -void VisSession::subscribe(const std::string &path) -{ - if (!m_config.valid()) { - return; - } - - json req; - req["requestId"] = std::to_string(m_requestid++); - req["action"] = "subscribe"; - req["path"] = path; - req["tokens"] = m_config.authToken(); - - m_ws.write(net::buffer(req.dump(4))); -} - -bool VisSession::parseData(const json &message, std::string &path, std::string &value, std::string ×tamp) -{ - if (message.contains("error")) { - std::string error = message["error"]; - return false; - } - - if (!(message.contains("data") && message["data"].is_object())) { - std::cerr << "Malformed message (data missing)" << std::endl; - return false; - } - auto data = message["data"]; - if (!(data.contains("path") && data["path"].is_string())) { - std::cerr << "Malformed message (path missing)" << std::endl; - return false; - } - path = data["path"]; - // Convert '/' to '.' in paths to ensure consistency for clients - std::replace(path.begin(), path.end(), '/', '.'); - - if (!(data.contains("dp") && data["dp"].is_object())) { - std::cerr << "Malformed message (datapoint missing)" << std::endl; - return false; - } - auto dp = data["dp"]; - if (!dp.contains("value")) { - std::cerr << "Malformed message (value missing)" << std::endl; - return false; - } else if (dp["value"].is_string()) { - value = dp["value"]; - } else if (dp["value"].is_number_float()) { - double num = dp["value"]; - value = std::to_string(num); - } else if (dp["value"].is_number_unsigned()) { - unsigned int num = dp["value"]; - value = std::to_string(num); - } else if (dp["value"].is_number_integer()) { - int num = dp["value"]; - value = std::to_string(num); - } else if (dp["value"].is_boolean()) { - value = dp["value"] ? "true" : "false"; - } else { - std::cerr << "Malformed message (unsupported value type)" << std::endl; - return false; - } - - if (!(dp.contains("ts") && dp["ts"].is_string())) { - std::cerr << "Malformed message (timestamp missing)" << std::endl; - return false; - } - timestamp = dp["ts"]; - - return true; -} - -void VisSession::handle_message(const json &message) -{ - if (m_config.verbose() > 1) - std::cout << "VisSession::handle_message: enter, message = " << to_string(message) << std::endl; - - if (!message.contains("action")) { - std::cerr << "Received unknown message (no action), discarding" << std::endl; - return; - } - - std::string action = message["action"]; - if (action == "authorize") { - if (message.contains("error")) { - std::string error = "unknown"; - if (message["error"].is_object() && message["error"].contains("message")) - error = message["error"]["message"]; - std::cerr << "VIS authorization failed: " << error << std::endl; - } else { - if (m_config.verbose() > 1) - std::cout << "authorized" << std::endl; - - handle_authorized_response(); - } - } else if (action == "subscribe") { - if (message.contains("error")) { - std::string error = "unknown"; - if (message["error"].is_object() && message["error"].contains("message")) - error = message["error"]["message"]; - std::cerr << "VIS subscription failed: " << error << std::endl; - } - } else if (action == "get") { - if (message.contains("error")) { - std::string error = "unknown"; - if (message["error"].is_object() && message["error"].contains("message")) - error = message["error"]["message"]; - std::cerr << "VIS get failed: " << error << std::endl; - } else { - std::string path, value, ts; - if (parseData(message, path, value, ts)) { - if (m_config.verbose() > 1) - std::cout << "VisSession::handle_message: got response " << path << " = " << value << std::endl; - - handle_get_response(path, value, ts); - } - } - } else if (action == "set") { - if (message.contains("error")) { - std::string error = "unknown"; - if (message["error"].is_object() && message["error"].contains("message")) - error = message["error"]["message"]; - std::cerr << "VIS set failed: " << error; - } - } else if (action == "subscription") { - std::string path, value, ts; - if (parseData(message, path, value, ts)) { - if (m_config.verbose() > 1) - std::cout << "VisSession::handle_message: got notification " << path << " = " << value << std::endl; - - handle_notification(path, value, ts); - } - } else { - std::cerr << "unhandled VIS response of type: " << action; - } - - if (m_config.verbose() > 1) - std::cout << "VisSession::handle_message: exit" << std::endl; -} - diff --git a/src/vis-session.hpp b/src/vis-session.hpp deleted file mode 100644 index 8c7b0d9..0000000 --- a/src/vis-session.hpp +++ /dev/null @@ -1,78 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -#ifndef _VIS_SESSION_HPP -#define _VIS_SESSION_HPP - -#include "vis-config.hpp" -#include -#include -#include -#include -#include -#include -#include -#include - -namespace beast = boost::beast; -namespace websocket = beast::websocket; -namespace net = boost::asio; -namespace ssl = boost::asio::ssl; -using tcp = boost::asio::ip::tcp; -using json = nlohmann::json; - - -class VisSession : public std::enable_shared_from_this -{ - //net::io_context m_ioc; - tcp::resolver m_resolver; - tcp::resolver::results_type m_results; - std::string m_hostname; - websocket::stream> m_ws; - beast::flat_buffer m_buffer; - -public: - // Resolver and socket require an io_context - explicit VisSession(const VisConfig &config, net::io_context& ioc, ssl::context& ctx); - - // Start the asynchronous operation - void run(); - -protected: - VisConfig m_config; - std::atomic_uint m_requestid; - - void on_resolve(beast::error_code error, tcp::resolver::results_type results); - - void connect(); - - void on_connect(beast::error_code error, tcp::resolver::results_type::endpoint_type endpoint); - - void on_ssl_handshake(beast::error_code error); - - void on_handshake(beast::error_code error); - - void on_authorize(beast::error_code error, std::size_t bytes_transferred); - - void on_write(beast::error_code error, std::size_t bytes_transferred); - - void on_read(beast::error_code error, std::size_t bytes_transferred); - - void get(const std::string &path); - - void set(const std::string &path, const std::string &value); - - void subscribe(const std::string &path); - - void handle_message(const json &message); - - bool parseData(const json &message, std::string &path, std::string &value, std::string ×tamp); - - virtual void handle_authorized_response(void) = 0; - - virtual void handle_get_response(std::string &path, std::string &value, std::string ×tamp) = 0; - - virtual void handle_notification(std::string &path, std::string &value, std::string ×tamp) = 0; - -}; - -#endif // _VIS_SESSION_HPP diff --git a/systemd/agl-service-audiomixer.service b/systemd/agl-service-audiomixer.service index b6603fa..1a9e530 100644 --- a/systemd/agl-service-audiomixer.service +++ b/systemd/agl-service-audiomixer.service @@ -1,6 +1,6 @@ [Unit] -Requires=kuksa-val.service -After=kuksa-val.service +Requires=kuksa-databroker.service wireplumber.service +After=kuksa-databroker.service wireplumber.service [Service] Type=simple diff --git a/systemd/meson.build b/systemd/meson.build index c94ab80..28f014e 100644 --- a/systemd/meson.build +++ b/systemd/meson.build @@ -1,3 +1,5 @@ +systemd_dep = dependency('systemd') + systemd_system_unit_dir = systemd_dep.get_pkgconfig_variable('systemdsystemunitdir') install_data('agl-service-audiomixer.service', install_dir : systemd_system_unit_dir) -- cgit 1.2.3-korg