aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/AudiomixerService.cpp278
-rw-r--r--src/AudiomixerService.h72
-rw-r--r--src/KuksaClient.cpp373
-rw-r--r--src/KuksaClient.h79
-rw-r--r--src/KuksaConfig.cpp (renamed from src/vis-config.cpp)96
-rw-r--r--src/KuksaConfig.h43
-rw-r--r--src/audiomixer-service.cpp129
-rw-r--r--src/audiomixer-service.hpp44
-rw-r--r--src/main.cpp62
-rw-r--r--src/meson.build59
-rw-r--r--src/vis-config.hpp43
-rw-r--r--src/vis-session.cpp380
-rw-r--r--src/vis-session.hpp78
13 files changed, 974 insertions, 762 deletions
diff --git a/src/AudiomixerService.cpp b/src/AudiomixerService.cpp
new file mode 100644
index 0000000..f2445b8
--- /dev/null
+++ b/src/AudiomixerService.cpp
@@ -0,0 +1,278 @@
+/*
+ * Copyright (C) 2022,2023 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#include <string>
+#include <sstream>
+#include <iostream>
+#include <algorithm>
+#include <thread>
+#include <chrono>
+
+#include "AudiomixerService.h"
+
+AudiomixerService::AudiomixerService(const KuksaConfig &config, GMainLoop *loop) :
+ m_loop(loop),
+ m_config(config)
+{
+ m_audiomixer = audiomixer_new();
+ if (m_audiomixer) {
+ // Set up callbacks for WirePlumber events
+ m_audiomixer_events.controls_changed = mixer_control_change_cb;
+ m_audiomixer_events.value_changed = mixer_value_change_cb;
+ audiomixer_add_event_listener(m_audiomixer, &m_audiomixer_events, this);
+
+ // Drive connecting to PipeWire core and refreshing controls list
+ audiomixer_lock(m_audiomixer);
+ audiomixer_ensure_controls(m_audiomixer, 3);
+ audiomixer_unlock(m_audiomixer);
+ } else {
+ std::cerr << "Could not create WirePlumber connection" << std::endl;
+ }
+
+ // Set initial volume
+ if (m_audiomixer) {
+ audiomixer_lock(m_audiomixer);
+ const struct mixer_control *ctl = audiomixer_find_control(m_audiomixer, "Master Playback");
+ if (ctl) {
+ audiomixer_change_volume(m_audiomixer, ctl, 0.5);
+ }
+ audiomixer_unlock(m_audiomixer);
+ }
+
+ // Create gRPC channel
+ std::string host = m_config.hostname();
+ host += ":";
+ std::stringstream ss;
+ ss << m_config.port();
+ host += ss.str();
+
+ std::shared_ptr<grpc::Channel> channel;
+ if (!m_config.caCert().empty()) {
+ grpc::SslCredentialsOptions options;
+ options.pem_root_certs = m_config.caCert();
+ if (!m_config.tlsServerName().empty()) {
+ grpc::ChannelArguments args;
+ auto target = m_config.tlsServerName();
+ std::cout << "Overriding TLS target name with " << target << std::endl;
+ args.SetString(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG, target);
+ channel = grpc::CreateCustomChannel(host, grpc::SslCredentials(options), args);
+ } else {
+ channel = grpc::CreateChannel(host, grpc::SslCredentials(options));
+ }
+ } else {
+ channel = grpc::CreateChannel(host, grpc::InsecureChannelCredentials());
+ }
+
+ // Wait for the channel to be ready
+ std::cout << "Waiting for Databroker gRPC channel" << std::endl;
+ while (!channel->WaitForConnected(std::chrono::system_clock::now() +
+ std::chrono::milliseconds(500))) ;
+ std::cout << "Databroker gRPC channel ready" << std::endl;
+
+ m_broker = new KuksaClient(channel, m_config);
+ if (m_broker) {
+ // Listen to actuator target updates for the volume signal and
+ // values for the steering wheel volume switch signals.
+ std::map<std::string, bool> signals;
+ signals["Vehicle.Cabin.Infotainment.Media.Volume"] = true;
+ signals["Vehicle.Cabin.SteeringWheel.Switches.VolumeUp"] = false;
+ signals["Vehicle.Cabin.SteeringWheel.Switches.VolumeDown"] = false;
+ signals["Vehicle.Cabin.SteeringWheel.Switches.VolumeMute"] = false;
+ m_broker->subscribe(signals,
+ [this](const std::string &path, const Datapoint &dp) {
+ HandleSignalChange(path, dp);
+ },
+ [this](const SubscribeRequest *request, const Status &s) {
+ HandleSubscribeDone(request, s);
+ });
+
+ // Set initial volume in VSS
+ // Push out the default value of 50 which matches the default in the
+ // homescreen app. Ideally there would be some form of persistence
+ // scheme to restore the last value on restart.
+ m_broker->set("Vehicle.Cabin.Infotainment.Media.Volume",
+ 50U,
+ [this](const std::string &path, const Error &error) {
+ HandleSignalSetError(path, error);
+ });
+ } else {
+ std::cerr << "Could not create Kuksa API client" << std::endl;
+ }
+}
+
+AudiomixerService::~AudiomixerService()
+{
+ delete m_broker;
+
+ audiomixer_free(m_audiomixer);
+}
+
+// Private
+
+void AudiomixerService::HandleSignalChange(const std::string &path, const Datapoint &dp)
+{
+ if (m_config.verbose() > 1)
+ std::cout << "AudiomixerService::HandleSignalChange: Value received for " << path << std::endl;
+
+ if (!m_audiomixer)
+ return;
+
+ audiomixer_lock(m_audiomixer);
+
+ const struct mixer_control *ctl = audiomixer_find_control(m_audiomixer, "Master Playback");
+ if (!ctl) {
+ audiomixer_unlock(m_audiomixer);
+ return;
+ }
+ if (path == "Vehicle.Cabin.Infotainment.Media.Volume") {
+ if (dp.has_uint32()) {
+ uint32_t volume = dp.uint32();
+ if (volume >= 0 && volume <= 100) {
+ double v = (double) volume / 100.0;
+ if (m_config.verbose() > 1)
+ std::cout << "Setting volume to " << v << std::endl;
+ audiomixer_change_volume(m_audiomixer, ctl, v);
+
+ // Push out new value
+ m_broker->set("Vehicle.Cabin.Infotainment.Media.Volume",
+ volume,
+ [this](const std::string &path, const Error &error) {
+ HandleSignalSetError(path, error);
+ });
+ }
+ }
+ } else if (path == "Vehicle.Cabin.SteeringWheel.Switches.VolumeUp") {
+ if (dp.has_bool_() && dp.bool_()) {
+ double volume = ctl->volume;
+ volume += 0.05; // up 5%
+ if (volume > 1.0)
+ volume = 1.0; // clamp to 100%
+ if (m_config.verbose() > 1)
+ std::cout << "Increasing volume to " << volume << std::endl;
+ audiomixer_change_volume(m_audiomixer, ctl, volume);
+
+ // Push out new value
+ m_broker->set("Vehicle.Cabin.Infotainment.Media.Volume",
+ (unsigned) (volume * 100),
+ [this](const std::string &path, const Error &error) {
+ HandleSignalSetError(path, error);
+ });
+ }
+ } else if (path == "Vehicle.Cabin.SteeringWheel.Switches.VolumeDown") {
+ if (dp.has_bool_() && dp.bool_()) {
+ double volume = ctl->volume;
+ volume -= 0.05; // down 5%
+ if (volume < 0.0)
+ volume = 0.0; // clamp to 0%
+ if (m_config.verbose() > 1)
+ std::cout << "Decreasing volume to " << volume << std::endl;
+ audiomixer_change_volume(m_audiomixer, ctl, volume);
+
+ // Push out new value
+ m_broker->set("Vehicle.Cabin.Infotainment.Media.Volume",
+ (unsigned) (volume * 100),
+ [this](const std::string &path, const Error &error) {
+ HandleSignalSetError(path, error);
+ });
+ }
+ } else if (path == "Vehicle.Cabin.SteeringWheel.Switches.VolumeMute") {
+ if (dp.has_bool_() && dp.bool_()) {
+ if (m_config.verbose() > 1) {
+ if (ctl->mute)
+ std::cout << "Unmuting" << std::endl;
+ else
+ std::cout << "Muting" << std::endl;
+ }
+ audiomixer_change_mute(m_audiomixer, ctl, !ctl->mute);
+ }
+ }
+ // else ignore
+
+ audiomixer_unlock(m_audiomixer);
+}
+
+void AudiomixerService::HandleSignalSetError(const std::string &path, const Error &error)
+{
+ std::cerr << "Error setting " << path << ": " << error.code() << " - " << error.reason() << std::endl;
+}
+
+void AudiomixerService::HandleSubscribeDone(const SubscribeRequest *request, const Status &status)
+{
+ if (m_config.verbose())
+ std::cout << "Subscribe status = " << status.error_code() <<
+ " (" << status.error_message() << ")" << std::endl;
+
+ if (status.error_code() == grpc::CANCELLED) {
+ if (m_config.verbose())
+ std::cerr << "Subscribe canceled, assuming shutdown" << std::endl;
+ return;
+ }
+
+ // Queue up a resubcribe via the GLib event loop callback
+ struct resubscribe_data *data = new (struct resubscribe_data);
+ if (!data) {
+ std::cerr << "Could not create resubcribe_data" << std::endl;
+ exit(1);
+ }
+ data->self = this;
+ // Need to copy request since the one we have been handed is from the
+ // finished subscribe and will be going away.
+ data->request = new SubscribeRequest(*request);
+ if (!data->request) {
+ std::cerr << "Could not create resubscribe SubscribeRequest" << std::endl;
+ exit(1);
+ }
+
+ // NOTE: Waiting 100 milliseconds for now; it is possible that some
+ // randomization and/or back-off may need to be added if many
+ // subscribes are active, or switching to some other resubscribe
+ // scheme altogether (e.g. post subscribes to a thread that waits
+ // for the channel to become connected again).
+ g_timeout_add_full(G_PRIORITY_DEFAULT,
+ 100,
+ resubscribe_cb,
+ data,
+ NULL);
+}
+
+void AudiomixerService::Resubscribe(const SubscribeRequest *request)
+{
+ if (!(m_broker && request))
+ return;
+
+ m_broker->subscribe(request,
+ [this](const std::string &path, const Datapoint &dp) {
+ HandleSignalChange(path, dp);
+ },
+ [this](const SubscribeRequest *request, const Status &s) {
+ HandleSubscribeDone(request, s);
+ });
+}
+
+void AudiomixerService::HandleMixerControlChange(void)
+{
+ // Ignore for now
+}
+
+void AudiomixerService::HandleMixerValueChange(unsigned int change_mask, const struct mixer_control *control)
+{
+ if (!control)
+ return;
+
+ if (change_mask & MIXER_CONTROL_CHANGE_FLAG_VOLUME) {
+ if (std::string(control->name) == "Master Playback") {
+ // Push out new value
+ unsigned value = control->volume * 100;
+ m_broker->set("Vehicle.Cabin.Infotainment.Media.Volume",
+ value,
+ [this](const std::string &path, const Error &error) {
+ HandleSignalSetError(path, error);
+ });
+ }
+ } else if (change_mask & MIXER_CONTROL_CHANGE_FLAG_MUTE) {
+ // For now, do nothing, new state is in control->mute
+ }
+}
diff --git a/src/AudiomixerService.h b/src/AudiomixerService.h
new file mode 100644
index 0000000..3cd87c6
--- /dev/null
+++ b/src/AudiomixerService.h
@@ -0,0 +1,72 @@
+/*
+ * Copyright (C) 2022,2023 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#ifndef _AUDIOMIXER_SERVICE_H
+#define _AUDIOMIXER_SERVICE_H
+
+#include <glib.h>
+
+#include "KuksaConfig.h"
+#include "KuksaClient.h"
+#include "audiomixer.h"
+
+class AudiomixerService
+{
+public:
+ AudiomixerService(const KuksaConfig &config, GMainLoop *loop = NULL);
+
+ ~AudiomixerService();
+
+ // Callbacks for WirePlumber API
+
+ static void mixer_control_change_cb(void *data) {
+ if (data)
+ ((AudiomixerService*) data)->HandleMixerControlChange();
+ };
+
+ static void mixer_value_change_cb(void *data,
+ unsigned int change_mask,
+ const struct mixer_control *control) {
+ if (data)
+ ((AudiomixerService*) data)->HandleMixerValueChange(change_mask, control);
+ }
+
+ // Callback for KuksaClient subscribe API reconnect
+
+ static gboolean resubscribe_cb(gpointer data) {
+ struct resubscribe_data *d = (struct resubscribe_data*) data;
+ if (d && d->self) {
+ ((AudiomixerService*) d->self)->Resubscribe(d->request);
+ }
+ return FALSE;
+ }
+
+private:
+ struct resubscribe_data {
+ AudiomixerService *self;
+ const SubscribeRequest *request;
+ };
+
+ GMainLoop *m_loop;
+ KuksaConfig m_config;
+ KuksaClient *m_broker;
+ struct audiomixer *m_audiomixer;
+ struct audiomixer_events m_audiomixer_events;
+
+ void HandleSignalChange(const std::string &path, const Datapoint &dp);
+
+ void HandleSignalSetError(const std::string &path, const Error &error);
+
+ void HandleSubscribeDone(const SubscribeRequest *request, const Status &status);
+
+ void Resubscribe(const SubscribeRequest *request);
+
+ void HandleMixerControlChange(void);
+
+ void HandleMixerValueChange(unsigned int change_mask, const struct mixer_control *control);
+};
+
+#endif // _AUDIOMIXER_SERVICE_H
diff --git a/src/KuksaClient.cpp b/src/KuksaClient.cpp
new file mode 100644
index 0000000..844ce0e
--- /dev/null
+++ b/src/KuksaClient.cpp
@@ -0,0 +1,373 @@
+/*
+ * Copyright (C) 2023 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#include <string>
+#include <regex>
+#include <iterator>
+#include <mutex>
+
+#include "KuksaClient.h"
+
+using grpc::Channel;
+using grpc::ClientContext;
+using grpc::ClientReader;
+using grpc::Status;
+
+KuksaClient::KuksaClient(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const KuksaConfig &config) :
+ m_config(config)
+{
+ m_stub = VAL::NewStub(channel);
+}
+
+void KuksaClient::get(const std::string &path, GetResponseCallback cb, const bool actuator)
+{
+ ClientContext *context = new ClientContext();
+ if (!context) {
+ handleCriticalFailure("Could not create ClientContext");
+ return;
+ }
+ std::string token = m_config.authToken();
+ if (!token.empty()) {
+ token.insert(0, std::string("Bearer "));
+ context->AddMetadata(std::string("authorization"), token);
+ }
+
+ GetRequest request;
+ auto entry = request.add_entries();
+ entry->set_path(path);
+ entry->add_fields(Field::FIELD_PATH);
+ if (actuator)
+ entry->add_fields(Field::FIELD_ACTUATOR_TARGET);
+ else
+ entry->add_fields(Field::FIELD_VALUE);
+
+ GetResponse *response = new GetResponse();
+ if (!response) {
+ handleCriticalFailure("Could not create GetResponse");
+ return;
+ }
+
+ // NOTE: Using ClientUnaryReactor instead of the shortcut method
+ // would allow getting detailed errors.
+ m_stub->async()->Get(context, &request, response,
+ [this, cb, context, response](Status s) {
+ if (s.ok())
+ handleGetResponse(response, cb);
+ delete response;
+ delete context;
+ });
+}
+
+// Since a set request needs a Datapoint with the appropriate type value,
+// checking the signal metadata to get the type would be a requirement for
+// a generic set call that takes a string as argument. For now, assume
+// that set with a string is specifically for a signal of string type.
+
+void KuksaClient::set(const std::string &path, const std::string &value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_string(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const int8_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_int32(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const int16_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_int32(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const int32_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_int32(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const int64_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_int64(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const uint8_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_uint32(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const uint16_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_uint32(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const uint32_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_uint32(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const uint64_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_uint64(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const float value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_float_(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const double value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_double_(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::subscribe(const std::string &path,
+ SubscribeResponseCallback cb,
+ const bool actuator,
+ SubscribeDoneCallback done_cb)
+{
+ SubscribeRequest *request = new SubscribeRequest();
+ if (!request) {
+ handleCriticalFailure("Could not create SubscribeRequest");
+ return;
+ }
+
+ auto entry = request->add_entries();
+ entry->set_path(path);
+ entry->add_fields(Field::FIELD_PATH);
+ if (actuator)
+ entry->add_fields(Field::FIELD_ACTUATOR_TARGET);
+ else
+ entry->add_fields(Field::FIELD_VALUE);
+
+ subscribe(request, cb, done_cb);
+}
+
+void KuksaClient::subscribe(const std::map<std::string, bool> signals,
+ SubscribeResponseCallback cb,
+ SubscribeDoneCallback done_cb)
+{
+ SubscribeRequest *request = new SubscribeRequest();
+ if (!request) {
+ handleCriticalFailure("Could not create SubscribeRequest");
+ return;
+ }
+
+ for(auto it = signals.cbegin(); it != signals.cend(); ++it) {
+ auto entry = request->add_entries();
+ entry->set_path(it->first);
+ entry->add_fields(Field::FIELD_PATH);
+ if (it->second)
+ entry->add_fields(Field::FIELD_ACTUATOR_TARGET);
+ else
+ entry->add_fields(Field::FIELD_VALUE);
+ }
+
+ subscribe(request, cb, done_cb);
+}
+
+void KuksaClient::subscribe(const SubscribeRequest *request,
+ SubscribeResponseCallback cb,
+ SubscribeDoneCallback done_cb)
+{
+ if (!(request && cb))
+ return;
+
+ class Reader : public grpc::ClientReadReactor<SubscribeResponse> {
+ public:
+ Reader(VAL::Stub *stub,
+ KuksaClient *client,
+ KuksaConfig &config,
+ const SubscribeRequest *request,
+ SubscribeResponseCallback cb,
+ SubscribeDoneCallback done_cb):
+ client_(client),
+ config_(config),
+ request_(request),
+ cb_(cb),
+ done_cb_(done_cb) {
+ std::string token = config_.authToken();
+ if (!token.empty()) {
+ token.insert(0, std::string("Bearer "));
+ context_.AddMetadata(std::string("authorization"), token);
+ }
+ stub->async()->Subscribe(&context_, request, this);
+ StartRead(&response_);
+ StartCall();
+ }
+ void OnReadDone(bool ok) override {
+ std::unique_lock<std::mutex> lock(mutex_);
+ if (ok) {
+ if (client_)
+ client_->handleSubscribeResponse(&response_, cb_);
+ StartRead(&response_);
+ }
+ }
+ void OnDone(const Status& s) override {
+ status_ = s;
+ if (client_) {
+ if (config_.verbose() > 1)
+ std::cerr << "KuksaClient::subscribe::Reader done" << std::endl;
+ client_->handleSubscribeDone(request_, status_, done_cb_);
+ }
+
+ // gRPC engine is done with us, safe to self-delete
+ delete request_;
+ delete this;
+ }
+
+ private:
+ KuksaClient *client_;
+ KuksaConfig config_;
+ const SubscribeRequest *request_;
+ SubscribeResponseCallback cb_;
+ SubscribeDoneCallback done_cb_;
+
+ ClientContext context_;
+ SubscribeResponse response_;
+ std::mutex mutex_;
+ Status status_;
+ };
+ Reader *reader = new Reader(m_stub.get(), this, m_config, request, cb, done_cb);
+ if (!reader)
+ handleCriticalFailure("Could not create Subscribe reader");
+}
+
+// Private
+
+void KuksaClient::set(const std::string &path, const Datapoint &dp, SetResponseCallback cb, const bool actuator)
+{
+ ClientContext *context = new ClientContext();
+ if (!context) {
+ handleCriticalFailure("Could not create ClientContext");
+ return;
+ }
+ std::string token = m_config.authToken();
+ if (!token.empty()) {
+ token.insert(0, std::string("Bearer "));
+ context->AddMetadata(std::string("authorization"), token);
+ }
+
+ SetRequest request;
+ auto update = request.add_updates();
+ auto entry = update->mutable_entry();
+ entry->set_path(path);
+ if (actuator) {
+ auto target = entry->mutable_actuator_target();
+ *target = dp;
+ update->add_fields(Field::FIELD_ACTUATOR_TARGET);
+ } else {
+ auto value = entry->mutable_value();
+ *value = dp;
+ update->add_fields(Field::FIELD_VALUE);
+ }
+
+ SetResponse *response = new SetResponse();
+ if (!response) {
+ handleCriticalFailure("Could not create SetResponse");
+ delete context;
+ return;
+ }
+
+ // NOTE: Using ClientUnaryReactor instead of the shortcut method
+ // would allow getting detailed errors.
+ m_stub->async()->Set(context, &request, response,
+ [this, cb, context, response](Status s) {
+ if (s.ok())
+ handleSetResponse(response, cb);
+ delete response;
+ delete context;
+ });
+}
+
+void KuksaClient::handleGetResponse(const GetResponse *response, GetResponseCallback cb)
+{
+ if (!(response && response->entries_size() && cb))
+ return;
+
+ for (auto it = response->entries().begin(); it != response->entries().end(); ++it) {
+ // We expect paths in the response entries
+ if (!it->path().size())
+ continue;
+
+ Datapoint dp;
+ if (it->has_actuator_target())
+ dp = it->actuator_target();
+ else
+ dp = it->value();
+ cb(it->path(), dp);
+ }
+}
+
+void KuksaClient::handleSetResponse(const SetResponse *response, SetResponseCallback cb)
+{
+ if (!(response && response->errors_size() && cb))
+ return;
+
+ for (auto it = response->errors().begin(); it != response->errors().end(); ++it) {
+ cb(it->path(), it->error());
+ }
+
+}
+
+void KuksaClient::handleSubscribeResponse(const SubscribeResponse *response, SubscribeResponseCallback cb)
+{
+ if (!(response && response->updates_size() && cb))
+ return;
+
+ for (auto it = response->updates().begin(); it != response->updates().end(); ++it) {
+ // We expect entries that have paths in the response
+ if (!(it->has_entry() && it->entry().path().size()))
+ continue;
+
+ auto entry = it->entry();
+ if (m_config.verbose())
+ std::cout << "KuksaClient::handleSubscribeResponse: got value for " << entry.path() << std::endl;
+
+ Datapoint dp;
+ if (entry.has_actuator_target())
+ dp = entry.actuator_target();
+ else
+ dp = entry.value();
+
+ cb(entry.path(), dp);
+ }
+}
+
+void KuksaClient::handleSubscribeDone(const SubscribeRequest *request,
+ const Status &status,
+ SubscribeDoneCallback cb)
+{
+ if (cb)
+ cb(request, status);
+}
+
+void KuksaClient::handleCriticalFailure(const std::string &error)
+{
+ if (error.size())
+ std::cerr << error << std::endl;
+ exit(1);
+}
+
diff --git a/src/KuksaClient.h b/src/KuksaClient.h
new file mode 100644
index 0000000..fd9a9e0
--- /dev/null
+++ b/src/KuksaClient.h
@@ -0,0 +1,79 @@
+/*
+ * Copyright (C) 2023 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#ifndef KUKSA_CLIENT_H
+#define KUKSA_CLIENT_H
+
+#include <string>
+#include <map>
+#include <grpcpp/grpcpp.h>
+#include "kuksa/val/v1/val.grpc.pb.h"
+
+// Just pull in the whole namespace since Datapoint contains a lot of
+// definitions that may potentially be needed.
+using namespace kuksa::val::v1;
+
+using grpc::Status;
+
+#include "KuksaConfig.h"
+
+// API response callback types
+typedef std::function<void(const std::string &path, const Datapoint &dp)> GetResponseCallback;
+typedef std::function<void(const std::string &path, const Error &error)> SetResponseCallback;
+typedef std::function<void(const std::string &path, const Datapoint &dp)> SubscribeResponseCallback;
+typedef std::function<void(const SubscribeRequest *request, const Status &status)> SubscribeDoneCallback;
+
+// KUKSA.val databroker "VAL" gRPC API client class
+
+class KuksaClient
+{
+public:
+ explicit KuksaClient(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const KuksaConfig &config);
+
+ void get(const std::string &path, GetResponseCallback cb, const bool actuator = false);
+
+ void set(const std::string &path, const std::string &value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const int8_t value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const int16_t value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const int32_t value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const int64_t value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const uint8_t value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const uint16_t value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const uint32_t value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const uint64_t value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const float value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const double value, SetResponseCallback cb, const bool actuator = false);
+
+ void subscribe(const std::string &path,
+ SubscribeResponseCallback cb,
+ const bool actuator = false,
+ SubscribeDoneCallback done_cb = nullptr);
+ void subscribe(const std::map<std::string, bool> signals,
+ SubscribeResponseCallback cb,
+ SubscribeDoneCallback done_cb = nullptr);
+ void subscribe(const SubscribeRequest *request,
+ SubscribeResponseCallback cb,
+ SubscribeDoneCallback done_cb = nullptr);
+
+private:
+ KuksaConfig m_config;
+ std::shared_ptr<VAL::Stub> m_stub;
+
+ void set(const std::string &path, const Datapoint &dp, SetResponseCallback cb, const bool actuator);
+
+ void handleGetResponse(const GetResponse *response, GetResponseCallback cb);
+
+ void handleSetResponse(const SetResponse *response, SetResponseCallback cb);
+
+ void handleSubscribeResponse(const SubscribeResponse *response, SubscribeResponseCallback cb);
+
+ void handleSubscribeDone(const SubscribeRequest *request, const Status &status, SubscribeDoneCallback cb);
+
+ void handleCriticalFailure(const std::string &error);
+ void handleCriticalFailure(const char *error) { handleCriticalFailure(std::string(error)); };
+};
+
+#endif // KUKSA_CLIENT_H
diff --git a/src/vis-config.cpp b/src/KuksaConfig.cpp
index a55e235..8fe09f6 100644
--- a/src/vis-config.cpp
+++ b/src/KuksaConfig.cpp
@@ -1,18 +1,21 @@
-// SPDX-License-Identifier: Apache-2.0
+/*
+ * Copyright (C) 2022,2023 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
-#include "vis-config.hpp"
#include <iostream>
#include <iomanip>
#include <sstream>
+#include <exception>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/ini_parser.hpp>
#include <boost/filesystem.hpp>
+#include "KuksaConfig.h"
namespace property_tree = boost::property_tree;
namespace filesystem = boost::filesystem;
-#define DEFAULT_CLIENT_KEY_FILE "/etc/kuksa-val/Client.key"
-#define DEFAULT_CLIENT_CERT_FILE "/etc/kuksa-val/Client.pem"
#define DEFAULT_CA_CERT_FILE "/etc/kuksa-val/CA.pem"
inline
@@ -21,32 +24,32 @@ void load_string_file(const filesystem::path& p, std::string& str)
std::ifstream file;
file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
file.open(p, std::ios_base::binary);
- std::size_t sz = static_cast<std::size_t>(filesystem::file_size(p));
- str.resize(sz, '\0');
- file.read(&str[0], sz);
+ if (file.good()) {
+ std::size_t sz = static_cast<std::size_t>(filesystem::file_size(p));
+ str.resize(sz, '\0');
+ file.read(&str[0], sz);
+ } else {
+ str.clear();
+ }
}
-VisConfig::VisConfig(const std::string &hostname,
- const unsigned port,
- const std::string &clientKey,
- const std::string &clientCert,
- const std::string &caCert,
- const std::string &authToken,
- bool verifyPeer) :
+KuksaConfig::KuksaConfig(const std::string &hostname,
+ const unsigned port,
+ const std::string &caCert,
+ const std::string &tlsServerName,
+ const std::string &authToken) :
m_hostname(hostname),
m_port(port),
- m_clientKey(clientKey),
- m_clientCert(clientCert),
m_caCert(caCert),
+ m_tlsServerName(tlsServerName),
m_authToken(authToken),
- m_verifyPeer(verifyPeer),
m_verbose(0),
m_valid(true)
{
// Potentially could do some certificate validation here...
}
-VisConfig::VisConfig(const std::string &appname) :
+KuksaConfig::KuksaConfig(const std::string &appname) :
m_valid(false)
{
std::string config("/etc/xdg/AGL/");
@@ -70,7 +73,7 @@ VisConfig::VisConfig(const std::string &appname) :
return;
}
const property_tree::ptree &settings =
- pt.get_child("vis-client", property_tree::ptree());
+ pt.get_child("kuksa-client", property_tree::ptree());
m_hostname = settings.get("server", "localhost");
std::stringstream ss;
@@ -81,48 +84,12 @@ VisConfig::VisConfig(const std::string &appname) :
return;
}
- m_port = settings.get("port", 8090);
+ m_port = settings.get("port", 55555);
if (m_port == 0) {
std::cerr << "Invalid server port" << std::endl;
return;
}
- // Default to disabling peer verification for now to be able
- // to use the default upstream KUKSA.val certificates for
- // testing. Wrangling server and CA certificate generation
- // and management to be able to verify will require further
- // investigation.
- m_verifyPeer = settings.get("verify-server", false);
-
- std::string keyFileName = settings.get("key", DEFAULT_CLIENT_KEY_FILE);
- std::stringstream().swap(ss);
- ss << keyFileName;
- ss >> std::quoted(keyFileName);
- ss.str("");
- if (keyFileName.empty()) {
- std::cerr << "Invalid client key filename" << std::endl;
- return;
- }
- load_string_file(keyFileName, m_clientKey);
- if (m_clientKey.empty()) {
- std::cerr << "Invalid client key file" << std::endl;
- return;
- }
-
- std::string certFileName = settings.get("certificate", DEFAULT_CLIENT_CERT_FILE);
- std::stringstream().swap(ss);
- ss << certFileName;
- ss >> std::quoted(certFileName);
- if (certFileName.empty()) {
- std::cerr << "Invalid client certificate filename" << std::endl;
- return;
- }
- load_string_file(certFileName, m_clientCert);
- if (m_clientCert.empty()) {
- std::cerr << "Invalid client certificate file" << std::endl;
- return;
- }
-
std::string caCertFileName = settings.get("ca-certificate", DEFAULT_CA_CERT_FILE);
std::stringstream().swap(ss);
ss << caCertFileName;
@@ -131,12 +98,14 @@ VisConfig::VisConfig(const std::string &appname) :
std::cerr << "Invalid CA certificate filename" << std::endl;
return;
}
- load_string_file(caCertFileName, m_caCert);
+ readFile(caCertFileName, m_caCert);
if (m_caCert.empty()) {
std::cerr << "Invalid CA certificate file" << std::endl;
return;
}
+ m_tlsServerName = settings.get("tls-server-name", "");
+
std::string authTokenFileName = settings.get("authorization", "");
std::stringstream().swap(ss);
ss << authTokenFileName;
@@ -145,7 +114,7 @@ VisConfig::VisConfig(const std::string &appname) :
std::cerr << "Invalid authorization token filename" << std::endl;
return;
}
- load_string_file(authTokenFileName, m_authToken);
+ readFile(authTokenFileName, m_authToken);
if (m_authToken.empty()) {
std::cerr << "Invalid authorization token file" << std::endl;
return;
@@ -165,3 +134,14 @@ VisConfig::VisConfig(const std::string &appname) :
m_valid = true;
}
+
+// Private
+
+void KuksaConfig::readFile(const std::string &filename, std::string &data)
+{
+ try {
+ load_string_file(filename, data);
+ } catch (const std::exception &e) {
+ data.clear();
+ }
+}
diff --git a/src/KuksaConfig.h b/src/KuksaConfig.h
new file mode 100644
index 0000000..e70385f
--- /dev/null
+++ b/src/KuksaConfig.h
@@ -0,0 +1,43 @@
+/*
+ * Copyright (C) 2022,2023 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#ifndef _KUKSA_CONFIG_H
+#define _KUKSA_CONFIG_H
+
+#include <string>
+
+class KuksaConfig
+{
+public:
+ explicit KuksaConfig(const std::string &hostname,
+ const unsigned port,
+ const std::string &caCert,
+ const std::string &tlsServerName,
+ const std::string &authToken);
+ explicit KuksaConfig(const std::string &appname);
+ ~KuksaConfig() {};
+
+ std::string hostname() { return m_hostname; };
+ unsigned port() { return m_port; };
+ std::string caCert() { return m_caCert; };
+ std::string tlsServerName() { return m_tlsServerName; };
+ std::string authToken() { return m_authToken; };
+ bool valid() { return m_valid; };
+ unsigned verbose() { return m_verbose; };
+
+private:
+ std::string m_hostname;
+ unsigned m_port;
+ std::string m_caCert;
+ std::string m_tlsServerName;
+ std::string m_authToken;
+ unsigned m_verbose;
+ bool m_valid;
+
+ void readFile(const std::string &filename, std::string &data);
+};
+
+#endif // _KUKSA_CONFIG_H
diff --git a/src/audiomixer-service.cpp b/src/audiomixer-service.cpp
deleted file mode 100644
index 5787153..0000000
--- a/src/audiomixer-service.cpp
+++ /dev/null
@@ -1,129 +0,0 @@
-// SPDX-License-Identifier: Apache-2.0
-
-#include "audiomixer-service.hpp"
-#include <iostream>
-#include <algorithm>
-
-
-AudiomixerService::AudiomixerService(const VisConfig &config, net::io_context& ioc, ssl::context& ctx) :
- VisSession(config, ioc, ctx)
-{
- m_audiomixer = audiomixer_new();
- if (m_audiomixer) {
- // Set up callbacks for WirePlumber events
- m_audiomixer_events.controls_changed = audiomixer_control_change_cb;
- m_audiomixer_events.value_changed = audiomixer_value_change_cb;
- audiomixer_add_event_listener(m_audiomixer, &m_audiomixer_events, this);
-
- // Drive connecting to PipeWire core and refreshing controls list
- audiomixer_lock(m_audiomixer);
- audiomixer_ensure_controls(m_audiomixer, 3);
- audiomixer_unlock(m_audiomixer);
- } else {
- std::cerr << "Could not create WirePlumber connection" << std::endl;
- }
-}
-
-AudiomixerService::~AudiomixerService()
-{
- audiomixer_free(m_audiomixer);
-}
-
-void AudiomixerService::handle_authorized_response(void)
-{
- subscribe("Vehicle.Cabin.Infotainment.Media.Volume");
- subscribe("Vehicle.Cabin.SteeringWheel.Switches.VolumeUp");
- subscribe("Vehicle.Cabin.SteeringWheel.Switches.VolumeDown");
- subscribe("Vehicle.Cabin.SteeringWheel.Switches.VolumeMute");
-
- // Set initial volume in VSS
- // For now a value of 50 matches the default in the homescreen app.
- // Ideally there would be some form of persistence scheme to restore
- // the last value on restart.
- set("Vehicle.Cabin.Infotainment.Media.Volume", "50");
-}
-
-void AudiomixerService::handle_get_response(std::string &path, std::string &value, std::string &timestamp)
-{
- // Placeholder since no gets are performed ATM
-}
-
-void AudiomixerService::handle_notification(std::string &path, std::string &value, std::string &timestamp)
-{
- if (!m_audiomixer) {
- return;
- }
-
- audiomixer_lock(m_audiomixer);
-
- const struct mixer_control *ctl = audiomixer_find_control(m_audiomixer, "Master Playback");
- if (!ctl) {
- audiomixer_unlock(m_audiomixer);
- return;
- }
-
- if (path == "Vehicle.Cabin.Infotainment.Media.Volume") {
- try {
- int volume = std::stoi(value);
- if (volume >= 0 && volume <= 100) {
- double v = (double) volume / 100.0;
- if (m_config.verbose() > 1)
- std::cout << "Setting volume to " << v << std::endl;
- audiomixer_change_volume(m_audiomixer, ctl, v);
- }
- }
- catch (std::exception ex) {
- // ignore bad value
- }
- } else if (path == "Vehicle.Cabin.SteeringWheel.Switches.VolumeUp" && value == "true") {
- double volume = ctl->volume;
- volume += 0.05; // up 5%
- if (volume > 1.0)
- volume = 1.0; // clamp to 100%
- if (m_config.verbose() > 1)
- std::cout << "Increasing volume to " << volume << std::endl;
- audiomixer_change_volume(m_audiomixer, ctl, volume);
-
- } else if (path == "Vehicle.Cabin.SteeringWheel.Switches.VolumeDown" && value == "true") {
- double volume = ctl->volume;
- volume -= 0.05; // down 5%
- if (volume < 0.0)
- volume = 0.0; // clamp to 0%
- if (m_config.verbose() > 1)
- std::cout << "Decreasing volume to " << volume << std::endl;
- audiomixer_change_volume(m_audiomixer, ctl, volume);
-
- } else if (path == "Vehicle.Cabin.SteeringWheel.Switches.VolumeMute" && value == "true") {
- if (m_config.verbose() > 1) {
- if (ctl->mute)
- std::cout << "Unmuting" << std::endl;
- else
- std::cout << "Muting" << std::endl;
- }
- audiomixer_change_mute(m_audiomixer, ctl, !ctl->mute);
- }
- // else ignore
-
- audiomixer_unlock(m_audiomixer);
-}
-
-void AudiomixerService::handle_control_change(void)
-{
- // Ignore for now
-}
-
-void AudiomixerService::handle_value_change(unsigned int change_mask, const struct mixer_control *control)
-{
- if (!control)
- return;
-
- if (change_mask & MIXER_CONTROL_CHANGE_FLAG_VOLUME) {
- if (std::string(control->name) == "Master Playback") {
- // Push change into VIS
- std::string value = std::to_string((int) (control->volume * 100.0));
- set("Vehicle.Cabin.Infotainment.Media.Volume", value);
- }
- } else if (change_mask & MIXER_CONTROL_CHANGE_FLAG_MUTE) {
- // For now, do nothing, new state is in control->mute
- }
-}
diff --git a/src/audiomixer-service.hpp b/src/audiomixer-service.hpp
deleted file mode 100644
index cb00584..0000000
--- a/src/audiomixer-service.hpp
+++ /dev/null
@@ -1,44 +0,0 @@
-// SPDX-License-Identifier: Apache-2.0
-
-#ifndef _AUDIOMIXER_SERVICE_HPP
-#define _AUDIOMIXER_SERVICE_HPP
-
-#include "vis-session.hpp"
-#include "audiomixer.h"
-
-class AudiomixerService : public VisSession
-{
- struct audiomixer *m_audiomixer;
-
-public:
- AudiomixerService(const VisConfig &config, net::io_context& ioc, ssl::context& ctx);
-
- ~AudiomixerService();
-
- static void audiomixer_control_change_cb(void *data) {
- if (data)
- ((AudiomixerService*) data)->handle_control_change();
- };
-
- static void audiomixer_value_change_cb(void *data,
- unsigned int change_mask,
- const struct mixer_control *control) {
- if (data)
- ((AudiomixerService*) data)->handle_value_change(change_mask, control);
- }
-
-protected:
- struct audiomixer_events m_audiomixer_events;
-
- virtual void handle_authorized_response(void) override;
-
- virtual void handle_get_response(std::string &path, std::string &value, std::string &timestamp) override;
-
- virtual void handle_notification(std::string &path, std::string &value, std::string &timestamp) override;
-
- virtual void handle_control_change(void);
-
- virtual void handle_value_change(unsigned int change_mask, const struct mixer_control *control);
-};
-
-#endif // _AUDIOMIXER_SERVICE_HPP
diff --git a/src/main.cpp b/src/main.cpp
index 0960f1b..6776472 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -1,34 +1,54 @@
-// SPDX-License-Identifier: Apache-2.0
+/*
+ * Copyright (C) 2022,2023 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
-#include <iostream>
-#include <iomanip>
-#include <boost/asio/signal_set.hpp>
-#include <boost/bind.hpp>
-#include "audiomixer-service.hpp"
+//#include <unistd.h>
+//#include <signal.h>
+#include <glib.h>
+#include <glib-unix.h>
+#include <systemd/sd-daemon.h>
-using work_guard_type = boost::asio::executor_work_guard<boost::asio::io_context::executor_type>;
+#include "AudiomixerService.h"
+
+static gboolean quit_cb(gpointer user_data)
+{
+ GMainLoop *loop = (GMainLoop*) user_data;
+
+ g_info("Quitting...");
+
+ if (loop)
+ g_idle_add(G_SOURCE_FUNC(g_main_loop_quit), loop);
+ else
+ exit(0);
+
+ return G_SOURCE_REMOVE;
+}
int main(int argc, char** argv)
{
- // The io_context is required for all I/O
- net::io_context ioc;
+ GMainLoop *loop = NULL;
+
+ loop = g_main_loop_new(NULL, FALSE);
+ if (!loop) {
+ std::cerr << "Could not create GLib event loop" << std::endl;
+ exit(1);
+ }
+
+ KuksaConfig config("agl-service-audiomixer");
- // Register to stop I/O context on SIGINT and SIGTERM
- net::signal_set signals(ioc, SIGINT, SIGTERM);
- signals.async_wait(boost::bind(&net::io_context::stop, &ioc));
+ g_unix_signal_add(SIGTERM, quit_cb, (gpointer) loop);
+ g_unix_signal_add(SIGINT, quit_cb, (gpointer) loop);
- // The SSL context is required, and holds certificates
- ssl::context ctx{ssl::context::tlsv12_client};
+ AudiomixerService service(config, loop);
- // Launch the asynchronous operation
- VisConfig config("agl-service-audiomixer");
- std::make_shared<AudiomixerService>(config, ioc, ctx)->run();
+ sd_notify(0, "READY=1");
- // Ensure I/O context continues running even if there's no work
- work_guard_type work_guard(ioc.get_executor());
+ g_main_loop_run(loop);
- // Run the I/O context
- ioc.run();
+ // Clean up
+ g_main_loop_unref(loop);
return 0;
}
diff --git a/src/meson.build b/src/meson.build
index b1603fc..6c50419 100644
--- a/src/meson.build
+++ b/src/meson.build
@@ -1,19 +1,60 @@
boost_dep = dependency('boost',
version : '>=1.72',
modules : [ 'thread', 'filesystem', 'program_options', 'log', 'system' ])
-openssl_dep = dependency('openssl')
-thread_dep = dependency('threads')
-wp_dep = dependency('wireplumber-0.4')
-src = [ 'vis-config.cpp',
- 'vis-session.cpp',
- 'audiomixer-service.cpp',
- 'audiomixer.c',
- 'main.cpp'
+cpp = meson.get_compiler('cpp')
+grpcpp_reflection_dep = cpp.find_library('grpc++_reflection')
+
+service_dep = [
+ boost_dep,
+ dependency('openssl'),
+ dependency('threads'),
+ dependency('libsystemd'),
+ dependency('wireplumber-0.4'),
+ dependency('protobuf'),
+ dependency('grpc'),
+ dependency('grpc++'),
+ grpcpp_reflection_dep
]
+
+protoc = find_program('protoc')
+grpc_cpp = find_program('grpc_cpp_plugin')
+
+protos_base_dir = get_option('protos')
+protos_dir = protos_base_dir / 'kuksa/val/v1'
+protoc_gen = generator(protoc, \
+ output : ['@BASENAME@.pb.cc', '@BASENAME@.pb.h'],
+ arguments : ['-I=' + protos_base_dir,
+ '--cpp_out=@BUILD_DIR@',
+ '@INPUT@'])
+generated_protoc_sources = [ \
+ protoc_gen.process(protos_dir / 'types.proto', preserve_path_from : protos_base_dir),
+ protoc_gen.process(protos_dir / 'val.proto', preserve_path_from : protos_base_dir),
+]
+
+grpc_gen = generator(protoc, \
+ output : ['@BASENAME@.grpc.pb.cc', '@BASENAME@.grpc.pb.h'],
+ arguments : ['-I=' + protos_base_dir,
+ '--grpc_out=@BUILD_DIR@',
+ '--plugin=protoc-gen-grpc=' + grpc_cpp.path(),
+ '@INPUT@'])
+generated_grpc_sources = [ \
+ grpc_gen.process(protos_dir / 'val.proto', preserve_path_from : protos_base_dir),
+]
+
+src = [
+ 'KuksaConfig.cpp',
+ 'KuksaClient.cpp',
+ 'AudiomixerService.cpp',
+ 'audiomixer.c',
+ 'main.cpp',
+ generated_protoc_sources,
+ generated_grpc_sources,
+]
+
executable('agl-service-audiomixer',
src,
- dependencies: [boost_dep, openssl_dep, thread_dep, systemd_dep, wp_dep],
+ dependencies: service_dep,
install: true,
c_args : [
'-D_XOPEN_SOURCE=700',
diff --git a/src/vis-config.hpp b/src/vis-config.hpp
deleted file mode 100644
index b0f72f9..0000000
--- a/src/vis-config.hpp
+++ /dev/null
@@ -1,43 +0,0 @@
-// SPDX-License-Identifier: Apache-2.0
-
-#ifndef _VIS_CONFIG_HPP
-#define _VIS_CONFIG_HPP
-
-#include <string>
-
-class VisConfig
-{
-public:
- explicit VisConfig(const std::string &hostname,
- const unsigned port,
- const std::string &clientKey,
- const std::string &clientCert,
- const std::string &caCert,
- const std::string &authToken,
- bool verifyPeer = true);
- explicit VisConfig(const std::string &appname);
- ~VisConfig() {};
-
- std::string hostname() { return m_hostname; };
- unsigned port() { return m_port; };
- std::string clientKey() { return m_clientKey; };
- std::string clientCert() { return m_clientCert; };
- std::string caCert() { return m_caCert; };
- std::string authToken() { return m_authToken; };
- bool verifyPeer() { return m_verifyPeer; };
- bool valid() { return m_valid; };
- unsigned verbose() { return m_verbose; };
-
-private:
- std::string m_hostname;
- unsigned m_port;
- std::string m_clientKey;
- std::string m_clientCert;
- std::string m_caCert;
- std::string m_authToken;
- bool m_verifyPeer;
- unsigned m_verbose;
- bool m_valid;
-};
-
-#endif // _VIS_CONFIG_HPP
diff --git a/src/vis-session.cpp b/src/vis-session.cpp
deleted file mode 100644
index 53a554c..0000000
--- a/src/vis-session.cpp
+++ /dev/null
@@ -1,380 +0,0 @@
-// SPDX-License-Identifier: Apache-2.0
-
-#include "vis-session.hpp"
-#include <iostream>
-#include <sstream>
-#include <thread>
-
-
-// Logging helper
-static void log_error(beast::error_code error, char const* what)
-{
- std::cerr << what << " error: " << error.message() << std::endl;
-}
-
-
-// Resolver and socket require an io_context
-VisSession::VisSession(const VisConfig &config, net::io_context& ioc, ssl::context& ctx) :
- m_config(config),
- m_resolver(net::make_strand(ioc)),
- m_ws(net::make_strand(ioc), ctx)
-{
-}
-
-// Start the asynchronous operation
-void VisSession::run()
-{
- if (!m_config.valid()) {
- return;
- }
-
- // Start by resolving hostname
- m_resolver.async_resolve(m_config.hostname(),
- std::to_string(m_config.port()),
- beast::bind_front_handler(&VisSession::on_resolve,
- shared_from_this()));
-}
-
-void VisSession::on_resolve(beast::error_code error,
- tcp::resolver::results_type results)
-{
- if(error) {
- log_error(error, "resolve");
- return;
- }
-
- // Set a timeout on the connect operation
- beast::get_lowest_layer(m_ws).expires_after(std::chrono::seconds(30));
-
- // Connect to resolved address
- if (m_config.verbose())
- std::cout << "Connecting" << std::endl;
- m_results = results;
- connect();
-}
-
-void VisSession::connect()
-{
- beast::get_lowest_layer(m_ws).async_connect(m_results,
- beast::bind_front_handler(&VisSession::on_connect,
- shared_from_this()));
-}
-
-void VisSession::on_connect(beast::error_code error,
- tcp::resolver::results_type::endpoint_type endpoint)
-{
- if(error) {
- // The server can take a while to be ready to accept connections,
- // so keep retrying until we hit the timeout.
- if (error == net::error::timed_out) {
- log_error(error, "connect");
- return;
- }
-
- // Delay 500 ms before retrying
- std::this_thread::sleep_for(std::chrono::milliseconds(500));
-
- if (m_config.verbose())
- std::cout << "Connecting" << std::endl;
-
- connect();
- return;
- }
-
- if (m_config.verbose())
- std::cout << "Connected" << std::endl;
-
- // Set handshake timeout
- beast::get_lowest_layer(m_ws).expires_after(std::chrono::seconds(30));
-
- // Set SNI Hostname (many hosts need this to handshake successfully)
- if(!SSL_set_tlsext_host_name(m_ws.next_layer().native_handle(),
- m_config.hostname().c_str()))
- {
- error = beast::error_code(static_cast<int>(::ERR_get_error()),
- net::error::get_ssl_category());
- log_error(error, "connect");
- return;
- }
-
- // Update the hostname. This will provide the value of the
- // Host HTTP header during the WebSocket handshake.
- // See https://tools.ietf.org/html/rfc7230#section-5.4
- m_hostname = m_config.hostname() + ':' + std::to_string(endpoint.port());
-
- if (m_config.verbose())
- std::cout << "Negotiating SSL handshake" << std::endl;
-
- // Perform the SSL handshake
- m_ws.next_layer().async_handshake(ssl::stream_base::client,
- beast::bind_front_handler(&VisSession::on_ssl_handshake,
- shared_from_this()));
-}
-
-void VisSession::on_ssl_handshake(beast::error_code error)
-{
- if(error) {
- log_error(error, "SSL handshake");
- return;
- }
-
- // Turn off the timeout on the tcp_stream, because
- // the websocket stream has its own timeout system.
- beast::get_lowest_layer(m_ws).expires_never();
-
- // NOTE: Explicitly not setting websocket stream timeout here,
- // as the client is long-running.
-
- if (m_config.verbose())
- std::cout << "Negotiating WSS handshake" << std::endl;
-
- // Perform handshake
- m_ws.async_handshake(m_hostname,
- "/",
- beast::bind_front_handler(&VisSession::on_handshake,
- shared_from_this()));
-}
-
-void VisSession::on_handshake(beast::error_code error)
-{
- if(error) {
- log_error(error, "WSS handshake");
- return;
- }
-
- if (m_config.verbose())
- std::cout << "Authorizing" << std::endl;
-
- // Authorize
- json req;
- req["requestId"] = std::to_string(m_requestid++);
- req["action"]= "authorize";
- req["tokens"] = m_config.authToken();
-
- m_ws.async_write(net::buffer(req.dump(4)),
- beast::bind_front_handler(&VisSession::on_authorize,
- shared_from_this()));
-}
-
-void VisSession::on_authorize(beast::error_code error, std::size_t bytes_transferred)
-{
- boost::ignore_unused(bytes_transferred);
-
- if(error) {
- log_error(error, "authorize");
- return;
- }
-
- // Read response
- m_ws.async_read(m_buffer,
- beast::bind_front_handler(&VisSession::on_read,
- shared_from_this()));
-}
-
-// NOTE: Placeholder for now
-void VisSession::on_write(beast::error_code error, std::size_t bytes_transferred)
-{
- boost::ignore_unused(bytes_transferred);
-
- if(error) {
- log_error(error, "write");
- return;
- }
-
- // Do nothing...
-}
-
-void VisSession::on_read(beast::error_code error, std::size_t bytes_transferred)
-{
- boost::ignore_unused(bytes_transferred);
-
- if(error) {
- log_error(error, "read");
- return;
- }
-
- // Handle message
- std::string s = beast::buffers_to_string(m_buffer.data());
- json response = json::parse(s, nullptr, false);
- if (!response.is_discarded()) {
- handle_message(response);
- } else {
- std::cerr << "json::parse failed? got " << s << std::endl;
- }
- m_buffer.consume(m_buffer.size());
-
- // Read next message
- m_ws.async_read(m_buffer,
- beast::bind_front_handler(&VisSession::on_read,
- shared_from_this()));
-}
-
-void VisSession::get(const std::string &path)
-{
- if (!m_config.valid()) {
- return;
- }
-
- json req;
- req["requestId"] = std::to_string(m_requestid++);
- req["action"] = "get";
- req["path"] = path;
- req["tokens"] = m_config.authToken();
-
- m_ws.write(net::buffer(req.dump(4)));
-}
-
-void VisSession::set(const std::string &path, const std::string &value)
-{
- if (!m_config.valid()) {
- return;
- }
-
- json req;
- req["requestId"] = std::to_string(m_requestid++);
- req["action"] = "set";
- req["path"] = path;
- req["value"] = value;
- req["tokens"] = m_config.authToken();
-
- m_ws.write(net::buffer(req.dump(4)));
-}
-
-void VisSession::subscribe(const std::string &path)
-{
- if (!m_config.valid()) {
- return;
- }
-
- json req;
- req["requestId"] = std::to_string(m_requestid++);
- req["action"] = "subscribe";
- req["path"] = path;
- req["tokens"] = m_config.authToken();
-
- m_ws.write(net::buffer(req.dump(4)));
-}
-
-bool VisSession::parseData(const json &message, std::string &path, std::string &value, std::string &timestamp)
-{
- if (message.contains("error")) {
- std::string error = message["error"];
- return false;
- }
-
- if (!(message.contains("data") && message["data"].is_object())) {
- std::cerr << "Malformed message (data missing)" << std::endl;
- return false;
- }
- auto data = message["data"];
- if (!(data.contains("path") && data["path"].is_string())) {
- std::cerr << "Malformed message (path missing)" << std::endl;
- return false;
- }
- path = data["path"];
- // Convert '/' to '.' in paths to ensure consistency for clients
- std::replace(path.begin(), path.end(), '/', '.');
-
- if (!(data.contains("dp") && data["dp"].is_object())) {
- std::cerr << "Malformed message (datapoint missing)" << std::endl;
- return false;
- }
- auto dp = data["dp"];
- if (!dp.contains("value")) {
- std::cerr << "Malformed message (value missing)" << std::endl;
- return false;
- } else if (dp["value"].is_string()) {
- value = dp["value"];
- } else if (dp["value"].is_number_float()) {
- double num = dp["value"];
- value = std::to_string(num);
- } else if (dp["value"].is_number_unsigned()) {
- unsigned int num = dp["value"];
- value = std::to_string(num);
- } else if (dp["value"].is_number_integer()) {
- int num = dp["value"];
- value = std::to_string(num);
- } else if (dp["value"].is_boolean()) {
- value = dp["value"] ? "true" : "false";
- } else {
- std::cerr << "Malformed message (unsupported value type)" << std::endl;
- return false;
- }
-
- if (!(dp.contains("ts") && dp["ts"].is_string())) {
- std::cerr << "Malformed message (timestamp missing)" << std::endl;
- return false;
- }
- timestamp = dp["ts"];
-
- return true;
-}
-
-void VisSession::handle_message(const json &message)
-{
- if (m_config.verbose() > 1)
- std::cout << "VisSession::handle_message: enter, message = " << to_string(message) << std::endl;
-
- if (!message.contains("action")) {
- std::cerr << "Received unknown message (no action), discarding" << std::endl;
- return;
- }
-
- std::string action = message["action"];
- if (action == "authorize") {
- if (message.contains("error")) {
- std::string error = "unknown";
- if (message["error"].is_object() && message["error"].contains("message"))
- error = message["error"]["message"];
- std::cerr << "VIS authorization failed: " << error << std::endl;
- } else {
- if (m_config.verbose() > 1)
- std::cout << "authorized" << std::endl;
-
- handle_authorized_response();
- }
- } else if (action == "subscribe") {
- if (message.contains("error")) {
- std::string error = "unknown";
- if (message["error"].is_object() && message["error"].contains("message"))
- error = message["error"]["message"];
- std::cerr << "VIS subscription failed: " << error << std::endl;
- }
- } else if (action == "get") {
- if (message.contains("error")) {
- std::string error = "unknown";
- if (message["error"].is_object() && message["error"].contains("message"))
- error = message["error"]["message"];
- std::cerr << "VIS get failed: " << error << std::endl;
- } else {
- std::string path, value, ts;
- if (parseData(message, path, value, ts)) {
- if (m_config.verbose() > 1)
- std::cout << "VisSession::handle_message: got response " << path << " = " << value << std::endl;
-
- handle_get_response(path, value, ts);
- }
- }
- } else if (action == "set") {
- if (message.contains("error")) {
- std::string error = "unknown";
- if (message["error"].is_object() && message["error"].contains("message"))
- error = message["error"]["message"];
- std::cerr << "VIS set failed: " << error;
- }
- } else if (action == "subscription") {
- std::string path, value, ts;
- if (parseData(message, path, value, ts)) {
- if (m_config.verbose() > 1)
- std::cout << "VisSession::handle_message: got notification " << path << " = " << value << std::endl;
-
- handle_notification(path, value, ts);
- }
- } else {
- std::cerr << "unhandled VIS response of type: " << action;
- }
-
- if (m_config.verbose() > 1)
- std::cout << "VisSession::handle_message: exit" << std::endl;
-}
-
diff --git a/src/vis-session.hpp b/src/vis-session.hpp
deleted file mode 100644
index 8c7b0d9..0000000
--- a/src/vis-session.hpp
+++ /dev/null
@@ -1,78 +0,0 @@
-// SPDX-License-Identifier: Apache-2.0
-
-#ifndef _VIS_SESSION_HPP
-#define _VIS_SESSION_HPP
-
-#include "vis-config.hpp"
-#include <atomic>
-#include <string>
-#include <boost/beast/core.hpp>
-#include <boost/beast/ssl.hpp>
-#include <boost/beast/websocket.hpp>
-#include <boost/beast/websocket/ssl.hpp>
-#include <boost/asio/strand.hpp>
-#include <nlohmann/json.hpp>
-
-namespace beast = boost::beast;
-namespace websocket = beast::websocket;
-namespace net = boost::asio;
-namespace ssl = boost::asio::ssl;
-using tcp = boost::asio::ip::tcp;
-using json = nlohmann::json;
-
-
-class VisSession : public std::enable_shared_from_this<VisSession>
-{
- //net::io_context m_ioc;
- tcp::resolver m_resolver;
- tcp::resolver::results_type m_results;
- std::string m_hostname;
- websocket::stream<beast::ssl_stream<beast::tcp_stream>> m_ws;
- beast::flat_buffer m_buffer;
-
-public:
- // Resolver and socket require an io_context
- explicit VisSession(const VisConfig &config, net::io_context& ioc, ssl::context& ctx);
-
- // Start the asynchronous operation
- void run();
-
-protected:
- VisConfig m_config;
- std::atomic_uint m_requestid;
-
- void on_resolve(beast::error_code error, tcp::resolver::results_type results);
-
- void connect();
-
- void on_connect(beast::error_code error, tcp::resolver::results_type::endpoint_type endpoint);
-
- void on_ssl_handshake(beast::error_code error);
-
- void on_handshake(beast::error_code error);
-
- void on_authorize(beast::error_code error, std::size_t bytes_transferred);
-
- void on_write(beast::error_code error, std::size_t bytes_transferred);
-
- void on_read(beast::error_code error, std::size_t bytes_transferred);
-
- void get(const std::string &path);
-
- void set(const std::string &path, const std::string &value);
-
- void subscribe(const std::string &path);
-
- void handle_message(const json &message);
-
- bool parseData(const json &message, std::string &path, std::string &value, std::string &timestamp);
-
- virtual void handle_authorized_response(void) = 0;
-
- virtual void handle_get_response(std::string &path, std::string &value, std::string &timestamp) = 0;
-
- virtual void handle_notification(std::string &path, std::string &value, std::string &timestamp) = 0;
-
-};
-
-#endif // _VIS_SESSION_HPP