summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/HvacCanHelper.cpp (renamed from src/hvac-can-helper.cpp)8
-rw-r--r--src/HvacCanHelper.h (renamed from src/hvac-can-helper.hpp)12
-rw-r--r--src/HvacLedHelper.cpp (renamed from src/hvac-led-helper.cpp)8
-rw-r--r--src/HvacLedHelper.h (renamed from src/hvac-led-helper.hpp)12
-rw-r--r--src/HvacService.cpp223
-rw-r--r--src/HvacService.h65
-rw-r--r--src/KuksaClient.cpp373
-rw-r--r--src/KuksaClient.h79
-rw-r--r--src/KuksaConfig.cpp (renamed from src/vis-config.cpp)96
-rw-r--r--src/KuksaConfig.h43
-rw-r--r--src/hvac-service.cpp85
-rw-r--r--src/hvac-service.hpp33
-rw-r--r--src/main.cpp60
-rw-r--r--src/meson.build61
-rw-r--r--src/vis-config.hpp43
-rw-r--r--src/vis-session.cpp380
-rw-r--r--src/vis-session.hpp78
17 files changed, 939 insertions, 720 deletions
diff --git a/src/hvac-can-helper.cpp b/src/HvacCanHelper.cpp
index 634f4b0..0c0f937 100644
--- a/src/hvac-can-helper.cpp
+++ b/src/HvacCanHelper.cpp
@@ -1,6 +1,10 @@
-// SPDX-License-Identifier: Apache-2.0
+/*
+ * Copyright (C) 2022,2023 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
-#include "hvac-can-helper.hpp"
+#include "HvacCanHelper.h"
#include <iostream>
#include <iomanip>
#include <sstream>
diff --git a/src/hvac-can-helper.hpp b/src/HvacCanHelper.h
index 4175e6b..0787834 100644
--- a/src/hvac-can-helper.hpp
+++ b/src/HvacCanHelper.h
@@ -1,7 +1,11 @@
-// SPDX-License-Identifier: Apache-2.0
+/*
+ * Copyright (C) 2022,2023 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
-#ifndef _HVAC_CAN_HELPER_HPP
-#define _HVAC_CAN_HELPER_HPP
+#ifndef _HVAC_CAN_HELPER_H
+#define _HVAC_CAN_HELPER_H
#include <cstdint>
#include <string>
@@ -51,4 +55,4 @@ private:
uint8_t m_fan_speed;
};
-#endif // _HVAC_CAN_HELPER_HPP
+#endif // _HVAC_CAN_HELPER_H
diff --git a/src/hvac-led-helper.cpp b/src/HvacLedHelper.cpp
index 3c86c81..54d4b2f 100644
--- a/src/hvac-led-helper.cpp
+++ b/src/HvacLedHelper.cpp
@@ -1,6 +1,10 @@
-// SPDX-License-Identifier: Apache-2.0
+/*
+ * Copyright (C) 2022,2023 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
-#include "hvac-led-helper.hpp"
+#include "HvacLedHelper.h"
#include <iostream>
#include <iomanip>
#include <sstream>
diff --git a/src/hvac-led-helper.hpp b/src/HvacLedHelper.h
index 6f73aac..be94393 100644
--- a/src/hvac-led-helper.hpp
+++ b/src/HvacLedHelper.h
@@ -1,7 +1,11 @@
-// SPDX-License-Identifier: Apache-2.0
+/*
+ * Copyright (C) 2022,2023 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
-#ifndef _HVAC_LED_HELPER_HPP
-#define _HVAC_LED_HELPER_HPP
+#ifndef _HVAC_LED_HELPER_H
+#define _HVAC_LED_HELPER_H
#include <cstdint>
#include <string>
@@ -32,4 +36,4 @@ private:
uint8_t m_temp_right;
};
-#endif // _HVAC_LED_HELPER_HPP
+#endif // _HVAC_LED_HELPER_H
diff --git a/src/HvacService.cpp b/src/HvacService.cpp
new file mode 100644
index 0000000..b53c5d8
--- /dev/null
+++ b/src/HvacService.cpp
@@ -0,0 +1,223 @@
+/*
+ * Copyright (C) 2022,2023 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#include "HvacService.h"
+#include <string>
+#include <sstream>
+#include <iostream>
+#include <algorithm>
+
+
+HvacService::HvacService(const KuksaConfig &config, GMainLoop *loop) :
+ m_loop(loop),
+ m_config(config),
+ m_can_helper(),
+ m_led_helper()
+{
+ // Create gRPC channel
+ std::string host = m_config.hostname();
+ host += ":";
+ std::stringstream ss;
+ ss << m_config.port();
+ host += ss.str();
+
+ std::shared_ptr<grpc::Channel> channel;
+ if (!m_config.caCert().empty()) {
+ grpc::SslCredentialsOptions options;
+ options.pem_root_certs = m_config.caCert();
+ if (!m_config.tlsServerName().empty()) {
+ grpc::ChannelArguments args;
+ auto target = m_config.tlsServerName();
+ std::cout << "Overriding TLS target name with " << target << std::endl;
+ args.SetString(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG, target);
+ channel = grpc::CreateCustomChannel(host, grpc::SslCredentials(options), args);
+ } else {
+ channel = grpc::CreateChannel(host, grpc::SslCredentials(options));
+ }
+ } else {
+ channel = grpc::CreateChannel(host, grpc::InsecureChannelCredentials());
+ }
+
+ // Wait for the channel to be ready
+ std::cout << "Waiting for Databroker gRPC channel" << std::endl;
+ while (!channel->WaitForConnected(std::chrono::system_clock::now() +
+ std::chrono::milliseconds(500))) ;
+ std::cout << "Databroker gRPC channel ready" << std::endl;
+
+ m_broker = new KuksaClient(channel, m_config);
+ if (m_broker) {
+ // Listen to actuator target updates
+ std::map<std::string, bool> signals;
+ signals["Vehicle.Cabin.HVAC.Station.Row1.Left.Temperature"] = true;
+ signals["Vehicle.Cabin.HVAC.Station.Row1.Left.FanSpeed"] = true;
+ signals["Vehicle.Cabin.HVAC.Station.Row1.Right.Temperature"] = true;
+ signals["Vehicle.Cabin.HVAC.Station.Row1.Right.FanSpeed"] = true;
+ m_broker->subscribe(signals,
+ [this](const std::string &path, const Datapoint &dp) {
+ HandleSignalChange(path, dp);
+ },
+ [this](const SubscribeRequest *request, const Status &s) {
+ HandleSubscribeDone(request, s);
+ });
+ }
+}
+
+HvacService::~HvacService()
+{
+ delete m_broker;
+}
+
+// Private
+
+void HvacService::HandleSignalChange(const std::string &path, const Datapoint &dp)
+{
+ if (m_config.verbose() > 1)
+ std::cout << "HvacService::HandleSignalChange: Value received for " << path << std::endl;
+
+ if (path == "Vehicle.Cabin.HVAC.Station.Row1.Left.Temperature") {
+ if (dp.has_int32()) {
+ int temp = dp.int32();
+ if (temp >= 0 && temp < 256)
+ set_left_temperature(temp);
+ }
+ } else if (path == "Vehicle.Cabin.HVAC.Station.Row1.Right.Temperature") {
+ if (dp.has_int32()) {
+ int temp = dp.int32();
+ if (temp >= 0 && temp < 256)
+ set_right_temperature(temp);
+ }
+ } else if (path == "Vehicle.Cabin.HVAC.Station.Row1.Left.FanSpeed") {
+ if (dp.has_uint32()) {
+ int speed = dp.uint32();
+ if (speed >= 0 && speed <= 100)
+ set_left_fan_speed(speed);
+ }
+ } else if (path == "Vehicle.Cabin.HVAC.Station.Row1.Right.FanSpeed") {
+ if (dp.has_uint32()) {
+ int speed = dp.uint32();
+ if (speed >= 0 && speed <= 100)
+ set_right_fan_speed(speed);
+ }
+ }
+ // else ignore
+}
+
+void HvacService::HandleSignalSetError(const std::string &path, const Error &error)
+{
+ std::cerr << "Error setting " << path << ": " << error.code() << " - " << error.reason() << std::endl;
+}
+
+void HvacService::HandleSubscribeDone(const SubscribeRequest *request, const Status &status)
+{
+ if (m_config.verbose())
+ std::cout << "Subscribe status = " << status.error_code() <<
+ " (" << status.error_message() << ")" << std::endl;
+
+ if (status.error_code() == grpc::CANCELLED) {
+ if (m_config.verbose())
+ std::cerr << "Subscribe canceled, assuming shutdown" << std::endl;
+ return;
+ }
+
+ // Queue up a resubcribe via the GLib event loop callback
+ struct resubscribe_data *data = new (struct resubscribe_data);
+ if (!data) {
+ std::cerr << "Could not create resubcribe_data" << std::endl;
+ exit(1);
+ }
+ data->self = this;
+ // Need to copy request since the one we have been handed is from the
+ // finished subscribe and will be going away.
+ data->request = new SubscribeRequest(*request);
+ if (!data->request) {
+ std::cerr << "Could not create resubscribe SubscribeRequest" << std::endl;
+ exit(1);
+ }
+
+ // NOTE: Waiting 100 milliseconds for now; it is possible that some
+ // randomization and/or back-off may need to be added if many
+ // subscribes are active, or switching to some other resubscribe
+ // scheme altogether (e.g. post subscribes to a thread that waits
+ // for the channel to become connected again).
+ g_timeout_add_full(G_PRIORITY_DEFAULT,
+ 100,
+ resubscribe_cb,
+ data,
+ NULL);
+}
+
+void HvacService::Resubscribe(const SubscribeRequest *request)
+{
+ if (!(m_broker && request))
+ return;
+
+ m_broker->subscribe(request,
+ [this](const std::string &path, const Datapoint &dp) {
+ HandleSignalChange(path, dp);
+ },
+ [this](const SubscribeRequest *request, const Status &s) {
+ HandleSubscribeDone(request, s);
+ });
+}
+
+// NOTE: The following should perhaps be scheduling work via the GLib
+// main loop to avoid potentially blocking threads from the gRPC
+// pool.
+
+void HvacService::set_left_temperature(uint8_t temp)
+{
+ m_can_helper.set_left_temperature(temp);
+ m_led_helper.set_left_temperature(temp);
+
+ // Push out new value
+ m_broker->set("Vehicle.Cabin.HVAC.Station.Row1.Left.Temperature",
+ (int) temp,
+ [this](const std::string &path, const Error &error) {
+ HandleSignalSetError(path, error);
+ });
+}
+
+void HvacService::set_right_temperature(uint8_t temp)
+{
+ m_can_helper.set_right_temperature(temp);
+ m_led_helper.set_right_temperature(temp);
+
+ // Push out new value
+ m_broker->set("Vehicle.Cabin.HVAC.Station.Row1.Right.Temperature",
+ (int) temp,
+ [this](const std::string &path, const Error &error) {
+ HandleSignalSetError(path, error);
+ });
+}
+
+void HvacService::set_left_fan_speed(uint8_t speed)
+{
+ set_fan_speed(speed);
+
+ // Push out new value
+ m_broker->set("Vehicle.Cabin.HVAC.Station.Row1.Left.FanSpeed",
+ speed,
+ [this](const std::string &path, const Error &error) {
+ HandleSignalSetError(path, error);
+ });
+}
+
+void HvacService::set_right_fan_speed(uint8_t speed)
+{
+ set_fan_speed(speed);
+
+ // Push out new value
+ m_broker->set("Vehicle.Cabin.HVAC.Station.Row1.Right.FanSpeed",
+ speed,
+ [this](const std::string &path, const Error &error) {
+ HandleSignalSetError(path, error);
+ });
+}
+
+void HvacService::set_fan_speed(uint8_t speed)
+{
+ m_can_helper.set_fan_speed(speed);
+}
diff --git a/src/HvacService.h b/src/HvacService.h
new file mode 100644
index 0000000..756f6f3
--- /dev/null
+++ b/src/HvacService.h
@@ -0,0 +1,65 @@
+/*
+ * Copyright (C) 2022,2023 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#ifndef _HVAC_SERVICE_H
+#define _HVAC_SERVICE_H
+
+#include <glib.h>
+
+#include "KuksaConfig.h"
+#include "KuksaClient.h"
+#include "HvacCanHelper.h"
+#include "HvacLedHelper.h"
+
+class HvacService
+{
+public:
+ HvacService(const KuksaConfig &config, GMainLoop *loop = NULL);
+
+ ~HvacService();
+
+ // Callback for KuksaClient subscribe API reconnect
+
+ static gboolean resubscribe_cb(gpointer data) {
+ struct resubscribe_data *d = (struct resubscribe_data*) data;
+ if (d && d->self) {
+ ((HvacService*) d->self)->Resubscribe(d->request);
+ }
+ return FALSE;
+ }
+
+private:
+ struct resubscribe_data {
+ HvacService *self;
+ const SubscribeRequest *request;
+ };
+
+ GMainLoop *m_loop;
+ KuksaConfig m_config;
+ KuksaClient *m_broker;
+ HvacCanHelper m_can_helper;
+ HvacLedHelper m_led_helper;
+
+ void HandleSignalChange(const std::string &path, const Datapoint &dp);
+
+ void HandleSignalSetError(const std::string &path, const Error &error);
+
+ void HandleSubscribeDone(const SubscribeRequest *request, const Status &status);
+
+ void Resubscribe(const SubscribeRequest *request);
+
+ void set_left_temperature(uint8_t temp);
+
+ void set_right_temperature(uint8_t temp);
+
+ void set_left_fan_speed(uint8_t temp);
+
+ void set_right_fan_speed(uint8_t temp);
+
+ void set_fan_speed(uint8_t temp);
+};
+
+#endif // _HVAC_SERVICE_H
diff --git a/src/KuksaClient.cpp b/src/KuksaClient.cpp
new file mode 100644
index 0000000..844ce0e
--- /dev/null
+++ b/src/KuksaClient.cpp
@@ -0,0 +1,373 @@
+/*
+ * Copyright (C) 2023 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#include <string>
+#include <regex>
+#include <iterator>
+#include <mutex>
+
+#include "KuksaClient.h"
+
+using grpc::Channel;
+using grpc::ClientContext;
+using grpc::ClientReader;
+using grpc::Status;
+
+KuksaClient::KuksaClient(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const KuksaConfig &config) :
+ m_config(config)
+{
+ m_stub = VAL::NewStub(channel);
+}
+
+void KuksaClient::get(const std::string &path, GetResponseCallback cb, const bool actuator)
+{
+ ClientContext *context = new ClientContext();
+ if (!context) {
+ handleCriticalFailure("Could not create ClientContext");
+ return;
+ }
+ std::string token = m_config.authToken();
+ if (!token.empty()) {
+ token.insert(0, std::string("Bearer "));
+ context->AddMetadata(std::string("authorization"), token);
+ }
+
+ GetRequest request;
+ auto entry = request.add_entries();
+ entry->set_path(path);
+ entry->add_fields(Field::FIELD_PATH);
+ if (actuator)
+ entry->add_fields(Field::FIELD_ACTUATOR_TARGET);
+ else
+ entry->add_fields(Field::FIELD_VALUE);
+
+ GetResponse *response = new GetResponse();
+ if (!response) {
+ handleCriticalFailure("Could not create GetResponse");
+ return;
+ }
+
+ // NOTE: Using ClientUnaryReactor instead of the shortcut method
+ // would allow getting detailed errors.
+ m_stub->async()->Get(context, &request, response,
+ [this, cb, context, response](Status s) {
+ if (s.ok())
+ handleGetResponse(response, cb);
+ delete response;
+ delete context;
+ });
+}
+
+// Since a set request needs a Datapoint with the appropriate type value,
+// checking the signal metadata to get the type would be a requirement for
+// a generic set call that takes a string as argument. For now, assume
+// that set with a string is specifically for a signal of string type.
+
+void KuksaClient::set(const std::string &path, const std::string &value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_string(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const int8_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_int32(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const int16_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_int32(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const int32_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_int32(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const int64_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_int64(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const uint8_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_uint32(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const uint16_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_uint32(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const uint32_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_uint32(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const uint64_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_uint64(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const float value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_float_(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const double value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_double_(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::subscribe(const std::string &path,
+ SubscribeResponseCallback cb,
+ const bool actuator,
+ SubscribeDoneCallback done_cb)
+{
+ SubscribeRequest *request = new SubscribeRequest();
+ if (!request) {
+ handleCriticalFailure("Could not create SubscribeRequest");
+ return;
+ }
+
+ auto entry = request->add_entries();
+ entry->set_path(path);
+ entry->add_fields(Field::FIELD_PATH);
+ if (actuator)
+ entry->add_fields(Field::FIELD_ACTUATOR_TARGET);
+ else
+ entry->add_fields(Field::FIELD_VALUE);
+
+ subscribe(request, cb, done_cb);
+}
+
+void KuksaClient::subscribe(const std::map<std::string, bool> signals,
+ SubscribeResponseCallback cb,
+ SubscribeDoneCallback done_cb)
+{
+ SubscribeRequest *request = new SubscribeRequest();
+ if (!request) {
+ handleCriticalFailure("Could not create SubscribeRequest");
+ return;
+ }
+
+ for(auto it = signals.cbegin(); it != signals.cend(); ++it) {
+ auto entry = request->add_entries();
+ entry->set_path(it->first);
+ entry->add_fields(Field::FIELD_PATH);
+ if (it->second)
+ entry->add_fields(Field::FIELD_ACTUATOR_TARGET);
+ else
+ entry->add_fields(Field::FIELD_VALUE);
+ }
+
+ subscribe(request, cb, done_cb);
+}
+
+void KuksaClient::subscribe(const SubscribeRequest *request,
+ SubscribeResponseCallback cb,
+ SubscribeDoneCallback done_cb)
+{
+ if (!(request && cb))
+ return;
+
+ class Reader : public grpc::ClientReadReactor<SubscribeResponse> {
+ public:
+ Reader(VAL::Stub *stub,
+ KuksaClient *client,
+ KuksaConfig &config,
+ const SubscribeRequest *request,
+ SubscribeResponseCallback cb,
+ SubscribeDoneCallback done_cb):
+ client_(client),
+ config_(config),
+ request_(request),
+ cb_(cb),
+ done_cb_(done_cb) {
+ std::string token = config_.authToken();
+ if (!token.empty()) {
+ token.insert(0, std::string("Bearer "));
+ context_.AddMetadata(std::string("authorization"), token);
+ }
+ stub->async()->Subscribe(&context_, request, this);
+ StartRead(&response_);
+ StartCall();
+ }
+ void OnReadDone(bool ok) override {
+ std::unique_lock<std::mutex> lock(mutex_);
+ if (ok) {
+ if (client_)
+ client_->handleSubscribeResponse(&response_, cb_);
+ StartRead(&response_);
+ }
+ }
+ void OnDone(const Status& s) override {
+ status_ = s;
+ if (client_) {
+ if (config_.verbose() > 1)
+ std::cerr << "KuksaClient::subscribe::Reader done" << std::endl;
+ client_->handleSubscribeDone(request_, status_, done_cb_);
+ }
+
+ // gRPC engine is done with us, safe to self-delete
+ delete request_;
+ delete this;
+ }
+
+ private:
+ KuksaClient *client_;
+ KuksaConfig config_;
+ const SubscribeRequest *request_;
+ SubscribeResponseCallback cb_;
+ SubscribeDoneCallback done_cb_;
+
+ ClientContext context_;
+ SubscribeResponse response_;
+ std::mutex mutex_;
+ Status status_;
+ };
+ Reader *reader = new Reader(m_stub.get(), this, m_config, request, cb, done_cb);
+ if (!reader)
+ handleCriticalFailure("Could not create Subscribe reader");
+}
+
+// Private
+
+void KuksaClient::set(const std::string &path, const Datapoint &dp, SetResponseCallback cb, const bool actuator)
+{
+ ClientContext *context = new ClientContext();
+ if (!context) {
+ handleCriticalFailure("Could not create ClientContext");
+ return;
+ }
+ std::string token = m_config.authToken();
+ if (!token.empty()) {
+ token.insert(0, std::string("Bearer "));
+ context->AddMetadata(std::string("authorization"), token);
+ }
+
+ SetRequest request;
+ auto update = request.add_updates();
+ auto entry = update->mutable_entry();
+ entry->set_path(path);
+ if (actuator) {
+ auto target = entry->mutable_actuator_target();
+ *target = dp;
+ update->add_fields(Field::FIELD_ACTUATOR_TARGET);
+ } else {
+ auto value = entry->mutable_value();
+ *value = dp;
+ update->add_fields(Field::FIELD_VALUE);
+ }
+
+ SetResponse *response = new SetResponse();
+ if (!response) {
+ handleCriticalFailure("Could not create SetResponse");
+ delete context;
+ return;
+ }
+
+ // NOTE: Using ClientUnaryReactor instead of the shortcut method
+ // would allow getting detailed errors.
+ m_stub->async()->Set(context, &request, response,
+ [this, cb, context, response](Status s) {
+ if (s.ok())
+ handleSetResponse(response, cb);
+ delete response;
+ delete context;
+ });
+}
+
+void KuksaClient::handleGetResponse(const GetResponse *response, GetResponseCallback cb)
+{
+ if (!(response && response->entries_size() && cb))
+ return;
+
+ for (auto it = response->entries().begin(); it != response->entries().end(); ++it) {
+ // We expect paths in the response entries
+ if (!it->path().size())
+ continue;
+
+ Datapoint dp;
+ if (it->has_actuator_target())
+ dp = it->actuator_target();
+ else
+ dp = it->value();
+ cb(it->path(), dp);
+ }
+}
+
+void KuksaClient::handleSetResponse(const SetResponse *response, SetResponseCallback cb)
+{
+ if (!(response && response->errors_size() && cb))
+ return;
+
+ for (auto it = response->errors().begin(); it != response->errors().end(); ++it) {
+ cb(it->path(), it->error());
+ }
+
+}
+
+void KuksaClient::handleSubscribeResponse(const SubscribeResponse *response, SubscribeResponseCallback cb)
+{
+ if (!(response && response->updates_size() && cb))
+ return;
+
+ for (auto it = response->updates().begin(); it != response->updates().end(); ++it) {
+ // We expect entries that have paths in the response
+ if (!(it->has_entry() && it->entry().path().size()))
+ continue;
+
+ auto entry = it->entry();
+ if (m_config.verbose())
+ std::cout << "KuksaClient::handleSubscribeResponse: got value for " << entry.path() << std::endl;
+
+ Datapoint dp;
+ if (entry.has_actuator_target())
+ dp = entry.actuator_target();
+ else
+ dp = entry.value();
+
+ cb(entry.path(), dp);
+ }
+}
+
+void KuksaClient::handleSubscribeDone(const SubscribeRequest *request,
+ const Status &status,
+ SubscribeDoneCallback cb)
+{
+ if (cb)
+ cb(request, status);
+}
+
+void KuksaClient::handleCriticalFailure(const std::string &error)
+{
+ if (error.size())
+ std::cerr << error << std::endl;
+ exit(1);
+}
+
diff --git a/src/KuksaClient.h b/src/KuksaClient.h
new file mode 100644
index 0000000..fd9a9e0
--- /dev/null
+++ b/src/KuksaClient.h
@@ -0,0 +1,79 @@
+/*
+ * Copyright (C) 2023 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#ifndef KUKSA_CLIENT_H
+#define KUKSA_CLIENT_H
+
+#include <string>
+#include <map>
+#include <grpcpp/grpcpp.h>
+#include "kuksa/val/v1/val.grpc.pb.h"
+
+// Just pull in the whole namespace since Datapoint contains a lot of
+// definitions that may potentially be needed.
+using namespace kuksa::val::v1;
+
+using grpc::Status;
+
+#include "KuksaConfig.h"
+
+// API response callback types
+typedef std::function<void(const std::string &path, const Datapoint &dp)> GetResponseCallback;
+typedef std::function<void(const std::string &path, const Error &error)> SetResponseCallback;
+typedef std::function<void(const std::string &path, const Datapoint &dp)> SubscribeResponseCallback;
+typedef std::function<void(const SubscribeRequest *request, const Status &status)> SubscribeDoneCallback;
+
+// KUKSA.val databroker "VAL" gRPC API client class
+
+class KuksaClient
+{
+public:
+ explicit KuksaClient(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const KuksaConfig &config);
+
+ void get(const std::string &path, GetResponseCallback cb, const bool actuator = false);
+
+ void set(const std::string &path, const std::string &value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const int8_t value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const int16_t value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const int32_t value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const int64_t value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const uint8_t value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const uint16_t value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const uint32_t value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const uint64_t value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const float value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const double value, SetResponseCallback cb, const bool actuator = false);
+
+ void subscribe(const std::string &path,
+ SubscribeResponseCallback cb,
+ const bool actuator = false,
+ SubscribeDoneCallback done_cb = nullptr);
+ void subscribe(const std::map<std::string, bool> signals,
+ SubscribeResponseCallback cb,
+ SubscribeDoneCallback done_cb = nullptr);
+ void subscribe(const SubscribeRequest *request,
+ SubscribeResponseCallback cb,
+ SubscribeDoneCallback done_cb = nullptr);
+
+private:
+ KuksaConfig m_config;
+ std::shared_ptr<VAL::Stub> m_stub;
+
+ void set(const std::string &path, const Datapoint &dp, SetResponseCallback cb, const bool actuator);
+
+ void handleGetResponse(const GetResponse *response, GetResponseCallback cb);
+
+ void handleSetResponse(const SetResponse *response, SetResponseCallback cb);
+
+ void handleSubscribeResponse(const SubscribeResponse *response, SubscribeResponseCallback cb);
+
+ void handleSubscribeDone(const SubscribeRequest *request, const Status &status, SubscribeDoneCallback cb);
+
+ void handleCriticalFailure(const std::string &error);
+ void handleCriticalFailure(const char *error) { handleCriticalFailure(std::string(error)); };
+};
+
+#endif // KUKSA_CLIENT_H
diff --git a/src/vis-config.cpp b/src/KuksaConfig.cpp
index a55e235..8fe09f6 100644
--- a/src/vis-config.cpp
+++ b/src/KuksaConfig.cpp
@@ -1,18 +1,21 @@
-// SPDX-License-Identifier: Apache-2.0
+/*
+ * Copyright (C) 2022,2023 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
-#include "vis-config.hpp"
#include <iostream>
#include <iomanip>
#include <sstream>
+#include <exception>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/ini_parser.hpp>
#include <boost/filesystem.hpp>
+#include "KuksaConfig.h"
namespace property_tree = boost::property_tree;
namespace filesystem = boost::filesystem;
-#define DEFAULT_CLIENT_KEY_FILE "/etc/kuksa-val/Client.key"
-#define DEFAULT_CLIENT_CERT_FILE "/etc/kuksa-val/Client.pem"
#define DEFAULT_CA_CERT_FILE "/etc/kuksa-val/CA.pem"
inline
@@ -21,32 +24,32 @@ void load_string_file(const filesystem::path& p, std::string& str)
std::ifstream file;
file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
file.open(p, std::ios_base::binary);
- std::size_t sz = static_cast<std::size_t>(filesystem::file_size(p));
- str.resize(sz, '\0');
- file.read(&str[0], sz);
+ if (file.good()) {
+ std::size_t sz = static_cast<std::size_t>(filesystem::file_size(p));
+ str.resize(sz, '\0');
+ file.read(&str[0], sz);
+ } else {
+ str.clear();
+ }
}
-VisConfig::VisConfig(const std::string &hostname,
- const unsigned port,
- const std::string &clientKey,
- const std::string &clientCert,
- const std::string &caCert,
- const std::string &authToken,
- bool verifyPeer) :
+KuksaConfig::KuksaConfig(const std::string &hostname,
+ const unsigned port,
+ const std::string &caCert,
+ const std::string &tlsServerName,
+ const std::string &authToken) :
m_hostname(hostname),
m_port(port),
- m_clientKey(clientKey),
- m_clientCert(clientCert),
m_caCert(caCert),
+ m_tlsServerName(tlsServerName),
m_authToken(authToken),
- m_verifyPeer(verifyPeer),
m_verbose(0),
m_valid(true)
{
// Potentially could do some certificate validation here...
}
-VisConfig::VisConfig(const std::string &appname) :
+KuksaConfig::KuksaConfig(const std::string &appname) :
m_valid(false)
{
std::string config("/etc/xdg/AGL/");
@@ -70,7 +73,7 @@ VisConfig::VisConfig(const std::string &appname) :
return;
}
const property_tree::ptree &settings =
- pt.get_child("vis-client", property_tree::ptree());
+ pt.get_child("kuksa-client", property_tree::ptree());
m_hostname = settings.get("server", "localhost");
std::stringstream ss;
@@ -81,48 +84,12 @@ VisConfig::VisConfig(const std::string &appname) :
return;
}
- m_port = settings.get("port", 8090);
+ m_port = settings.get("port", 55555);
if (m_port == 0) {
std::cerr << "Invalid server port" << std::endl;
return;
}
- // Default to disabling peer verification for now to be able
- // to use the default upstream KUKSA.val certificates for
- // testing. Wrangling server and CA certificate generation
- // and management to be able to verify will require further
- // investigation.
- m_verifyPeer = settings.get("verify-server", false);
-
- std::string keyFileName = settings.get("key", DEFAULT_CLIENT_KEY_FILE);
- std::stringstream().swap(ss);
- ss << keyFileName;
- ss >> std::quoted(keyFileName);
- ss.str("");
- if (keyFileName.empty()) {
- std::cerr << "Invalid client key filename" << std::endl;
- return;
- }
- load_string_file(keyFileName, m_clientKey);
- if (m_clientKey.empty()) {
- std::cerr << "Invalid client key file" << std::endl;
- return;
- }
-
- std::string certFileName = settings.get("certificate", DEFAULT_CLIENT_CERT_FILE);
- std::stringstream().swap(ss);
- ss << certFileName;
- ss >> std::quoted(certFileName);
- if (certFileName.empty()) {
- std::cerr << "Invalid client certificate filename" << std::endl;
- return;
- }
- load_string_file(certFileName, m_clientCert);
- if (m_clientCert.empty()) {
- std::cerr << "Invalid client certificate file" << std::endl;
- return;
- }
-
std::string caCertFileName = settings.get("ca-certificate", DEFAULT_CA_CERT_FILE);
std::stringstream().swap(ss);
ss << caCertFileName;
@@ -131,12 +98,14 @@ VisConfig::VisConfig(const std::string &appname) :
std::cerr << "Invalid CA certificate filename" << std::endl;
return;
}
- load_string_file(caCertFileName, m_caCert);
+ readFile(caCertFileName, m_caCert);
if (m_caCert.empty()) {
std::cerr << "Invalid CA certificate file" << std::endl;
return;
}
+ m_tlsServerName = settings.get("tls-server-name", "");
+
std::string authTokenFileName = settings.get("authorization", "");
std::stringstream().swap(ss);
ss << authTokenFileName;
@@ -145,7 +114,7 @@ VisConfig::VisConfig(const std::string &appname) :
std::cerr << "Invalid authorization token filename" << std::endl;
return;
}
- load_string_file(authTokenFileName, m_authToken);
+ readFile(authTokenFileName, m_authToken);
if (m_authToken.empty()) {
std::cerr << "Invalid authorization token file" << std::endl;
return;
@@ -165,3 +134,14 @@ VisConfig::VisConfig(const std::string &appname) :
m_valid = true;
}
+
+// Private
+
+void KuksaConfig::readFile(const std::string &filename, std::string &data)
+{
+ try {
+ load_string_file(filename, data);
+ } catch (const std::exception &e) {
+ data.clear();
+ }
+}
diff --git a/src/KuksaConfig.h b/src/KuksaConfig.h
new file mode 100644
index 0000000..e70385f
--- /dev/null
+++ b/src/KuksaConfig.h
@@ -0,0 +1,43 @@
+/*
+ * Copyright (C) 2022,2023 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#ifndef _KUKSA_CONFIG_H
+#define _KUKSA_CONFIG_H
+
+#include <string>
+
+class KuksaConfig
+{
+public:
+ explicit KuksaConfig(const std::string &hostname,
+ const unsigned port,
+ const std::string &caCert,
+ const std::string &tlsServerName,
+ const std::string &authToken);
+ explicit KuksaConfig(const std::string &appname);
+ ~KuksaConfig() {};
+
+ std::string hostname() { return m_hostname; };
+ unsigned port() { return m_port; };
+ std::string caCert() { return m_caCert; };
+ std::string tlsServerName() { return m_tlsServerName; };
+ std::string authToken() { return m_authToken; };
+ bool valid() { return m_valid; };
+ unsigned verbose() { return m_verbose; };
+
+private:
+ std::string m_hostname;
+ unsigned m_port;
+ std::string m_caCert;
+ std::string m_tlsServerName;
+ std::string m_authToken;
+ unsigned m_verbose;
+ bool m_valid;
+
+ void readFile(const std::string &filename, std::string &data);
+};
+
+#endif // _KUKSA_CONFIG_H
diff --git a/src/hvac-service.cpp b/src/hvac-service.cpp
deleted file mode 100644
index ee05806..0000000
--- a/src/hvac-service.cpp
+++ /dev/null
@@ -1,85 +0,0 @@
-// SPDX-License-Identifier: Apache-2.0
-
-#include "hvac-service.hpp"
-#include <iostream>
-#include <algorithm>
-
-
-HvacService::HvacService(const VisConfig &config, net::io_context& ioc, ssl::context& ctx) :
- VisSession(config, ioc, ctx),
- m_can_helper(),
- m_led_helper()
-{
-}
-
-void HvacService::handle_authorized_response(void)
-{
- subscribe("Vehicle.Cabin.HVAC.Station.Row1.Left.Temperature");
- subscribe("Vehicle.Cabin.HVAC.Station.Row1.Left.FanSpeed");
- subscribe("Vehicle.Cabin.HVAC.Station.Row1.Right.Temperature");
- subscribe("Vehicle.Cabin.HVAC.Station.Row1.Right.FanSpeed");
-}
-
-void HvacService::handle_get_response(std::string &path, std::string &value, std::string &timestamp)
-{
- // Placeholder since no gets are performed ATM
-}
-
-void HvacService::handle_notification(std::string &path, std::string &value, std::string &timestamp)
-{
- if (path == "Vehicle.Cabin.HVAC.Station.Row1.Left.Temperature") {
- try {
- int temp = std::stoi(value);
- if (temp >= 0 && temp < 256)
- set_left_temperature(temp);
- }
- catch (std::exception ex) {
- // ignore bad value
- }
- } else if (path == "Vehicle.Cabin.HVAC.Station.Row1.Right.Temperature") {
- try {
- int temp = std::stoi(value);
- if (temp >= 0 && temp < 256)
- set_right_temperature(temp);
- }
- catch (std::exception ex) {
- // ignore bad value
- }
- } else if (path == "Vehicle.Cabin.HVAC.Station.Row1.Left.FanSpeed") {
- try {
- int speed = std::stoi(value);
- if (speed >= 0 && speed < 256)
- set_fan_speed(speed);
- }
- catch (std::exception ex) {
- // ignore bad value
- }
- } else if (path == "Vehicle.Cabin.HVAC.Station.Row1.Right.FanSpeed") {
- try {
- int speed = std::stoi(value);
- if (speed >= 0 && speed < 256)
- set_fan_speed(speed);
- }
- catch (std::exception ex) {
- // ignore bad value
- }
- }
- // else ignore
-}
-
-void HvacService::set_left_temperature(uint8_t temp)
-{
- m_can_helper.set_left_temperature(temp);
- m_led_helper.set_left_temperature(temp);
-}
-
-void HvacService::set_right_temperature(uint8_t temp)
-{
- m_can_helper.set_right_temperature(temp);
- m_led_helper.set_right_temperature(temp);
-}
-
-void HvacService::set_fan_speed(uint8_t speed)
-{
- m_can_helper.set_fan_speed(speed);
-}
diff --git a/src/hvac-service.hpp b/src/hvac-service.hpp
deleted file mode 100644
index 27bfe1a..0000000
--- a/src/hvac-service.hpp
+++ /dev/null
@@ -1,33 +0,0 @@
-// SPDX-License-Identifier: Apache-2.0
-
-#ifndef _HVAC_SERVICE_HPP
-#define _HVAC_SERVICE_HPP
-
-#include "vis-session.hpp"
-#include "hvac-can-helper.hpp"
-#include "hvac-led-helper.hpp"
-
-class HvacService : public VisSession
-{
-public:
- HvacService(const VisConfig &config, net::io_context& ioc, ssl::context& ctx);
-
-protected:
- virtual void handle_authorized_response(void) override;
-
- virtual void handle_get_response(std::string &path, std::string &value, std::string &timestamp) override;
-
- virtual void handle_notification(std::string &path, std::string &value, std::string &timestamp) override;
-
-private:
- HvacCanHelper m_can_helper;
- HvacLedHelper m_led_helper;
-
- void set_left_temperature(uint8_t temp);
-
- void set_right_temperature(uint8_t temp);
-
- void set_fan_speed(uint8_t temp);
-};
-
-#endif // _HVAC_SERVICE_HPP
diff --git a/src/main.cpp b/src/main.cpp
index 6bb165f..2cce0e7 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -1,34 +1,52 @@
-// SPDX-License-Identifier: Apache-2.0
+/*
+ * Copyright (C) 2022,2023 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
-#include <iostream>
-#include <iomanip>
-#include <boost/asio/signal_set.hpp>
-#include <boost/bind.hpp>
-#include "hvac-service.hpp"
+#include <glib.h>
+#include <glib-unix.h>
+#include <systemd/sd-daemon.h>
-using work_guard_type = boost::asio::executor_work_guard<boost::asio::io_context::executor_type>;
+#include "HvacService.h"
+
+static gboolean quit_cb(gpointer user_data)
+{
+ GMainLoop *loop = (GMainLoop*) user_data;
+
+ g_info("Quitting...");
+
+ if (loop)
+ g_idle_add(G_SOURCE_FUNC(g_main_loop_quit), loop);
+ else
+ exit(0);
+
+ return G_SOURCE_REMOVE;
+}
int main(int argc, char** argv)
{
- // The io_context is required for all I/O
- net::io_context ioc;
+ GMainLoop *loop = NULL;
+
+ loop = g_main_loop_new(NULL, FALSE);
+ if (!loop) {
+ std::cerr << "Could not create GLib event loop" << std::endl;
+ exit(1);
+ }
+
+ KuksaConfig config("agl-service-hvac");
- // Register to stop I/O context on SIGINT and SIGTERM
- net::signal_set signals(ioc, SIGINT, SIGTERM);
- signals.async_wait(boost::bind(&net::io_context::stop, &ioc));
+ g_unix_signal_add(SIGTERM, quit_cb, (gpointer) loop);
+ g_unix_signal_add(SIGINT, quit_cb, (gpointer) loop);
- // The SSL context is required, and holds certificates
- ssl::context ctx{ssl::context::tlsv12_client};
+ HvacService service(config, loop);
- // Launch the asynchronous operation
- VisConfig config("agl-service-hvac");
- std::make_shared<HvacService>(config, ioc, ctx)->run();
+ sd_notify(0, "READY=1");
- // Ensure I/O context continues running even if there's no work
- work_guard_type work_guard(ioc.get_executor());
+ g_main_loop_run(loop);
- // Run the I/O context
- ioc.run();
+ // Clean up
+ g_main_loop_unref(loop);
return 0;
}
diff --git a/src/meson.build b/src/meson.build
index 149ec99..40aaec8 100644
--- a/src/meson.build
+++ b/src/meson.build
@@ -1,19 +1,60 @@
boost_dep = dependency('boost',
version : '>=1.72',
modules : [ 'thread', 'filesystem', 'program_options', 'log', 'system' ])
-openssl_dep = dependency('openssl')
-thread_dep = dependency('threads')
-cxx = meson.get_compiler('cpp')
-src = [ 'vis-config.cpp',
- 'vis-session.cpp',
- 'hvac-service.cpp',
- 'hvac-can-helper.cpp',
- 'hvac-led-helper.cpp',
- 'main.cpp'
+cpp = meson.get_compiler('cpp')
+grpcpp_reflection_dep = cpp.find_library('grpc++_reflection')
+
+service_dep = [
+ boost_dep,
+ dependency('glib-2.0'),
+ dependency('openssl'),
+ dependency('threads'),
+ dependency('libsystemd'),
+ dependency('protobuf'),
+ dependency('grpc'),
+ dependency('grpc++'),
+ grpcpp_reflection_dep
]
+
+protoc = find_program('protoc')
+grpc_cpp = find_program('grpc_cpp_plugin')
+
+protos_base_dir = get_option('protos')
+protos_dir = protos_base_dir / 'kuksa/val/v1'
+protoc_gen = generator(protoc, \
+ output : ['@BASENAME@.pb.cc', '@BASENAME@.pb.h'],
+ arguments : ['-I=' + protos_base_dir,
+ '--cpp_out=@BUILD_DIR@',
+ '@INPUT@'])
+generated_protoc_sources = [ \
+ protoc_gen.process(protos_dir / 'types.proto', preserve_path_from : protos_base_dir),
+ protoc_gen.process(protos_dir / 'val.proto', preserve_path_from : protos_base_dir),
+]
+
+grpc_gen = generator(protoc, \
+ output : ['@BASENAME@.grpc.pb.cc', '@BASENAME@.grpc.pb.h'],
+ arguments : ['-I=' + protos_base_dir,
+ '--grpc_out=@BUILD_DIR@',
+ '--plugin=protoc-gen-grpc=' + grpc_cpp.path(),
+ '@INPUT@'])
+generated_grpc_sources = [ \
+ grpc_gen.process(protos_dir / 'val.proto', preserve_path_from : protos_base_dir),
+]
+
+src = [
+ 'KuksaConfig.cpp',
+ 'KuksaClient.cpp',
+ 'HvacService.cpp',
+ 'HvacCanHelper.cpp',
+ 'HvacLedHelper.cpp',
+ 'main.cpp',
+ generated_protoc_sources,
+ generated_grpc_sources,
+]
+
executable('agl-service-hvac',
src,
- dependencies: [boost_dep, openssl_dep, thread_dep, systemd_dep],
+ dependencies: service_dep,
install: true,
install_dir : get_option('sbindir'))
diff --git a/src/vis-config.hpp b/src/vis-config.hpp
deleted file mode 100644
index b0f72f9..0000000
--- a/src/vis-config.hpp
+++ /dev/null
@@ -1,43 +0,0 @@
-// SPDX-License-Identifier: Apache-2.0
-
-#ifndef _VIS_CONFIG_HPP
-#define _VIS_CONFIG_HPP
-
-#include <string>
-
-class VisConfig
-{
-public:
- explicit VisConfig(const std::string &hostname,
- const unsigned port,
- const std::string &clientKey,
- const std::string &clientCert,
- const std::string &caCert,
- const std::string &authToken,
- bool verifyPeer = true);
- explicit VisConfig(const std::string &appname);
- ~VisConfig() {};
-
- std::string hostname() { return m_hostname; };
- unsigned port() { return m_port; };
- std::string clientKey() { return m_clientKey; };
- std::string clientCert() { return m_clientCert; };
- std::string caCert() { return m_caCert; };
- std::string authToken() { return m_authToken; };
- bool verifyPeer() { return m_verifyPeer; };
- bool valid() { return m_valid; };
- unsigned verbose() { return m_verbose; };
-
-private:
- std::string m_hostname;
- unsigned m_port;
- std::string m_clientKey;
- std::string m_clientCert;
- std::string m_caCert;
- std::string m_authToken;
- bool m_verifyPeer;
- unsigned m_verbose;
- bool m_valid;
-};
-
-#endif // _VIS_CONFIG_HPP
diff --git a/src/vis-session.cpp b/src/vis-session.cpp
deleted file mode 100644
index 53a554c..0000000
--- a/src/vis-session.cpp
+++ /dev/null
@@ -1,380 +0,0 @@
-// SPDX-License-Identifier: Apache-2.0
-
-#include "vis-session.hpp"
-#include <iostream>
-#include <sstream>
-#include <thread>
-
-
-// Logging helper
-static void log_error(beast::error_code error, char const* what)
-{
- std::cerr << what << " error: " << error.message() << std::endl;
-}
-
-
-// Resolver and socket require an io_context
-VisSession::VisSession(const VisConfig &config, net::io_context& ioc, ssl::context& ctx) :
- m_config(config),
- m_resolver(net::make_strand(ioc)),
- m_ws(net::make_strand(ioc), ctx)
-{
-}
-
-// Start the asynchronous operation
-void VisSession::run()
-{
- if (!m_config.valid()) {
- return;
- }
-
- // Start by resolving hostname
- m_resolver.async_resolve(m_config.hostname(),
- std::to_string(m_config.port()),
- beast::bind_front_handler(&VisSession::on_resolve,
- shared_from_this()));
-}
-
-void VisSession::on_resolve(beast::error_code error,
- tcp::resolver::results_type results)
-{
- if(error) {
- log_error(error, "resolve");
- return;
- }
-
- // Set a timeout on the connect operation
- beast::get_lowest_layer(m_ws).expires_after(std::chrono::seconds(30));
-
- // Connect to resolved address
- if (m_config.verbose())
- std::cout << "Connecting" << std::endl;
- m_results = results;
- connect();
-}
-
-void VisSession::connect()
-{
- beast::get_lowest_layer(m_ws).async_connect(m_results,
- beast::bind_front_handler(&VisSession::on_connect,
- shared_from_this()));
-}
-
-void VisSession::on_connect(beast::error_code error,
- tcp::resolver::results_type::endpoint_type endpoint)
-{
- if(error) {
- // The server can take a while to be ready to accept connections,
- // so keep retrying until we hit the timeout.
- if (error == net::error::timed_out) {
- log_error(error, "connect");
- return;
- }
-
- // Delay 500 ms before retrying
- std::this_thread::sleep_for(std::chrono::milliseconds(500));
-
- if (m_config.verbose())
- std::cout << "Connecting" << std::endl;
-
- connect();
- return;
- }
-
- if (m_config.verbose())
- std::cout << "Connected" << std::endl;
-
- // Set handshake timeout
- beast::get_lowest_layer(m_ws).expires_after(std::chrono::seconds(30));
-
- // Set SNI Hostname (many hosts need this to handshake successfully)
- if(!SSL_set_tlsext_host_name(m_ws.next_layer().native_handle(),
- m_config.hostname().c_str()))
- {
- error = beast::error_code(static_cast<int>(::ERR_get_error()),
- net::error::get_ssl_category());
- log_error(error, "connect");
- return;
- }
-
- // Update the hostname. This will provide the value of the
- // Host HTTP header during the WebSocket handshake.
- // See https://tools.ietf.org/html/rfc7230#section-5.4
- m_hostname = m_config.hostname() + ':' + std::to_string(endpoint.port());
-
- if (m_config.verbose())
- std::cout << "Negotiating SSL handshake" << std::endl;
-
- // Perform the SSL handshake
- m_ws.next_layer().async_handshake(ssl::stream_base::client,
- beast::bind_front_handler(&VisSession::on_ssl_handshake,
- shared_from_this()));
-}
-
-void VisSession::on_ssl_handshake(beast::error_code error)
-{
- if(error) {
- log_error(error, "SSL handshake");
- return;
- }
-
- // Turn off the timeout on the tcp_stream, because
- // the websocket stream has its own timeout system.
- beast::get_lowest_layer(m_ws).expires_never();
-
- // NOTE: Explicitly not setting websocket stream timeout here,
- // as the client is long-running.
-
- if (m_config.verbose())
- std::cout << "Negotiating WSS handshake" << std::endl;
-
- // Perform handshake
- m_ws.async_handshake(m_hostname,
- "/",
- beast::bind_front_handler(&VisSession::on_handshake,
- shared_from_this()));
-}
-
-void VisSession::on_handshake(beast::error_code error)
-{
- if(error) {
- log_error(error, "WSS handshake");
- return;
- }
-
- if (m_config.verbose())
- std::cout << "Authorizing" << std::endl;
-
- // Authorize
- json req;
- req["requestId"] = std::to_string(m_requestid++);
- req["action"]= "authorize";
- req["tokens"] = m_config.authToken();
-
- m_ws.async_write(net::buffer(req.dump(4)),
- beast::bind_front_handler(&VisSession::on_authorize,
- shared_from_this()));
-}
-
-void VisSession::on_authorize(beast::error_code error, std::size_t bytes_transferred)
-{
- boost::ignore_unused(bytes_transferred);
-
- if(error) {
- log_error(error, "authorize");
- return;
- }
-
- // Read response
- m_ws.async_read(m_buffer,
- beast::bind_front_handler(&VisSession::on_read,
- shared_from_this()));
-}
-
-// NOTE: Placeholder for now
-void VisSession::on_write(beast::error_code error, std::size_t bytes_transferred)
-{
- boost::ignore_unused(bytes_transferred);
-
- if(error) {
- log_error(error, "write");
- return;
- }
-
- // Do nothing...
-}
-
-void VisSession::on_read(beast::error_code error, std::size_t bytes_transferred)
-{
- boost::ignore_unused(bytes_transferred);
-
- if(error) {
- log_error(error, "read");
- return;
- }
-
- // Handle message
- std::string s = beast::buffers_to_string(m_buffer.data());
- json response = json::parse(s, nullptr, false);
- if (!response.is_discarded()) {
- handle_message(response);
- } else {
- std::cerr << "json::parse failed? got " << s << std::endl;
- }
- m_buffer.consume(m_buffer.size());
-
- // Read next message
- m_ws.async_read(m_buffer,
- beast::bind_front_handler(&VisSession::on_read,
- shared_from_this()));
-}
-
-void VisSession::get(const std::string &path)
-{
- if (!m_config.valid()) {
- return;
- }
-
- json req;
- req["requestId"] = std::to_string(m_requestid++);
- req["action"] = "get";
- req["path"] = path;
- req["tokens"] = m_config.authToken();
-
- m_ws.write(net::buffer(req.dump(4)));
-}
-
-void VisSession::set(const std::string &path, const std::string &value)
-{
- if (!m_config.valid()) {
- return;
- }
-
- json req;
- req["requestId"] = std::to_string(m_requestid++);
- req["action"] = "set";
- req["path"] = path;
- req["value"] = value;
- req["tokens"] = m_config.authToken();
-
- m_ws.write(net::buffer(req.dump(4)));
-}
-
-void VisSession::subscribe(const std::string &path)
-{
- if (!m_config.valid()) {
- return;
- }
-
- json req;
- req["requestId"] = std::to_string(m_requestid++);
- req["action"] = "subscribe";
- req["path"] = path;
- req["tokens"] = m_config.authToken();
-
- m_ws.write(net::buffer(req.dump(4)));
-}
-
-bool VisSession::parseData(const json &message, std::string &path, std::string &value, std::string &timestamp)
-{
- if (message.contains("error")) {
- std::string error = message["error"];
- return false;
- }
-
- if (!(message.contains("data") && message["data"].is_object())) {
- std::cerr << "Malformed message (data missing)" << std::endl;
- return false;
- }
- auto data = message["data"];
- if (!(data.contains("path") && data["path"].is_string())) {
- std::cerr << "Malformed message (path missing)" << std::endl;
- return false;
- }
- path = data["path"];
- // Convert '/' to '.' in paths to ensure consistency for clients
- std::replace(path.begin(), path.end(), '/', '.');
-
- if (!(data.contains("dp") && data["dp"].is_object())) {
- std::cerr << "Malformed message (datapoint missing)" << std::endl;
- return false;
- }
- auto dp = data["dp"];
- if (!dp.contains("value")) {
- std::cerr << "Malformed message (value missing)" << std::endl;
- return false;
- } else if (dp["value"].is_string()) {
- value = dp["value"];
- } else if (dp["value"].is_number_float()) {
- double num = dp["value"];
- value = std::to_string(num);
- } else if (dp["value"].is_number_unsigned()) {
- unsigned int num = dp["value"];
- value = std::to_string(num);
- } else if (dp["value"].is_number_integer()) {
- int num = dp["value"];
- value = std::to_string(num);
- } else if (dp["value"].is_boolean()) {
- value = dp["value"] ? "true" : "false";
- } else {
- std::cerr << "Malformed message (unsupported value type)" << std::endl;
- return false;
- }
-
- if (!(dp.contains("ts") && dp["ts"].is_string())) {
- std::cerr << "Malformed message (timestamp missing)" << std::endl;
- return false;
- }
- timestamp = dp["ts"];
-
- return true;
-}
-
-void VisSession::handle_message(const json &message)
-{
- if (m_config.verbose() > 1)
- std::cout << "VisSession::handle_message: enter, message = " << to_string(message) << std::endl;
-
- if (!message.contains("action")) {
- std::cerr << "Received unknown message (no action), discarding" << std::endl;
- return;
- }
-
- std::string action = message["action"];
- if (action == "authorize") {
- if (message.contains("error")) {
- std::string error = "unknown";
- if (message["error"].is_object() && message["error"].contains("message"))
- error = message["error"]["message"];
- std::cerr << "VIS authorization failed: " << error << std::endl;
- } else {
- if (m_config.verbose() > 1)
- std::cout << "authorized" << std::endl;
-
- handle_authorized_response();
- }
- } else if (action == "subscribe") {
- if (message.contains("error")) {
- std::string error = "unknown";
- if (message["error"].is_object() && message["error"].contains("message"))
- error = message["error"]["message"];
- std::cerr << "VIS subscription failed: " << error << std::endl;
- }
- } else if (action == "get") {
- if (message.contains("error")) {
- std::string error = "unknown";
- if (message["error"].is_object() && message["error"].contains("message"))
- error = message["error"]["message"];
- std::cerr << "VIS get failed: " << error << std::endl;
- } else {
- std::string path, value, ts;
- if (parseData(message, path, value, ts)) {
- if (m_config.verbose() > 1)
- std::cout << "VisSession::handle_message: got response " << path << " = " << value << std::endl;
-
- handle_get_response(path, value, ts);
- }
- }
- } else if (action == "set") {
- if (message.contains("error")) {
- std::string error = "unknown";
- if (message["error"].is_object() && message["error"].contains("message"))
- error = message["error"]["message"];
- std::cerr << "VIS set failed: " << error;
- }
- } else if (action == "subscription") {
- std::string path, value, ts;
- if (parseData(message, path, value, ts)) {
- if (m_config.verbose() > 1)
- std::cout << "VisSession::handle_message: got notification " << path << " = " << value << std::endl;
-
- handle_notification(path, value, ts);
- }
- } else {
- std::cerr << "unhandled VIS response of type: " << action;
- }
-
- if (m_config.verbose() > 1)
- std::cout << "VisSession::handle_message: exit" << std::endl;
-}
-
diff --git a/src/vis-session.hpp b/src/vis-session.hpp
deleted file mode 100644
index 8c7b0d9..0000000
--- a/src/vis-session.hpp
+++ /dev/null
@@ -1,78 +0,0 @@
-// SPDX-License-Identifier: Apache-2.0
-
-#ifndef _VIS_SESSION_HPP
-#define _VIS_SESSION_HPP
-
-#include "vis-config.hpp"
-#include <atomic>
-#include <string>
-#include <boost/beast/core.hpp>
-#include <boost/beast/ssl.hpp>
-#include <boost/beast/websocket.hpp>
-#include <boost/beast/websocket/ssl.hpp>
-#include <boost/asio/strand.hpp>
-#include <nlohmann/json.hpp>
-
-namespace beast = boost::beast;
-namespace websocket = beast::websocket;
-namespace net = boost::asio;
-namespace ssl = boost::asio::ssl;
-using tcp = boost::asio::ip::tcp;
-using json = nlohmann::json;
-
-
-class VisSession : public std::enable_shared_from_this<VisSession>
-{
- //net::io_context m_ioc;
- tcp::resolver m_resolver;
- tcp::resolver::results_type m_results;
- std::string m_hostname;
- websocket::stream<beast::ssl_stream<beast::tcp_stream>> m_ws;
- beast::flat_buffer m_buffer;
-
-public:
- // Resolver and socket require an io_context
- explicit VisSession(const VisConfig &config, net::io_context& ioc, ssl::context& ctx);
-
- // Start the asynchronous operation
- void run();
-
-protected:
- VisConfig m_config;
- std::atomic_uint m_requestid;
-
- void on_resolve(beast::error_code error, tcp::resolver::results_type results);
-
- void connect();
-
- void on_connect(beast::error_code error, tcp::resolver::results_type::endpoint_type endpoint);
-
- void on_ssl_handshake(beast::error_code error);
-
- void on_handshake(beast::error_code error);
-
- void on_authorize(beast::error_code error, std::size_t bytes_transferred);
-
- void on_write(beast::error_code error, std::size_t bytes_transferred);
-
- void on_read(beast::error_code error, std::size_t bytes_transferred);
-
- void get(const std::string &path);
-
- void set(const std::string &path, const std::string &value);
-
- void subscribe(const std::string &path);
-
- void handle_message(const json &message);
-
- bool parseData(const json &message, std::string &path, std::string &value, std::string &timestamp);
-
- virtual void handle_authorized_response(void) = 0;
-
- virtual void handle_get_response(std::string &path, std::string &value, std::string &timestamp) = 0;
-
- virtual void handle_notification(std::string &path, std::string &value, std::string &timestamp) = 0;
-
-};
-
-#endif // _VIS_SESSION_HPP