diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/AudiomixerService.cpp | 278 | ||||
-rw-r--r-- | src/AudiomixerService.h | 72 | ||||
-rw-r--r-- | src/KuksaClient.cpp | 373 | ||||
-rw-r--r-- | src/KuksaClient.h | 79 | ||||
-rw-r--r-- | src/KuksaConfig.cpp (renamed from src/vis-config.cpp) | 96 | ||||
-rw-r--r-- | src/KuksaConfig.h | 43 | ||||
-rw-r--r-- | src/audiomixer-service.cpp | 129 | ||||
-rw-r--r-- | src/audiomixer-service.hpp | 44 | ||||
-rw-r--r-- | src/main.cpp | 62 | ||||
-rw-r--r-- | src/meson.build | 59 | ||||
-rw-r--r-- | src/vis-config.hpp | 43 | ||||
-rw-r--r-- | src/vis-session.cpp | 380 | ||||
-rw-r--r-- | src/vis-session.hpp | 78 |
13 files changed, 974 insertions, 762 deletions
diff --git a/src/AudiomixerService.cpp b/src/AudiomixerService.cpp new file mode 100644 index 0000000..f2445b8 --- /dev/null +++ b/src/AudiomixerService.cpp @@ -0,0 +1,278 @@ +/* + * Copyright (C) 2022,2023 Konsulko Group + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include <string> +#include <sstream> +#include <iostream> +#include <algorithm> +#include <thread> +#include <chrono> + +#include "AudiomixerService.h" + +AudiomixerService::AudiomixerService(const KuksaConfig &config, GMainLoop *loop) : + m_loop(loop), + m_config(config) +{ + m_audiomixer = audiomixer_new(); + if (m_audiomixer) { + // Set up callbacks for WirePlumber events + m_audiomixer_events.controls_changed = mixer_control_change_cb; + m_audiomixer_events.value_changed = mixer_value_change_cb; + audiomixer_add_event_listener(m_audiomixer, &m_audiomixer_events, this); + + // Drive connecting to PipeWire core and refreshing controls list + audiomixer_lock(m_audiomixer); + audiomixer_ensure_controls(m_audiomixer, 3); + audiomixer_unlock(m_audiomixer); + } else { + std::cerr << "Could not create WirePlumber connection" << std::endl; + } + + // Set initial volume + if (m_audiomixer) { + audiomixer_lock(m_audiomixer); + const struct mixer_control *ctl = audiomixer_find_control(m_audiomixer, "Master Playback"); + if (ctl) { + audiomixer_change_volume(m_audiomixer, ctl, 0.5); + } + audiomixer_unlock(m_audiomixer); + } + + // Create gRPC channel + std::string host = m_config.hostname(); + host += ":"; + std::stringstream ss; + ss << m_config.port(); + host += ss.str(); + + std::shared_ptr<grpc::Channel> channel; + if (!m_config.caCert().empty()) { + grpc::SslCredentialsOptions options; + options.pem_root_certs = m_config.caCert(); + if (!m_config.tlsServerName().empty()) { + grpc::ChannelArguments args; + auto target = m_config.tlsServerName(); + std::cout << "Overriding TLS target name with " << target << std::endl; + args.SetString(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG, target); + channel = grpc::CreateCustomChannel(host, grpc::SslCredentials(options), args); + } else { + channel = grpc::CreateChannel(host, grpc::SslCredentials(options)); + } + } else { + channel = grpc::CreateChannel(host, grpc::InsecureChannelCredentials()); + } + + // Wait for the channel to be ready + std::cout << "Waiting for Databroker gRPC channel" << std::endl; + while (!channel->WaitForConnected(std::chrono::system_clock::now() + + std::chrono::milliseconds(500))) ; + std::cout << "Databroker gRPC channel ready" << std::endl; + + m_broker = new KuksaClient(channel, m_config); + if (m_broker) { + // Listen to actuator target updates for the volume signal and + // values for the steering wheel volume switch signals. + std::map<std::string, bool> signals; + signals["Vehicle.Cabin.Infotainment.Media.Volume"] = true; + signals["Vehicle.Cabin.SteeringWheel.Switches.VolumeUp"] = false; + signals["Vehicle.Cabin.SteeringWheel.Switches.VolumeDown"] = false; + signals["Vehicle.Cabin.SteeringWheel.Switches.VolumeMute"] = false; + m_broker->subscribe(signals, + [this](const std::string &path, const Datapoint &dp) { + HandleSignalChange(path, dp); + }, + [this](const SubscribeRequest *request, const Status &s) { + HandleSubscribeDone(request, s); + }); + + // Set initial volume in VSS + // Push out the default value of 50 which matches the default in the + // homescreen app. Ideally there would be some form of persistence + // scheme to restore the last value on restart. + m_broker->set("Vehicle.Cabin.Infotainment.Media.Volume", + 50U, + [this](const std::string &path, const Error &error) { + HandleSignalSetError(path, error); + }); + } else { + std::cerr << "Could not create Kuksa API client" << std::endl; + } +} + +AudiomixerService::~AudiomixerService() +{ + delete m_broker; + + audiomixer_free(m_audiomixer); +} + +// Private + +void AudiomixerService::HandleSignalChange(const std::string &path, const Datapoint &dp) +{ + if (m_config.verbose() > 1) + std::cout << "AudiomixerService::HandleSignalChange: Value received for " << path << std::endl; + + if (!m_audiomixer) + return; + + audiomixer_lock(m_audiomixer); + + const struct mixer_control *ctl = audiomixer_find_control(m_audiomixer, "Master Playback"); + if (!ctl) { + audiomixer_unlock(m_audiomixer); + return; + } + if (path == "Vehicle.Cabin.Infotainment.Media.Volume") { + if (dp.has_uint32()) { + uint32_t volume = dp.uint32(); + if (volume >= 0 && volume <= 100) { + double v = (double) volume / 100.0; + if (m_config.verbose() > 1) + std::cout << "Setting volume to " << v << std::endl; + audiomixer_change_volume(m_audiomixer, ctl, v); + + // Push out new value + m_broker->set("Vehicle.Cabin.Infotainment.Media.Volume", + volume, + [this](const std::string &path, const Error &error) { + HandleSignalSetError(path, error); + }); + } + } + } else if (path == "Vehicle.Cabin.SteeringWheel.Switches.VolumeUp") { + if (dp.has_bool_() && dp.bool_()) { + double volume = ctl->volume; + volume += 0.05; // up 5% + if (volume > 1.0) + volume = 1.0; // clamp to 100% + if (m_config.verbose() > 1) + std::cout << "Increasing volume to " << volume << std::endl; + audiomixer_change_volume(m_audiomixer, ctl, volume); + + // Push out new value + m_broker->set("Vehicle.Cabin.Infotainment.Media.Volume", + (unsigned) (volume * 100), + [this](const std::string &path, const Error &error) { + HandleSignalSetError(path, error); + }); + } + } else if (path == "Vehicle.Cabin.SteeringWheel.Switches.VolumeDown") { + if (dp.has_bool_() && dp.bool_()) { + double volume = ctl->volume; + volume -= 0.05; // down 5% + if (volume < 0.0) + volume = 0.0; // clamp to 0% + if (m_config.verbose() > 1) + std::cout << "Decreasing volume to " << volume << std::endl; + audiomixer_change_volume(m_audiomixer, ctl, volume); + + // Push out new value + m_broker->set("Vehicle.Cabin.Infotainment.Media.Volume", + (unsigned) (volume * 100), + [this](const std::string &path, const Error &error) { + HandleSignalSetError(path, error); + }); + } + } else if (path == "Vehicle.Cabin.SteeringWheel.Switches.VolumeMute") { + if (dp.has_bool_() && dp.bool_()) { + if (m_config.verbose() > 1) { + if (ctl->mute) + std::cout << "Unmuting" << std::endl; + else + std::cout << "Muting" << std::endl; + } + audiomixer_change_mute(m_audiomixer, ctl, !ctl->mute); + } + } + // else ignore + + audiomixer_unlock(m_audiomixer); +} + +void AudiomixerService::HandleSignalSetError(const std::string &path, const Error &error) +{ + std::cerr << "Error setting " << path << ": " << error.code() << " - " << error.reason() << std::endl; +} + +void AudiomixerService::HandleSubscribeDone(const SubscribeRequest *request, const Status &status) +{ + if (m_config.verbose()) + std::cout << "Subscribe status = " << status.error_code() << + " (" << status.error_message() << ")" << std::endl; + + if (status.error_code() == grpc::CANCELLED) { + if (m_config.verbose()) + std::cerr << "Subscribe canceled, assuming shutdown" << std::endl; + return; + } + + // Queue up a resubcribe via the GLib event loop callback + struct resubscribe_data *data = new (struct resubscribe_data); + if (!data) { + std::cerr << "Could not create resubcribe_data" << std::endl; + exit(1); + } + data->self = this; + // Need to copy request since the one we have been handed is from the + // finished subscribe and will be going away. + data->request = new SubscribeRequest(*request); + if (!data->request) { + std::cerr << "Could not create resubscribe SubscribeRequest" << std::endl; + exit(1); + } + + // NOTE: Waiting 100 milliseconds for now; it is possible that some + // randomization and/or back-off may need to be added if many + // subscribes are active, or switching to some other resubscribe + // scheme altogether (e.g. post subscribes to a thread that waits + // for the channel to become connected again). + g_timeout_add_full(G_PRIORITY_DEFAULT, + 100, + resubscribe_cb, + data, + NULL); +} + +void AudiomixerService::Resubscribe(const SubscribeRequest *request) +{ + if (!(m_broker && request)) + return; + + m_broker->subscribe(request, + [this](const std::string &path, const Datapoint &dp) { + HandleSignalChange(path, dp); + }, + [this](const SubscribeRequest *request, const Status &s) { + HandleSubscribeDone(request, s); + }); +} + +void AudiomixerService::HandleMixerControlChange(void) +{ + // Ignore for now +} + +void AudiomixerService::HandleMixerValueChange(unsigned int change_mask, const struct mixer_control *control) +{ + if (!control) + return; + + if (change_mask & MIXER_CONTROL_CHANGE_FLAG_VOLUME) { + if (std::string(control->name) == "Master Playback") { + // Push out new value + unsigned value = control->volume * 100; + m_broker->set("Vehicle.Cabin.Infotainment.Media.Volume", + value, + [this](const std::string &path, const Error &error) { + HandleSignalSetError(path, error); + }); + } + } else if (change_mask & MIXER_CONTROL_CHANGE_FLAG_MUTE) { + // For now, do nothing, new state is in control->mute + } +} diff --git a/src/AudiomixerService.h b/src/AudiomixerService.h new file mode 100644 index 0000000..3cd87c6 --- /dev/null +++ b/src/AudiomixerService.h @@ -0,0 +1,72 @@ +/* + * Copyright (C) 2022,2023 Konsulko Group + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef _AUDIOMIXER_SERVICE_H +#define _AUDIOMIXER_SERVICE_H + +#include <glib.h> + +#include "KuksaConfig.h" +#include "KuksaClient.h" +#include "audiomixer.h" + +class AudiomixerService +{ +public: + AudiomixerService(const KuksaConfig &config, GMainLoop *loop = NULL); + + ~AudiomixerService(); + + // Callbacks for WirePlumber API + + static void mixer_control_change_cb(void *data) { + if (data) + ((AudiomixerService*) data)->HandleMixerControlChange(); + }; + + static void mixer_value_change_cb(void *data, + unsigned int change_mask, + const struct mixer_control *control) { + if (data) + ((AudiomixerService*) data)->HandleMixerValueChange(change_mask, control); + } + + // Callback for KuksaClient subscribe API reconnect + + static gboolean resubscribe_cb(gpointer data) { + struct resubscribe_data *d = (struct resubscribe_data*) data; + if (d && d->self) { + ((AudiomixerService*) d->self)->Resubscribe(d->request); + } + return FALSE; + } + +private: + struct resubscribe_data { + AudiomixerService *self; + const SubscribeRequest *request; + }; + + GMainLoop *m_loop; + KuksaConfig m_config; + KuksaClient *m_broker; + struct audiomixer *m_audiomixer; + struct audiomixer_events m_audiomixer_events; + + void HandleSignalChange(const std::string &path, const Datapoint &dp); + + void HandleSignalSetError(const std::string &path, const Error &error); + + void HandleSubscribeDone(const SubscribeRequest *request, const Status &status); + + void Resubscribe(const SubscribeRequest *request); + + void HandleMixerControlChange(void); + + void HandleMixerValueChange(unsigned int change_mask, const struct mixer_control *control); +}; + +#endif // _AUDIOMIXER_SERVICE_H diff --git a/src/KuksaClient.cpp b/src/KuksaClient.cpp new file mode 100644 index 0000000..844ce0e --- /dev/null +++ b/src/KuksaClient.cpp @@ -0,0 +1,373 @@ +/* + * Copyright (C) 2023 Konsulko Group + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include <string> +#include <regex> +#include <iterator> +#include <mutex> + +#include "KuksaClient.h" + +using grpc::Channel; +using grpc::ClientContext; +using grpc::ClientReader; +using grpc::Status; + +KuksaClient::KuksaClient(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const KuksaConfig &config) : + m_config(config) +{ + m_stub = VAL::NewStub(channel); +} + +void KuksaClient::get(const std::string &path, GetResponseCallback cb, const bool actuator) +{ + ClientContext *context = new ClientContext(); + if (!context) { + handleCriticalFailure("Could not create ClientContext"); + return; + } + std::string token = m_config.authToken(); + if (!token.empty()) { + token.insert(0, std::string("Bearer ")); + context->AddMetadata(std::string("authorization"), token); + } + + GetRequest request; + auto entry = request.add_entries(); + entry->set_path(path); + entry->add_fields(Field::FIELD_PATH); + if (actuator) + entry->add_fields(Field::FIELD_ACTUATOR_TARGET); + else + entry->add_fields(Field::FIELD_VALUE); + + GetResponse *response = new GetResponse(); + if (!response) { + handleCriticalFailure("Could not create GetResponse"); + return; + } + + // NOTE: Using ClientUnaryReactor instead of the shortcut method + // would allow getting detailed errors. + m_stub->async()->Get(context, &request, response, + [this, cb, context, response](Status s) { + if (s.ok()) + handleGetResponse(response, cb); + delete response; + delete context; + }); +} + +// Since a set request needs a Datapoint with the appropriate type value, +// checking the signal metadata to get the type would be a requirement for +// a generic set call that takes a string as argument. For now, assume +// that set with a string is specifically for a signal of string type. + +void KuksaClient::set(const std::string &path, const std::string &value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_string(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const int8_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_int32(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const int16_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_int32(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const int32_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_int32(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const int64_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_int64(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const uint8_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_uint32(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const uint16_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_uint32(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const uint32_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_uint32(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const uint64_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_uint64(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const float value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_float_(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const double value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_double_(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::subscribe(const std::string &path, + SubscribeResponseCallback cb, + const bool actuator, + SubscribeDoneCallback done_cb) +{ + SubscribeRequest *request = new SubscribeRequest(); + if (!request) { + handleCriticalFailure("Could not create SubscribeRequest"); + return; + } + + auto entry = request->add_entries(); + entry->set_path(path); + entry->add_fields(Field::FIELD_PATH); + if (actuator) + entry->add_fields(Field::FIELD_ACTUATOR_TARGET); + else + entry->add_fields(Field::FIELD_VALUE); + + subscribe(request, cb, done_cb); +} + +void KuksaClient::subscribe(const std::map<std::string, bool> signals, + SubscribeResponseCallback cb, + SubscribeDoneCallback done_cb) +{ + SubscribeRequest *request = new SubscribeRequest(); + if (!request) { + handleCriticalFailure("Could not create SubscribeRequest"); + return; + } + + for(auto it = signals.cbegin(); it != signals.cend(); ++it) { + auto entry = request->add_entries(); + entry->set_path(it->first); + entry->add_fields(Field::FIELD_PATH); + if (it->second) + entry->add_fields(Field::FIELD_ACTUATOR_TARGET); + else + entry->add_fields(Field::FIELD_VALUE); + } + + subscribe(request, cb, done_cb); +} + +void KuksaClient::subscribe(const SubscribeRequest *request, + SubscribeResponseCallback cb, + SubscribeDoneCallback done_cb) +{ + if (!(request && cb)) + return; + + class Reader : public grpc::ClientReadReactor<SubscribeResponse> { + public: + Reader(VAL::Stub *stub, + KuksaClient *client, + KuksaConfig &config, + const SubscribeRequest *request, + SubscribeResponseCallback cb, + SubscribeDoneCallback done_cb): + client_(client), + config_(config), + request_(request), + cb_(cb), + done_cb_(done_cb) { + std::string token = config_.authToken(); + if (!token.empty()) { + token.insert(0, std::string("Bearer ")); + context_.AddMetadata(std::string("authorization"), token); + } + stub->async()->Subscribe(&context_, request, this); + StartRead(&response_); + StartCall(); + } + void OnReadDone(bool ok) override { + std::unique_lock<std::mutex> lock(mutex_); + if (ok) { + if (client_) + client_->handleSubscribeResponse(&response_, cb_); + StartRead(&response_); + } + } + void OnDone(const Status& s) override { + status_ = s; + if (client_) { + if (config_.verbose() > 1) + std::cerr << "KuksaClient::subscribe::Reader done" << std::endl; + client_->handleSubscribeDone(request_, status_, done_cb_); + } + + // gRPC engine is done with us, safe to self-delete + delete request_; + delete this; + } + + private: + KuksaClient *client_; + KuksaConfig config_; + const SubscribeRequest *request_; + SubscribeResponseCallback cb_; + SubscribeDoneCallback done_cb_; + + ClientContext context_; + SubscribeResponse response_; + std::mutex mutex_; + Status status_; + }; + Reader *reader = new Reader(m_stub.get(), this, m_config, request, cb, done_cb); + if (!reader) + handleCriticalFailure("Could not create Subscribe reader"); +} + +// Private + +void KuksaClient::set(const std::string &path, const Datapoint &dp, SetResponseCallback cb, const bool actuator) +{ + ClientContext *context = new ClientContext(); + if (!context) { + handleCriticalFailure("Could not create ClientContext"); + return; + } + std::string token = m_config.authToken(); + if (!token.empty()) { + token.insert(0, std::string("Bearer ")); + context->AddMetadata(std::string("authorization"), token); + } + + SetRequest request; + auto update = request.add_updates(); + auto entry = update->mutable_entry(); + entry->set_path(path); + if (actuator) { + auto target = entry->mutable_actuator_target(); + *target = dp; + update->add_fields(Field::FIELD_ACTUATOR_TARGET); + } else { + auto value = entry->mutable_value(); + *value = dp; + update->add_fields(Field::FIELD_VALUE); + } + + SetResponse *response = new SetResponse(); + if (!response) { + handleCriticalFailure("Could not create SetResponse"); + delete context; + return; + } + + // NOTE: Using ClientUnaryReactor instead of the shortcut method + // would allow getting detailed errors. + m_stub->async()->Set(context, &request, response, + [this, cb, context, response](Status s) { + if (s.ok()) + handleSetResponse(response, cb); + delete response; + delete context; + }); +} + +void KuksaClient::handleGetResponse(const GetResponse *response, GetResponseCallback cb) +{ + if (!(response && response->entries_size() && cb)) + return; + + for (auto it = response->entries().begin(); it != response->entries().end(); ++it) { + // We expect paths in the response entries + if (!it->path().size()) + continue; + + Datapoint dp; + if (it->has_actuator_target()) + dp = it->actuator_target(); + else + dp = it->value(); + cb(it->path(), dp); + } +} + +void KuksaClient::handleSetResponse(const SetResponse *response, SetResponseCallback cb) +{ + if (!(response && response->errors_size() && cb)) + return; + + for (auto it = response->errors().begin(); it != response->errors().end(); ++it) { + cb(it->path(), it->error()); + } + +} + +void KuksaClient::handleSubscribeResponse(const SubscribeResponse *response, SubscribeResponseCallback cb) +{ + if (!(response && response->updates_size() && cb)) + return; + + for (auto it = response->updates().begin(); it != response->updates().end(); ++it) { + // We expect entries that have paths in the response + if (!(it->has_entry() && it->entry().path().size())) + continue; + + auto entry = it->entry(); + if (m_config.verbose()) + std::cout << "KuksaClient::handleSubscribeResponse: got value for " << entry.path() << std::endl; + + Datapoint dp; + if (entry.has_actuator_target()) + dp = entry.actuator_target(); + else + dp = entry.value(); + + cb(entry.path(), dp); + } +} + +void KuksaClient::handleSubscribeDone(const SubscribeRequest *request, + const Status &status, + SubscribeDoneCallback cb) +{ + if (cb) + cb(request, status); +} + +void KuksaClient::handleCriticalFailure(const std::string &error) +{ + if (error.size()) + std::cerr << error << std::endl; + exit(1); +} + diff --git a/src/KuksaClient.h b/src/KuksaClient.h new file mode 100644 index 0000000..fd9a9e0 --- /dev/null +++ b/src/KuksaClient.h @@ -0,0 +1,79 @@ +/* + * Copyright (C) 2023 Konsulko Group + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef KUKSA_CLIENT_H +#define KUKSA_CLIENT_H + +#include <string> +#include <map> +#include <grpcpp/grpcpp.h> +#include "kuksa/val/v1/val.grpc.pb.h" + +// Just pull in the whole namespace since Datapoint contains a lot of +// definitions that may potentially be needed. +using namespace kuksa::val::v1; + +using grpc::Status; + +#include "KuksaConfig.h" + +// API response callback types +typedef std::function<void(const std::string &path, const Datapoint &dp)> GetResponseCallback; +typedef std::function<void(const std::string &path, const Error &error)> SetResponseCallback; +typedef std::function<void(const std::string &path, const Datapoint &dp)> SubscribeResponseCallback; +typedef std::function<void(const SubscribeRequest *request, const Status &status)> SubscribeDoneCallback; + +// KUKSA.val databroker "VAL" gRPC API client class + +class KuksaClient +{ +public: + explicit KuksaClient(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const KuksaConfig &config); + + void get(const std::string &path, GetResponseCallback cb, const bool actuator = false); + + void set(const std::string &path, const std::string &value, SetResponseCallback cb, const bool actuator = false); + void set(const std::string &path, const int8_t value, SetResponseCallback cb, const bool actuator = false); + void set(const std::string &path, const int16_t value, SetResponseCallback cb, const bool actuator = false); + void set(const std::string &path, const int32_t value, SetResponseCallback cb, const bool actuator = false); + void set(const std::string &path, const int64_t value, SetResponseCallback cb, const bool actuator = false); + void set(const std::string &path, const uint8_t value, SetResponseCallback cb, const bool actuator = false); + void set(const std::string &path, const uint16_t value, SetResponseCallback cb, const bool actuator = false); + void set(const std::string &path, const uint32_t value, SetResponseCallback cb, const bool actuator = false); + void set(const std::string &path, const uint64_t value, SetResponseCallback cb, const bool actuator = false); + void set(const std::string &path, const float value, SetResponseCallback cb, const bool actuator = false); + void set(const std::string &path, const double value, SetResponseCallback cb, const bool actuator = false); + + void subscribe(const std::string &path, + SubscribeResponseCallback cb, + const bool actuator = false, + SubscribeDoneCallback done_cb = nullptr); + void subscribe(const std::map<std::string, bool> signals, + SubscribeResponseCallback cb, + SubscribeDoneCallback done_cb = nullptr); + void subscribe(const SubscribeRequest *request, + SubscribeResponseCallback cb, + SubscribeDoneCallback done_cb = nullptr); + +private: + KuksaConfig m_config; + std::shared_ptr<VAL::Stub> m_stub; + + void set(const std::string &path, const Datapoint &dp, SetResponseCallback cb, const bool actuator); + + void handleGetResponse(const GetResponse *response, GetResponseCallback cb); + + void handleSetResponse(const SetResponse *response, SetResponseCallback cb); + + void handleSubscribeResponse(const SubscribeResponse *response, SubscribeResponseCallback cb); + + void handleSubscribeDone(const SubscribeRequest *request, const Status &status, SubscribeDoneCallback cb); + + void handleCriticalFailure(const std::string &error); + void handleCriticalFailure(const char *error) { handleCriticalFailure(std::string(error)); }; +}; + +#endif // KUKSA_CLIENT_H diff --git a/src/vis-config.cpp b/src/KuksaConfig.cpp index a55e235..8fe09f6 100644 --- a/src/vis-config.cpp +++ b/src/KuksaConfig.cpp @@ -1,18 +1,21 @@ -// SPDX-License-Identifier: Apache-2.0 +/* + * Copyright (C) 2022,2023 Konsulko Group + * + * SPDX-License-Identifier: Apache-2.0 + */ -#include "vis-config.hpp" #include <iostream> #include <iomanip> #include <sstream> +#include <exception> #include <boost/property_tree/ptree.hpp> #include <boost/property_tree/ini_parser.hpp> #include <boost/filesystem.hpp> +#include "KuksaConfig.h" namespace property_tree = boost::property_tree; namespace filesystem = boost::filesystem; -#define DEFAULT_CLIENT_KEY_FILE "/etc/kuksa-val/Client.key" -#define DEFAULT_CLIENT_CERT_FILE "/etc/kuksa-val/Client.pem" #define DEFAULT_CA_CERT_FILE "/etc/kuksa-val/CA.pem" inline @@ -21,32 +24,32 @@ void load_string_file(const filesystem::path& p, std::string& str) std::ifstream file; file.exceptions(std::ifstream::failbit | std::ifstream::badbit); file.open(p, std::ios_base::binary); - std::size_t sz = static_cast<std::size_t>(filesystem::file_size(p)); - str.resize(sz, '\0'); - file.read(&str[0], sz); + if (file.good()) { + std::size_t sz = static_cast<std::size_t>(filesystem::file_size(p)); + str.resize(sz, '\0'); + file.read(&str[0], sz); + } else { + str.clear(); + } } -VisConfig::VisConfig(const std::string &hostname, - const unsigned port, - const std::string &clientKey, - const std::string &clientCert, - const std::string &caCert, - const std::string &authToken, - bool verifyPeer) : +KuksaConfig::KuksaConfig(const std::string &hostname, + const unsigned port, + const std::string &caCert, + const std::string &tlsServerName, + const std::string &authToken) : m_hostname(hostname), m_port(port), - m_clientKey(clientKey), - m_clientCert(clientCert), m_caCert(caCert), + m_tlsServerName(tlsServerName), m_authToken(authToken), - m_verifyPeer(verifyPeer), m_verbose(0), m_valid(true) { // Potentially could do some certificate validation here... } -VisConfig::VisConfig(const std::string &appname) : +KuksaConfig::KuksaConfig(const std::string &appname) : m_valid(false) { std::string config("/etc/xdg/AGL/"); @@ -70,7 +73,7 @@ VisConfig::VisConfig(const std::string &appname) : return; } const property_tree::ptree &settings = - pt.get_child("vis-client", property_tree::ptree()); + pt.get_child("kuksa-client", property_tree::ptree()); m_hostname = settings.get("server", "localhost"); std::stringstream ss; @@ -81,48 +84,12 @@ VisConfig::VisConfig(const std::string &appname) : return; } - m_port = settings.get("port", 8090); + m_port = settings.get("port", 55555); if (m_port == 0) { std::cerr << "Invalid server port" << std::endl; return; } - // Default to disabling peer verification for now to be able - // to use the default upstream KUKSA.val certificates for - // testing. Wrangling server and CA certificate generation - // and management to be able to verify will require further - // investigation. - m_verifyPeer = settings.get("verify-server", false); - - std::string keyFileName = settings.get("key", DEFAULT_CLIENT_KEY_FILE); - std::stringstream().swap(ss); - ss << keyFileName; - ss >> std::quoted(keyFileName); - ss.str(""); - if (keyFileName.empty()) { - std::cerr << "Invalid client key filename" << std::endl; - return; - } - load_string_file(keyFileName, m_clientKey); - if (m_clientKey.empty()) { - std::cerr << "Invalid client key file" << std::endl; - return; - } - - std::string certFileName = settings.get("certificate", DEFAULT_CLIENT_CERT_FILE); - std::stringstream().swap(ss); - ss << certFileName; - ss >> std::quoted(certFileName); - if (certFileName.empty()) { - std::cerr << "Invalid client certificate filename" << std::endl; - return; - } - load_string_file(certFileName, m_clientCert); - if (m_clientCert.empty()) { - std::cerr << "Invalid client certificate file" << std::endl; - return; - } - std::string caCertFileName = settings.get("ca-certificate", DEFAULT_CA_CERT_FILE); std::stringstream().swap(ss); ss << caCertFileName; @@ -131,12 +98,14 @@ VisConfig::VisConfig(const std::string &appname) : std::cerr << "Invalid CA certificate filename" << std::endl; return; } - load_string_file(caCertFileName, m_caCert); + readFile(caCertFileName, m_caCert); if (m_caCert.empty()) { std::cerr << "Invalid CA certificate file" << std::endl; return; } + m_tlsServerName = settings.get("tls-server-name", ""); + std::string authTokenFileName = settings.get("authorization", ""); std::stringstream().swap(ss); ss << authTokenFileName; @@ -145,7 +114,7 @@ VisConfig::VisConfig(const std::string &appname) : std::cerr << "Invalid authorization token filename" << std::endl; return; } - load_string_file(authTokenFileName, m_authToken); + readFile(authTokenFileName, m_authToken); if (m_authToken.empty()) { std::cerr << "Invalid authorization token file" << std::endl; return; @@ -165,3 +134,14 @@ VisConfig::VisConfig(const std::string &appname) : m_valid = true; } + +// Private + +void KuksaConfig::readFile(const std::string &filename, std::string &data) +{ + try { + load_string_file(filename, data); + } catch (const std::exception &e) { + data.clear(); + } +} diff --git a/src/KuksaConfig.h b/src/KuksaConfig.h new file mode 100644 index 0000000..e70385f --- /dev/null +++ b/src/KuksaConfig.h @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2022,2023 Konsulko Group + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef _KUKSA_CONFIG_H +#define _KUKSA_CONFIG_H + +#include <string> + +class KuksaConfig +{ +public: + explicit KuksaConfig(const std::string &hostname, + const unsigned port, + const std::string &caCert, + const std::string &tlsServerName, + const std::string &authToken); + explicit KuksaConfig(const std::string &appname); + ~KuksaConfig() {}; + + std::string hostname() { return m_hostname; }; + unsigned port() { return m_port; }; + std::string caCert() { return m_caCert; }; + std::string tlsServerName() { return m_tlsServerName; }; + std::string authToken() { return m_authToken; }; + bool valid() { return m_valid; }; + unsigned verbose() { return m_verbose; }; + +private: + std::string m_hostname; + unsigned m_port; + std::string m_caCert; + std::string m_tlsServerName; + std::string m_authToken; + unsigned m_verbose; + bool m_valid; + + void readFile(const std::string &filename, std::string &data); +}; + +#endif // _KUKSA_CONFIG_H diff --git a/src/audiomixer-service.cpp b/src/audiomixer-service.cpp deleted file mode 100644 index 5787153..0000000 --- a/src/audiomixer-service.cpp +++ /dev/null @@ -1,129 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -#include "audiomixer-service.hpp" -#include <iostream> -#include <algorithm> - - -AudiomixerService::AudiomixerService(const VisConfig &config, net::io_context& ioc, ssl::context& ctx) : - VisSession(config, ioc, ctx) -{ - m_audiomixer = audiomixer_new(); - if (m_audiomixer) { - // Set up callbacks for WirePlumber events - m_audiomixer_events.controls_changed = audiomixer_control_change_cb; - m_audiomixer_events.value_changed = audiomixer_value_change_cb; - audiomixer_add_event_listener(m_audiomixer, &m_audiomixer_events, this); - - // Drive connecting to PipeWire core and refreshing controls list - audiomixer_lock(m_audiomixer); - audiomixer_ensure_controls(m_audiomixer, 3); - audiomixer_unlock(m_audiomixer); - } else { - std::cerr << "Could not create WirePlumber connection" << std::endl; - } -} - -AudiomixerService::~AudiomixerService() -{ - audiomixer_free(m_audiomixer); -} - -void AudiomixerService::handle_authorized_response(void) -{ - subscribe("Vehicle.Cabin.Infotainment.Media.Volume"); - subscribe("Vehicle.Cabin.SteeringWheel.Switches.VolumeUp"); - subscribe("Vehicle.Cabin.SteeringWheel.Switches.VolumeDown"); - subscribe("Vehicle.Cabin.SteeringWheel.Switches.VolumeMute"); - - // Set initial volume in VSS - // For now a value of 50 matches the default in the homescreen app. - // Ideally there would be some form of persistence scheme to restore - // the last value on restart. - set("Vehicle.Cabin.Infotainment.Media.Volume", "50"); -} - -void AudiomixerService::handle_get_response(std::string &path, std::string &value, std::string ×tamp) -{ - // Placeholder since no gets are performed ATM -} - -void AudiomixerService::handle_notification(std::string &path, std::string &value, std::string ×tamp) -{ - if (!m_audiomixer) { - return; - } - - audiomixer_lock(m_audiomixer); - - const struct mixer_control *ctl = audiomixer_find_control(m_audiomixer, "Master Playback"); - if (!ctl) { - audiomixer_unlock(m_audiomixer); - return; - } - - if (path == "Vehicle.Cabin.Infotainment.Media.Volume") { - try { - int volume = std::stoi(value); - if (volume >= 0 && volume <= 100) { - double v = (double) volume / 100.0; - if (m_config.verbose() > 1) - std::cout << "Setting volume to " << v << std::endl; - audiomixer_change_volume(m_audiomixer, ctl, v); - } - } - catch (std::exception ex) { - // ignore bad value - } - } else if (path == "Vehicle.Cabin.SteeringWheel.Switches.VolumeUp" && value == "true") { - double volume = ctl->volume; - volume += 0.05; // up 5% - if (volume > 1.0) - volume = 1.0; // clamp to 100% - if (m_config.verbose() > 1) - std::cout << "Increasing volume to " << volume << std::endl; - audiomixer_change_volume(m_audiomixer, ctl, volume); - - } else if (path == "Vehicle.Cabin.SteeringWheel.Switches.VolumeDown" && value == "true") { - double volume = ctl->volume; - volume -= 0.05; // down 5% - if (volume < 0.0) - volume = 0.0; // clamp to 0% - if (m_config.verbose() > 1) - std::cout << "Decreasing volume to " << volume << std::endl; - audiomixer_change_volume(m_audiomixer, ctl, volume); - - } else if (path == "Vehicle.Cabin.SteeringWheel.Switches.VolumeMute" && value == "true") { - if (m_config.verbose() > 1) { - if (ctl->mute) - std::cout << "Unmuting" << std::endl; - else - std::cout << "Muting" << std::endl; - } - audiomixer_change_mute(m_audiomixer, ctl, !ctl->mute); - } - // else ignore - - audiomixer_unlock(m_audiomixer); -} - -void AudiomixerService::handle_control_change(void) -{ - // Ignore for now -} - -void AudiomixerService::handle_value_change(unsigned int change_mask, const struct mixer_control *control) -{ - if (!control) - return; - - if (change_mask & MIXER_CONTROL_CHANGE_FLAG_VOLUME) { - if (std::string(control->name) == "Master Playback") { - // Push change into VIS - std::string value = std::to_string((int) (control->volume * 100.0)); - set("Vehicle.Cabin.Infotainment.Media.Volume", value); - } - } else if (change_mask & MIXER_CONTROL_CHANGE_FLAG_MUTE) { - // For now, do nothing, new state is in control->mute - } -} diff --git a/src/audiomixer-service.hpp b/src/audiomixer-service.hpp deleted file mode 100644 index cb00584..0000000 --- a/src/audiomixer-service.hpp +++ /dev/null @@ -1,44 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -#ifndef _AUDIOMIXER_SERVICE_HPP -#define _AUDIOMIXER_SERVICE_HPP - -#include "vis-session.hpp" -#include "audiomixer.h" - -class AudiomixerService : public VisSession -{ - struct audiomixer *m_audiomixer; - -public: - AudiomixerService(const VisConfig &config, net::io_context& ioc, ssl::context& ctx); - - ~AudiomixerService(); - - static void audiomixer_control_change_cb(void *data) { - if (data) - ((AudiomixerService*) data)->handle_control_change(); - }; - - static void audiomixer_value_change_cb(void *data, - unsigned int change_mask, - const struct mixer_control *control) { - if (data) - ((AudiomixerService*) data)->handle_value_change(change_mask, control); - } - -protected: - struct audiomixer_events m_audiomixer_events; - - virtual void handle_authorized_response(void) override; - - virtual void handle_get_response(std::string &path, std::string &value, std::string ×tamp) override; - - virtual void handle_notification(std::string &path, std::string &value, std::string ×tamp) override; - - virtual void handle_control_change(void); - - virtual void handle_value_change(unsigned int change_mask, const struct mixer_control *control); -}; - -#endif // _AUDIOMIXER_SERVICE_HPP diff --git a/src/main.cpp b/src/main.cpp index 0960f1b..6776472 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,34 +1,54 @@ -// SPDX-License-Identifier: Apache-2.0 +/* + * Copyright (C) 2022,2023 Konsulko Group + * + * SPDX-License-Identifier: Apache-2.0 + */ -#include <iostream> -#include <iomanip> -#include <boost/asio/signal_set.hpp> -#include <boost/bind.hpp> -#include "audiomixer-service.hpp" +//#include <unistd.h> +//#include <signal.h> +#include <glib.h> +#include <glib-unix.h> +#include <systemd/sd-daemon.h> -using work_guard_type = boost::asio::executor_work_guard<boost::asio::io_context::executor_type>; +#include "AudiomixerService.h" + +static gboolean quit_cb(gpointer user_data) +{ + GMainLoop *loop = (GMainLoop*) user_data; + + g_info("Quitting..."); + + if (loop) + g_idle_add(G_SOURCE_FUNC(g_main_loop_quit), loop); + else + exit(0); + + return G_SOURCE_REMOVE; +} int main(int argc, char** argv) { - // The io_context is required for all I/O - net::io_context ioc; + GMainLoop *loop = NULL; + + loop = g_main_loop_new(NULL, FALSE); + if (!loop) { + std::cerr << "Could not create GLib event loop" << std::endl; + exit(1); + } + + KuksaConfig config("agl-service-audiomixer"); - // Register to stop I/O context on SIGINT and SIGTERM - net::signal_set signals(ioc, SIGINT, SIGTERM); - signals.async_wait(boost::bind(&net::io_context::stop, &ioc)); + g_unix_signal_add(SIGTERM, quit_cb, (gpointer) loop); + g_unix_signal_add(SIGINT, quit_cb, (gpointer) loop); - // The SSL context is required, and holds certificates - ssl::context ctx{ssl::context::tlsv12_client}; + AudiomixerService service(config, loop); - // Launch the asynchronous operation - VisConfig config("agl-service-audiomixer"); - std::make_shared<AudiomixerService>(config, ioc, ctx)->run(); + sd_notify(0, "READY=1"); - // Ensure I/O context continues running even if there's no work - work_guard_type work_guard(ioc.get_executor()); + g_main_loop_run(loop); - // Run the I/O context - ioc.run(); + // Clean up + g_main_loop_unref(loop); return 0; } diff --git a/src/meson.build b/src/meson.build index b1603fc..6c50419 100644 --- a/src/meson.build +++ b/src/meson.build @@ -1,19 +1,60 @@ boost_dep = dependency('boost', version : '>=1.72', modules : [ 'thread', 'filesystem', 'program_options', 'log', 'system' ]) -openssl_dep = dependency('openssl') -thread_dep = dependency('threads') -wp_dep = dependency('wireplumber-0.4') -src = [ 'vis-config.cpp', - 'vis-session.cpp', - 'audiomixer-service.cpp', - 'audiomixer.c', - 'main.cpp' +cpp = meson.get_compiler('cpp') +grpcpp_reflection_dep = cpp.find_library('grpc++_reflection') + +service_dep = [ + boost_dep, + dependency('openssl'), + dependency('threads'), + dependency('libsystemd'), + dependency('wireplumber-0.4'), + dependency('protobuf'), + dependency('grpc'), + dependency('grpc++'), + grpcpp_reflection_dep ] + +protoc = find_program('protoc') +grpc_cpp = find_program('grpc_cpp_plugin') + +protos_base_dir = get_option('protos') +protos_dir = protos_base_dir / 'kuksa/val/v1' +protoc_gen = generator(protoc, \ + output : ['@BASENAME@.pb.cc', '@BASENAME@.pb.h'], + arguments : ['-I=' + protos_base_dir, + '--cpp_out=@BUILD_DIR@', + '@INPUT@']) +generated_protoc_sources = [ \ + protoc_gen.process(protos_dir / 'types.proto', preserve_path_from : protos_base_dir), + protoc_gen.process(protos_dir / 'val.proto', preserve_path_from : protos_base_dir), +] + +grpc_gen = generator(protoc, \ + output : ['@BASENAME@.grpc.pb.cc', '@BASENAME@.grpc.pb.h'], + arguments : ['-I=' + protos_base_dir, + '--grpc_out=@BUILD_DIR@', + '--plugin=protoc-gen-grpc=' + grpc_cpp.path(), + '@INPUT@']) +generated_grpc_sources = [ \ + grpc_gen.process(protos_dir / 'val.proto', preserve_path_from : protos_base_dir), +] + +src = [ + 'KuksaConfig.cpp', + 'KuksaClient.cpp', + 'AudiomixerService.cpp', + 'audiomixer.c', + 'main.cpp', + generated_protoc_sources, + generated_grpc_sources, +] + executable('agl-service-audiomixer', src, - dependencies: [boost_dep, openssl_dep, thread_dep, systemd_dep, wp_dep], + dependencies: service_dep, install: true, c_args : [ '-D_XOPEN_SOURCE=700', diff --git a/src/vis-config.hpp b/src/vis-config.hpp deleted file mode 100644 index b0f72f9..0000000 --- a/src/vis-config.hpp +++ /dev/null @@ -1,43 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -#ifndef _VIS_CONFIG_HPP -#define _VIS_CONFIG_HPP - -#include <string> - -class VisConfig -{ -public: - explicit VisConfig(const std::string &hostname, - const unsigned port, - const std::string &clientKey, - const std::string &clientCert, - const std::string &caCert, - const std::string &authToken, - bool verifyPeer = true); - explicit VisConfig(const std::string &appname); - ~VisConfig() {}; - - std::string hostname() { return m_hostname; }; - unsigned port() { return m_port; }; - std::string clientKey() { return m_clientKey; }; - std::string clientCert() { return m_clientCert; }; - std::string caCert() { return m_caCert; }; - std::string authToken() { return m_authToken; }; - bool verifyPeer() { return m_verifyPeer; }; - bool valid() { return m_valid; }; - unsigned verbose() { return m_verbose; }; - -private: - std::string m_hostname; - unsigned m_port; - std::string m_clientKey; - std::string m_clientCert; - std::string m_caCert; - std::string m_authToken; - bool m_verifyPeer; - unsigned m_verbose; - bool m_valid; -}; - -#endif // _VIS_CONFIG_HPP diff --git a/src/vis-session.cpp b/src/vis-session.cpp deleted file mode 100644 index 53a554c..0000000 --- a/src/vis-session.cpp +++ /dev/null @@ -1,380 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -#include "vis-session.hpp" -#include <iostream> -#include <sstream> -#include <thread> - - -// Logging helper -static void log_error(beast::error_code error, char const* what) -{ - std::cerr << what << " error: " << error.message() << std::endl; -} - - -// Resolver and socket require an io_context -VisSession::VisSession(const VisConfig &config, net::io_context& ioc, ssl::context& ctx) : - m_config(config), - m_resolver(net::make_strand(ioc)), - m_ws(net::make_strand(ioc), ctx) -{ -} - -// Start the asynchronous operation -void VisSession::run() -{ - if (!m_config.valid()) { - return; - } - - // Start by resolving hostname - m_resolver.async_resolve(m_config.hostname(), - std::to_string(m_config.port()), - beast::bind_front_handler(&VisSession::on_resolve, - shared_from_this())); -} - -void VisSession::on_resolve(beast::error_code error, - tcp::resolver::results_type results) -{ - if(error) { - log_error(error, "resolve"); - return; - } - - // Set a timeout on the connect operation - beast::get_lowest_layer(m_ws).expires_after(std::chrono::seconds(30)); - - // Connect to resolved address - if (m_config.verbose()) - std::cout << "Connecting" << std::endl; - m_results = results; - connect(); -} - -void VisSession::connect() -{ - beast::get_lowest_layer(m_ws).async_connect(m_results, - beast::bind_front_handler(&VisSession::on_connect, - shared_from_this())); -} - -void VisSession::on_connect(beast::error_code error, - tcp::resolver::results_type::endpoint_type endpoint) -{ - if(error) { - // The server can take a while to be ready to accept connections, - // so keep retrying until we hit the timeout. - if (error == net::error::timed_out) { - log_error(error, "connect"); - return; - } - - // Delay 500 ms before retrying - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - - if (m_config.verbose()) - std::cout << "Connecting" << std::endl; - - connect(); - return; - } - - if (m_config.verbose()) - std::cout << "Connected" << std::endl; - - // Set handshake timeout - beast::get_lowest_layer(m_ws).expires_after(std::chrono::seconds(30)); - - // Set SNI Hostname (many hosts need this to handshake successfully) - if(!SSL_set_tlsext_host_name(m_ws.next_layer().native_handle(), - m_config.hostname().c_str())) - { - error = beast::error_code(static_cast<int>(::ERR_get_error()), - net::error::get_ssl_category()); - log_error(error, "connect"); - return; - } - - // Update the hostname. This will provide the value of the - // Host HTTP header during the WebSocket handshake. - // See https://tools.ietf.org/html/rfc7230#section-5.4 - m_hostname = m_config.hostname() + ':' + std::to_string(endpoint.port()); - - if (m_config.verbose()) - std::cout << "Negotiating SSL handshake" << std::endl; - - // Perform the SSL handshake - m_ws.next_layer().async_handshake(ssl::stream_base::client, - beast::bind_front_handler(&VisSession::on_ssl_handshake, - shared_from_this())); -} - -void VisSession::on_ssl_handshake(beast::error_code error) -{ - if(error) { - log_error(error, "SSL handshake"); - return; - } - - // Turn off the timeout on the tcp_stream, because - // the websocket stream has its own timeout system. - beast::get_lowest_layer(m_ws).expires_never(); - - // NOTE: Explicitly not setting websocket stream timeout here, - // as the client is long-running. - - if (m_config.verbose()) - std::cout << "Negotiating WSS handshake" << std::endl; - - // Perform handshake - m_ws.async_handshake(m_hostname, - "/", - beast::bind_front_handler(&VisSession::on_handshake, - shared_from_this())); -} - -void VisSession::on_handshake(beast::error_code error) -{ - if(error) { - log_error(error, "WSS handshake"); - return; - } - - if (m_config.verbose()) - std::cout << "Authorizing" << std::endl; - - // Authorize - json req; - req["requestId"] = std::to_string(m_requestid++); - req["action"]= "authorize"; - req["tokens"] = m_config.authToken(); - - m_ws.async_write(net::buffer(req.dump(4)), - beast::bind_front_handler(&VisSession::on_authorize, - shared_from_this())); -} - -void VisSession::on_authorize(beast::error_code error, std::size_t bytes_transferred) -{ - boost::ignore_unused(bytes_transferred); - - if(error) { - log_error(error, "authorize"); - return; - } - - // Read response - m_ws.async_read(m_buffer, - beast::bind_front_handler(&VisSession::on_read, - shared_from_this())); -} - -// NOTE: Placeholder for now -void VisSession::on_write(beast::error_code error, std::size_t bytes_transferred) -{ - boost::ignore_unused(bytes_transferred); - - if(error) { - log_error(error, "write"); - return; - } - - // Do nothing... -} - -void VisSession::on_read(beast::error_code error, std::size_t bytes_transferred) -{ - boost::ignore_unused(bytes_transferred); - - if(error) { - log_error(error, "read"); - return; - } - - // Handle message - std::string s = beast::buffers_to_string(m_buffer.data()); - json response = json::parse(s, nullptr, false); - if (!response.is_discarded()) { - handle_message(response); - } else { - std::cerr << "json::parse failed? got " << s << std::endl; - } - m_buffer.consume(m_buffer.size()); - - // Read next message - m_ws.async_read(m_buffer, - beast::bind_front_handler(&VisSession::on_read, - shared_from_this())); -} - -void VisSession::get(const std::string &path) -{ - if (!m_config.valid()) { - return; - } - - json req; - req["requestId"] = std::to_string(m_requestid++); - req["action"] = "get"; - req["path"] = path; - req["tokens"] = m_config.authToken(); - - m_ws.write(net::buffer(req.dump(4))); -} - -void VisSession::set(const std::string &path, const std::string &value) -{ - if (!m_config.valid()) { - return; - } - - json req; - req["requestId"] = std::to_string(m_requestid++); - req["action"] = "set"; - req["path"] = path; - req["value"] = value; - req["tokens"] = m_config.authToken(); - - m_ws.write(net::buffer(req.dump(4))); -} - -void VisSession::subscribe(const std::string &path) -{ - if (!m_config.valid()) { - return; - } - - json req; - req["requestId"] = std::to_string(m_requestid++); - req["action"] = "subscribe"; - req["path"] = path; - req["tokens"] = m_config.authToken(); - - m_ws.write(net::buffer(req.dump(4))); -} - -bool VisSession::parseData(const json &message, std::string &path, std::string &value, std::string ×tamp) -{ - if (message.contains("error")) { - std::string error = message["error"]; - return false; - } - - if (!(message.contains("data") && message["data"].is_object())) { - std::cerr << "Malformed message (data missing)" << std::endl; - return false; - } - auto data = message["data"]; - if (!(data.contains("path") && data["path"].is_string())) { - std::cerr << "Malformed message (path missing)" << std::endl; - return false; - } - path = data["path"]; - // Convert '/' to '.' in paths to ensure consistency for clients - std::replace(path.begin(), path.end(), '/', '.'); - - if (!(data.contains("dp") && data["dp"].is_object())) { - std::cerr << "Malformed message (datapoint missing)" << std::endl; - return false; - } - auto dp = data["dp"]; - if (!dp.contains("value")) { - std::cerr << "Malformed message (value missing)" << std::endl; - return false; - } else if (dp["value"].is_string()) { - value = dp["value"]; - } else if (dp["value"].is_number_float()) { - double num = dp["value"]; - value = std::to_string(num); - } else if (dp["value"].is_number_unsigned()) { - unsigned int num = dp["value"]; - value = std::to_string(num); - } else if (dp["value"].is_number_integer()) { - int num = dp["value"]; - value = std::to_string(num); - } else if (dp["value"].is_boolean()) { - value = dp["value"] ? "true" : "false"; - } else { - std::cerr << "Malformed message (unsupported value type)" << std::endl; - return false; - } - - if (!(dp.contains("ts") && dp["ts"].is_string())) { - std::cerr << "Malformed message (timestamp missing)" << std::endl; - return false; - } - timestamp = dp["ts"]; - - return true; -} - -void VisSession::handle_message(const json &message) -{ - if (m_config.verbose() > 1) - std::cout << "VisSession::handle_message: enter, message = " << to_string(message) << std::endl; - - if (!message.contains("action")) { - std::cerr << "Received unknown message (no action), discarding" << std::endl; - return; - } - - std::string action = message["action"]; - if (action == "authorize") { - if (message.contains("error")) { - std::string error = "unknown"; - if (message["error"].is_object() && message["error"].contains("message")) - error = message["error"]["message"]; - std::cerr << "VIS authorization failed: " << error << std::endl; - } else { - if (m_config.verbose() > 1) - std::cout << "authorized" << std::endl; - - handle_authorized_response(); - } - } else if (action == "subscribe") { - if (message.contains("error")) { - std::string error = "unknown"; - if (message["error"].is_object() && message["error"].contains("message")) - error = message["error"]["message"]; - std::cerr << "VIS subscription failed: " << error << std::endl; - } - } else if (action == "get") { - if (message.contains("error")) { - std::string error = "unknown"; - if (message["error"].is_object() && message["error"].contains("message")) - error = message["error"]["message"]; - std::cerr << "VIS get failed: " << error << std::endl; - } else { - std::string path, value, ts; - if (parseData(message, path, value, ts)) { - if (m_config.verbose() > 1) - std::cout << "VisSession::handle_message: got response " << path << " = " << value << std::endl; - - handle_get_response(path, value, ts); - } - } - } else if (action == "set") { - if (message.contains("error")) { - std::string error = "unknown"; - if (message["error"].is_object() && message["error"].contains("message")) - error = message["error"]["message"]; - std::cerr << "VIS set failed: " << error; - } - } else if (action == "subscription") { - std::string path, value, ts; - if (parseData(message, path, value, ts)) { - if (m_config.verbose() > 1) - std::cout << "VisSession::handle_message: got notification " << path << " = " << value << std::endl; - - handle_notification(path, value, ts); - } - } else { - std::cerr << "unhandled VIS response of type: " << action; - } - - if (m_config.verbose() > 1) - std::cout << "VisSession::handle_message: exit" << std::endl; -} - diff --git a/src/vis-session.hpp b/src/vis-session.hpp deleted file mode 100644 index 8c7b0d9..0000000 --- a/src/vis-session.hpp +++ /dev/null @@ -1,78 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -#ifndef _VIS_SESSION_HPP -#define _VIS_SESSION_HPP - -#include "vis-config.hpp" -#include <atomic> -#include <string> -#include <boost/beast/core.hpp> -#include <boost/beast/ssl.hpp> -#include <boost/beast/websocket.hpp> -#include <boost/beast/websocket/ssl.hpp> -#include <boost/asio/strand.hpp> -#include <nlohmann/json.hpp> - -namespace beast = boost::beast; -namespace websocket = beast::websocket; -namespace net = boost::asio; -namespace ssl = boost::asio::ssl; -using tcp = boost::asio::ip::tcp; -using json = nlohmann::json; - - -class VisSession : public std::enable_shared_from_this<VisSession> -{ - //net::io_context m_ioc; - tcp::resolver m_resolver; - tcp::resolver::results_type m_results; - std::string m_hostname; - websocket::stream<beast::ssl_stream<beast::tcp_stream>> m_ws; - beast::flat_buffer m_buffer; - -public: - // Resolver and socket require an io_context - explicit VisSession(const VisConfig &config, net::io_context& ioc, ssl::context& ctx); - - // Start the asynchronous operation - void run(); - -protected: - VisConfig m_config; - std::atomic_uint m_requestid; - - void on_resolve(beast::error_code error, tcp::resolver::results_type results); - - void connect(); - - void on_connect(beast::error_code error, tcp::resolver::results_type::endpoint_type endpoint); - - void on_ssl_handshake(beast::error_code error); - - void on_handshake(beast::error_code error); - - void on_authorize(beast::error_code error, std::size_t bytes_transferred); - - void on_write(beast::error_code error, std::size_t bytes_transferred); - - void on_read(beast::error_code error, std::size_t bytes_transferred); - - void get(const std::string &path); - - void set(const std::string &path, const std::string &value); - - void subscribe(const std::string &path); - - void handle_message(const json &message); - - bool parseData(const json &message, std::string &path, std::string &value, std::string ×tamp); - - virtual void handle_authorized_response(void) = 0; - - virtual void handle_get_response(std::string &path, std::string &value, std::string ×tamp) = 0; - - virtual void handle_notification(std::string &path, std::string &value, std::string ×tamp) = 0; - -}; - -#endif // _VIS_SESSION_HPP |