diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/HvacCanHelper.cpp (renamed from src/hvac-can-helper.cpp) | 8 | ||||
-rw-r--r-- | src/HvacCanHelper.h (renamed from src/hvac-can-helper.hpp) | 12 | ||||
-rw-r--r-- | src/HvacLedHelper.cpp (renamed from src/hvac-led-helper.cpp) | 8 | ||||
-rw-r--r-- | src/HvacLedHelper.h (renamed from src/hvac-led-helper.hpp) | 12 | ||||
-rw-r--r-- | src/HvacService.cpp | 223 | ||||
-rw-r--r-- | src/HvacService.h | 65 | ||||
-rw-r--r-- | src/KuksaClient.cpp | 373 | ||||
-rw-r--r-- | src/KuksaClient.h | 79 | ||||
-rw-r--r-- | src/KuksaConfig.cpp (renamed from src/vis-config.cpp) | 96 | ||||
-rw-r--r-- | src/KuksaConfig.h | 43 | ||||
-rw-r--r-- | src/hvac-service.cpp | 85 | ||||
-rw-r--r-- | src/hvac-service.hpp | 33 | ||||
-rw-r--r-- | src/main.cpp | 60 | ||||
-rw-r--r-- | src/meson.build | 61 | ||||
-rw-r--r-- | src/vis-config.hpp | 43 | ||||
-rw-r--r-- | src/vis-session.cpp | 380 | ||||
-rw-r--r-- | src/vis-session.hpp | 78 |
17 files changed, 939 insertions, 720 deletions
diff --git a/src/hvac-can-helper.cpp b/src/HvacCanHelper.cpp index 634f4b0..0c0f937 100644 --- a/src/hvac-can-helper.cpp +++ b/src/HvacCanHelper.cpp @@ -1,6 +1,10 @@ -// SPDX-License-Identifier: Apache-2.0 +/* + * Copyright (C) 2022,2023 Konsulko Group + * + * SPDX-License-Identifier: Apache-2.0 + */ -#include "hvac-can-helper.hpp" +#include "HvacCanHelper.h" #include <iostream> #include <iomanip> #include <sstream> diff --git a/src/hvac-can-helper.hpp b/src/HvacCanHelper.h index 4175e6b..0787834 100644 --- a/src/hvac-can-helper.hpp +++ b/src/HvacCanHelper.h @@ -1,7 +1,11 @@ -// SPDX-License-Identifier: Apache-2.0 +/* + * Copyright (C) 2022,2023 Konsulko Group + * + * SPDX-License-Identifier: Apache-2.0 + */ -#ifndef _HVAC_CAN_HELPER_HPP -#define _HVAC_CAN_HELPER_HPP +#ifndef _HVAC_CAN_HELPER_H +#define _HVAC_CAN_HELPER_H #include <cstdint> #include <string> @@ -51,4 +55,4 @@ private: uint8_t m_fan_speed; }; -#endif // _HVAC_CAN_HELPER_HPP +#endif // _HVAC_CAN_HELPER_H diff --git a/src/hvac-led-helper.cpp b/src/HvacLedHelper.cpp index 3c86c81..54d4b2f 100644 --- a/src/hvac-led-helper.cpp +++ b/src/HvacLedHelper.cpp @@ -1,6 +1,10 @@ -// SPDX-License-Identifier: Apache-2.0 +/* + * Copyright (C) 2022,2023 Konsulko Group + * + * SPDX-License-Identifier: Apache-2.0 + */ -#include "hvac-led-helper.hpp" +#include "HvacLedHelper.h" #include <iostream> #include <iomanip> #include <sstream> diff --git a/src/hvac-led-helper.hpp b/src/HvacLedHelper.h index 6f73aac..be94393 100644 --- a/src/hvac-led-helper.hpp +++ b/src/HvacLedHelper.h @@ -1,7 +1,11 @@ -// SPDX-License-Identifier: Apache-2.0 +/* + * Copyright (C) 2022,2023 Konsulko Group + * + * SPDX-License-Identifier: Apache-2.0 + */ -#ifndef _HVAC_LED_HELPER_HPP -#define _HVAC_LED_HELPER_HPP +#ifndef _HVAC_LED_HELPER_H +#define _HVAC_LED_HELPER_H #include <cstdint> #include <string> @@ -32,4 +36,4 @@ private: uint8_t m_temp_right; }; -#endif // _HVAC_LED_HELPER_HPP +#endif // _HVAC_LED_HELPER_H diff --git a/src/HvacService.cpp b/src/HvacService.cpp new file mode 100644 index 0000000..b53c5d8 --- /dev/null +++ b/src/HvacService.cpp @@ -0,0 +1,223 @@ +/* + * Copyright (C) 2022,2023 Konsulko Group + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "HvacService.h" +#include <string> +#include <sstream> +#include <iostream> +#include <algorithm> + + +HvacService::HvacService(const KuksaConfig &config, GMainLoop *loop) : + m_loop(loop), + m_config(config), + m_can_helper(), + m_led_helper() +{ + // Create gRPC channel + std::string host = m_config.hostname(); + host += ":"; + std::stringstream ss; + ss << m_config.port(); + host += ss.str(); + + std::shared_ptr<grpc::Channel> channel; + if (!m_config.caCert().empty()) { + grpc::SslCredentialsOptions options; + options.pem_root_certs = m_config.caCert(); + if (!m_config.tlsServerName().empty()) { + grpc::ChannelArguments args; + auto target = m_config.tlsServerName(); + std::cout << "Overriding TLS target name with " << target << std::endl; + args.SetString(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG, target); + channel = grpc::CreateCustomChannel(host, grpc::SslCredentials(options), args); + } else { + channel = grpc::CreateChannel(host, grpc::SslCredentials(options)); + } + } else { + channel = grpc::CreateChannel(host, grpc::InsecureChannelCredentials()); + } + + // Wait for the channel to be ready + std::cout << "Waiting for Databroker gRPC channel" << std::endl; + while (!channel->WaitForConnected(std::chrono::system_clock::now() + + std::chrono::milliseconds(500))) ; + std::cout << "Databroker gRPC channel ready" << std::endl; + + m_broker = new KuksaClient(channel, m_config); + if (m_broker) { + // Listen to actuator target updates + std::map<std::string, bool> signals; + signals["Vehicle.Cabin.HVAC.Station.Row1.Left.Temperature"] = true; + signals["Vehicle.Cabin.HVAC.Station.Row1.Left.FanSpeed"] = true; + signals["Vehicle.Cabin.HVAC.Station.Row1.Right.Temperature"] = true; + signals["Vehicle.Cabin.HVAC.Station.Row1.Right.FanSpeed"] = true; + m_broker->subscribe(signals, + [this](const std::string &path, const Datapoint &dp) { + HandleSignalChange(path, dp); + }, + [this](const SubscribeRequest *request, const Status &s) { + HandleSubscribeDone(request, s); + }); + } +} + +HvacService::~HvacService() +{ + delete m_broker; +} + +// Private + +void HvacService::HandleSignalChange(const std::string &path, const Datapoint &dp) +{ + if (m_config.verbose() > 1) + std::cout << "HvacService::HandleSignalChange: Value received for " << path << std::endl; + + if (path == "Vehicle.Cabin.HVAC.Station.Row1.Left.Temperature") { + if (dp.has_int32()) { + int temp = dp.int32(); + if (temp >= 0 && temp < 256) + set_left_temperature(temp); + } + } else if (path == "Vehicle.Cabin.HVAC.Station.Row1.Right.Temperature") { + if (dp.has_int32()) { + int temp = dp.int32(); + if (temp >= 0 && temp < 256) + set_right_temperature(temp); + } + } else if (path == "Vehicle.Cabin.HVAC.Station.Row1.Left.FanSpeed") { + if (dp.has_uint32()) { + int speed = dp.uint32(); + if (speed >= 0 && speed <= 100) + set_left_fan_speed(speed); + } + } else if (path == "Vehicle.Cabin.HVAC.Station.Row1.Right.FanSpeed") { + if (dp.has_uint32()) { + int speed = dp.uint32(); + if (speed >= 0 && speed <= 100) + set_right_fan_speed(speed); + } + } + // else ignore +} + +void HvacService::HandleSignalSetError(const std::string &path, const Error &error) +{ + std::cerr << "Error setting " << path << ": " << error.code() << " - " << error.reason() << std::endl; +} + +void HvacService::HandleSubscribeDone(const SubscribeRequest *request, const Status &status) +{ + if (m_config.verbose()) + std::cout << "Subscribe status = " << status.error_code() << + " (" << status.error_message() << ")" << std::endl; + + if (status.error_code() == grpc::CANCELLED) { + if (m_config.verbose()) + std::cerr << "Subscribe canceled, assuming shutdown" << std::endl; + return; + } + + // Queue up a resubcribe via the GLib event loop callback + struct resubscribe_data *data = new (struct resubscribe_data); + if (!data) { + std::cerr << "Could not create resubcribe_data" << std::endl; + exit(1); + } + data->self = this; + // Need to copy request since the one we have been handed is from the + // finished subscribe and will be going away. + data->request = new SubscribeRequest(*request); + if (!data->request) { + std::cerr << "Could not create resubscribe SubscribeRequest" << std::endl; + exit(1); + } + + // NOTE: Waiting 100 milliseconds for now; it is possible that some + // randomization and/or back-off may need to be added if many + // subscribes are active, or switching to some other resubscribe + // scheme altogether (e.g. post subscribes to a thread that waits + // for the channel to become connected again). + g_timeout_add_full(G_PRIORITY_DEFAULT, + 100, + resubscribe_cb, + data, + NULL); +} + +void HvacService::Resubscribe(const SubscribeRequest *request) +{ + if (!(m_broker && request)) + return; + + m_broker->subscribe(request, + [this](const std::string &path, const Datapoint &dp) { + HandleSignalChange(path, dp); + }, + [this](const SubscribeRequest *request, const Status &s) { + HandleSubscribeDone(request, s); + }); +} + +// NOTE: The following should perhaps be scheduling work via the GLib +// main loop to avoid potentially blocking threads from the gRPC +// pool. + +void HvacService::set_left_temperature(uint8_t temp) +{ + m_can_helper.set_left_temperature(temp); + m_led_helper.set_left_temperature(temp); + + // Push out new value + m_broker->set("Vehicle.Cabin.HVAC.Station.Row1.Left.Temperature", + (int) temp, + [this](const std::string &path, const Error &error) { + HandleSignalSetError(path, error); + }); +} + +void HvacService::set_right_temperature(uint8_t temp) +{ + m_can_helper.set_right_temperature(temp); + m_led_helper.set_right_temperature(temp); + + // Push out new value + m_broker->set("Vehicle.Cabin.HVAC.Station.Row1.Right.Temperature", + (int) temp, + [this](const std::string &path, const Error &error) { + HandleSignalSetError(path, error); + }); +} + +void HvacService::set_left_fan_speed(uint8_t speed) +{ + set_fan_speed(speed); + + // Push out new value + m_broker->set("Vehicle.Cabin.HVAC.Station.Row1.Left.FanSpeed", + speed, + [this](const std::string &path, const Error &error) { + HandleSignalSetError(path, error); + }); +} + +void HvacService::set_right_fan_speed(uint8_t speed) +{ + set_fan_speed(speed); + + // Push out new value + m_broker->set("Vehicle.Cabin.HVAC.Station.Row1.Right.FanSpeed", + speed, + [this](const std::string &path, const Error &error) { + HandleSignalSetError(path, error); + }); +} + +void HvacService::set_fan_speed(uint8_t speed) +{ + m_can_helper.set_fan_speed(speed); +} diff --git a/src/HvacService.h b/src/HvacService.h new file mode 100644 index 0000000..756f6f3 --- /dev/null +++ b/src/HvacService.h @@ -0,0 +1,65 @@ +/* + * Copyright (C) 2022,2023 Konsulko Group + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef _HVAC_SERVICE_H +#define _HVAC_SERVICE_H + +#include <glib.h> + +#include "KuksaConfig.h" +#include "KuksaClient.h" +#include "HvacCanHelper.h" +#include "HvacLedHelper.h" + +class HvacService +{ +public: + HvacService(const KuksaConfig &config, GMainLoop *loop = NULL); + + ~HvacService(); + + // Callback for KuksaClient subscribe API reconnect + + static gboolean resubscribe_cb(gpointer data) { + struct resubscribe_data *d = (struct resubscribe_data*) data; + if (d && d->self) { + ((HvacService*) d->self)->Resubscribe(d->request); + } + return FALSE; + } + +private: + struct resubscribe_data { + HvacService *self; + const SubscribeRequest *request; + }; + + GMainLoop *m_loop; + KuksaConfig m_config; + KuksaClient *m_broker; + HvacCanHelper m_can_helper; + HvacLedHelper m_led_helper; + + void HandleSignalChange(const std::string &path, const Datapoint &dp); + + void HandleSignalSetError(const std::string &path, const Error &error); + + void HandleSubscribeDone(const SubscribeRequest *request, const Status &status); + + void Resubscribe(const SubscribeRequest *request); + + void set_left_temperature(uint8_t temp); + + void set_right_temperature(uint8_t temp); + + void set_left_fan_speed(uint8_t temp); + + void set_right_fan_speed(uint8_t temp); + + void set_fan_speed(uint8_t temp); +}; + +#endif // _HVAC_SERVICE_H diff --git a/src/KuksaClient.cpp b/src/KuksaClient.cpp new file mode 100644 index 0000000..844ce0e --- /dev/null +++ b/src/KuksaClient.cpp @@ -0,0 +1,373 @@ +/* + * Copyright (C) 2023 Konsulko Group + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include <string> +#include <regex> +#include <iterator> +#include <mutex> + +#include "KuksaClient.h" + +using grpc::Channel; +using grpc::ClientContext; +using grpc::ClientReader; +using grpc::Status; + +KuksaClient::KuksaClient(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const KuksaConfig &config) : + m_config(config) +{ + m_stub = VAL::NewStub(channel); +} + +void KuksaClient::get(const std::string &path, GetResponseCallback cb, const bool actuator) +{ + ClientContext *context = new ClientContext(); + if (!context) { + handleCriticalFailure("Could not create ClientContext"); + return; + } + std::string token = m_config.authToken(); + if (!token.empty()) { + token.insert(0, std::string("Bearer ")); + context->AddMetadata(std::string("authorization"), token); + } + + GetRequest request; + auto entry = request.add_entries(); + entry->set_path(path); + entry->add_fields(Field::FIELD_PATH); + if (actuator) + entry->add_fields(Field::FIELD_ACTUATOR_TARGET); + else + entry->add_fields(Field::FIELD_VALUE); + + GetResponse *response = new GetResponse(); + if (!response) { + handleCriticalFailure("Could not create GetResponse"); + return; + } + + // NOTE: Using ClientUnaryReactor instead of the shortcut method + // would allow getting detailed errors. + m_stub->async()->Get(context, &request, response, + [this, cb, context, response](Status s) { + if (s.ok()) + handleGetResponse(response, cb); + delete response; + delete context; + }); +} + +// Since a set request needs a Datapoint with the appropriate type value, +// checking the signal metadata to get the type would be a requirement for +// a generic set call that takes a string as argument. For now, assume +// that set with a string is specifically for a signal of string type. + +void KuksaClient::set(const std::string &path, const std::string &value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_string(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const int8_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_int32(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const int16_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_int32(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const int32_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_int32(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const int64_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_int64(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const uint8_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_uint32(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const uint16_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_uint32(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const uint32_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_uint32(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const uint64_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_uint64(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const float value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_float_(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const double value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_double_(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::subscribe(const std::string &path, + SubscribeResponseCallback cb, + const bool actuator, + SubscribeDoneCallback done_cb) +{ + SubscribeRequest *request = new SubscribeRequest(); + if (!request) { + handleCriticalFailure("Could not create SubscribeRequest"); + return; + } + + auto entry = request->add_entries(); + entry->set_path(path); + entry->add_fields(Field::FIELD_PATH); + if (actuator) + entry->add_fields(Field::FIELD_ACTUATOR_TARGET); + else + entry->add_fields(Field::FIELD_VALUE); + + subscribe(request, cb, done_cb); +} + +void KuksaClient::subscribe(const std::map<std::string, bool> signals, + SubscribeResponseCallback cb, + SubscribeDoneCallback done_cb) +{ + SubscribeRequest *request = new SubscribeRequest(); + if (!request) { + handleCriticalFailure("Could not create SubscribeRequest"); + return; + } + + for(auto it = signals.cbegin(); it != signals.cend(); ++it) { + auto entry = request->add_entries(); + entry->set_path(it->first); + entry->add_fields(Field::FIELD_PATH); + if (it->second) + entry->add_fields(Field::FIELD_ACTUATOR_TARGET); + else + entry->add_fields(Field::FIELD_VALUE); + } + + subscribe(request, cb, done_cb); +} + +void KuksaClient::subscribe(const SubscribeRequest *request, + SubscribeResponseCallback cb, + SubscribeDoneCallback done_cb) +{ + if (!(request && cb)) + return; + + class Reader : public grpc::ClientReadReactor<SubscribeResponse> { + public: + Reader(VAL::Stub *stub, + KuksaClient *client, + KuksaConfig &config, + const SubscribeRequest *request, + SubscribeResponseCallback cb, + SubscribeDoneCallback done_cb): + client_(client), + config_(config), + request_(request), + cb_(cb), + done_cb_(done_cb) { + std::string token = config_.authToken(); + if (!token.empty()) { + token.insert(0, std::string("Bearer ")); + context_.AddMetadata(std::string("authorization"), token); + } + stub->async()->Subscribe(&context_, request, this); + StartRead(&response_); + StartCall(); + } + void OnReadDone(bool ok) override { + std::unique_lock<std::mutex> lock(mutex_); + if (ok) { + if (client_) + client_->handleSubscribeResponse(&response_, cb_); + StartRead(&response_); + } + } + void OnDone(const Status& s) override { + status_ = s; + if (client_) { + if (config_.verbose() > 1) + std::cerr << "KuksaClient::subscribe::Reader done" << std::endl; + client_->handleSubscribeDone(request_, status_, done_cb_); + } + + // gRPC engine is done with us, safe to self-delete + delete request_; + delete this; + } + + private: + KuksaClient *client_; + KuksaConfig config_; + const SubscribeRequest *request_; + SubscribeResponseCallback cb_; + SubscribeDoneCallback done_cb_; + + ClientContext context_; + SubscribeResponse response_; + std::mutex mutex_; + Status status_; + }; + Reader *reader = new Reader(m_stub.get(), this, m_config, request, cb, done_cb); + if (!reader) + handleCriticalFailure("Could not create Subscribe reader"); +} + +// Private + +void KuksaClient::set(const std::string &path, const Datapoint &dp, SetResponseCallback cb, const bool actuator) +{ + ClientContext *context = new ClientContext(); + if (!context) { + handleCriticalFailure("Could not create ClientContext"); + return; + } + std::string token = m_config.authToken(); + if (!token.empty()) { + token.insert(0, std::string("Bearer ")); + context->AddMetadata(std::string("authorization"), token); + } + + SetRequest request; + auto update = request.add_updates(); + auto entry = update->mutable_entry(); + entry->set_path(path); + if (actuator) { + auto target = entry->mutable_actuator_target(); + *target = dp; + update->add_fields(Field::FIELD_ACTUATOR_TARGET); + } else { + auto value = entry->mutable_value(); + *value = dp; + update->add_fields(Field::FIELD_VALUE); + } + + SetResponse *response = new SetResponse(); + if (!response) { + handleCriticalFailure("Could not create SetResponse"); + delete context; + return; + } + + // NOTE: Using ClientUnaryReactor instead of the shortcut method + // would allow getting detailed errors. + m_stub->async()->Set(context, &request, response, + [this, cb, context, response](Status s) { + if (s.ok()) + handleSetResponse(response, cb); + delete response; + delete context; + }); +} + +void KuksaClient::handleGetResponse(const GetResponse *response, GetResponseCallback cb) +{ + if (!(response && response->entries_size() && cb)) + return; + + for (auto it = response->entries().begin(); it != response->entries().end(); ++it) { + // We expect paths in the response entries + if (!it->path().size()) + continue; + + Datapoint dp; + if (it->has_actuator_target()) + dp = it->actuator_target(); + else + dp = it->value(); + cb(it->path(), dp); + } +} + +void KuksaClient::handleSetResponse(const SetResponse *response, SetResponseCallback cb) +{ + if (!(response && response->errors_size() && cb)) + return; + + for (auto it = response->errors().begin(); it != response->errors().end(); ++it) { + cb(it->path(), it->error()); + } + +} + +void KuksaClient::handleSubscribeResponse(const SubscribeResponse *response, SubscribeResponseCallback cb) +{ + if (!(response && response->updates_size() && cb)) + return; + + for (auto it = response->updates().begin(); it != response->updates().end(); ++it) { + // We expect entries that have paths in the response + if (!(it->has_entry() && it->entry().path().size())) + continue; + + auto entry = it->entry(); + if (m_config.verbose()) + std::cout << "KuksaClient::handleSubscribeResponse: got value for " << entry.path() << std::endl; + + Datapoint dp; + if (entry.has_actuator_target()) + dp = entry.actuator_target(); + else + dp = entry.value(); + + cb(entry.path(), dp); + } +} + +void KuksaClient::handleSubscribeDone(const SubscribeRequest *request, + const Status &status, + SubscribeDoneCallback cb) +{ + if (cb) + cb(request, status); +} + +void KuksaClient::handleCriticalFailure(const std::string &error) +{ + if (error.size()) + std::cerr << error << std::endl; + exit(1); +} + diff --git a/src/KuksaClient.h b/src/KuksaClient.h new file mode 100644 index 0000000..fd9a9e0 --- /dev/null +++ b/src/KuksaClient.h @@ -0,0 +1,79 @@ +/* + * Copyright (C) 2023 Konsulko Group + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef KUKSA_CLIENT_H +#define KUKSA_CLIENT_H + +#include <string> +#include <map> +#include <grpcpp/grpcpp.h> +#include "kuksa/val/v1/val.grpc.pb.h" + +// Just pull in the whole namespace since Datapoint contains a lot of +// definitions that may potentially be needed. +using namespace kuksa::val::v1; + +using grpc::Status; + +#include "KuksaConfig.h" + +// API response callback types +typedef std::function<void(const std::string &path, const Datapoint &dp)> GetResponseCallback; +typedef std::function<void(const std::string &path, const Error &error)> SetResponseCallback; +typedef std::function<void(const std::string &path, const Datapoint &dp)> SubscribeResponseCallback; +typedef std::function<void(const SubscribeRequest *request, const Status &status)> SubscribeDoneCallback; + +// KUKSA.val databroker "VAL" gRPC API client class + +class KuksaClient +{ +public: + explicit KuksaClient(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const KuksaConfig &config); + + void get(const std::string &path, GetResponseCallback cb, const bool actuator = false); + + void set(const std::string &path, const std::string &value, SetResponseCallback cb, const bool actuator = false); + void set(const std::string &path, const int8_t value, SetResponseCallback cb, const bool actuator = false); + void set(const std::string &path, const int16_t value, SetResponseCallback cb, const bool actuator = false); + void set(const std::string &path, const int32_t value, SetResponseCallback cb, const bool actuator = false); + void set(const std::string &path, const int64_t value, SetResponseCallback cb, const bool actuator = false); + void set(const std::string &path, const uint8_t value, SetResponseCallback cb, const bool actuator = false); + void set(const std::string &path, const uint16_t value, SetResponseCallback cb, const bool actuator = false); + void set(const std::string &path, const uint32_t value, SetResponseCallback cb, const bool actuator = false); + void set(const std::string &path, const uint64_t value, SetResponseCallback cb, const bool actuator = false); + void set(const std::string &path, const float value, SetResponseCallback cb, const bool actuator = false); + void set(const std::string &path, const double value, SetResponseCallback cb, const bool actuator = false); + + void subscribe(const std::string &path, + SubscribeResponseCallback cb, + const bool actuator = false, + SubscribeDoneCallback done_cb = nullptr); + void subscribe(const std::map<std::string, bool> signals, + SubscribeResponseCallback cb, + SubscribeDoneCallback done_cb = nullptr); + void subscribe(const SubscribeRequest *request, + SubscribeResponseCallback cb, + SubscribeDoneCallback done_cb = nullptr); + +private: + KuksaConfig m_config; + std::shared_ptr<VAL::Stub> m_stub; + + void set(const std::string &path, const Datapoint &dp, SetResponseCallback cb, const bool actuator); + + void handleGetResponse(const GetResponse *response, GetResponseCallback cb); + + void handleSetResponse(const SetResponse *response, SetResponseCallback cb); + + void handleSubscribeResponse(const SubscribeResponse *response, SubscribeResponseCallback cb); + + void handleSubscribeDone(const SubscribeRequest *request, const Status &status, SubscribeDoneCallback cb); + + void handleCriticalFailure(const std::string &error); + void handleCriticalFailure(const char *error) { handleCriticalFailure(std::string(error)); }; +}; + +#endif // KUKSA_CLIENT_H diff --git a/src/vis-config.cpp b/src/KuksaConfig.cpp index a55e235..8fe09f6 100644 --- a/src/vis-config.cpp +++ b/src/KuksaConfig.cpp @@ -1,18 +1,21 @@ -// SPDX-License-Identifier: Apache-2.0 +/* + * Copyright (C) 2022,2023 Konsulko Group + * + * SPDX-License-Identifier: Apache-2.0 + */ -#include "vis-config.hpp" #include <iostream> #include <iomanip> #include <sstream> +#include <exception> #include <boost/property_tree/ptree.hpp> #include <boost/property_tree/ini_parser.hpp> #include <boost/filesystem.hpp> +#include "KuksaConfig.h" namespace property_tree = boost::property_tree; namespace filesystem = boost::filesystem; -#define DEFAULT_CLIENT_KEY_FILE "/etc/kuksa-val/Client.key" -#define DEFAULT_CLIENT_CERT_FILE "/etc/kuksa-val/Client.pem" #define DEFAULT_CA_CERT_FILE "/etc/kuksa-val/CA.pem" inline @@ -21,32 +24,32 @@ void load_string_file(const filesystem::path& p, std::string& str) std::ifstream file; file.exceptions(std::ifstream::failbit | std::ifstream::badbit); file.open(p, std::ios_base::binary); - std::size_t sz = static_cast<std::size_t>(filesystem::file_size(p)); - str.resize(sz, '\0'); - file.read(&str[0], sz); + if (file.good()) { + std::size_t sz = static_cast<std::size_t>(filesystem::file_size(p)); + str.resize(sz, '\0'); + file.read(&str[0], sz); + } else { + str.clear(); + } } -VisConfig::VisConfig(const std::string &hostname, - const unsigned port, - const std::string &clientKey, - const std::string &clientCert, - const std::string &caCert, - const std::string &authToken, - bool verifyPeer) : +KuksaConfig::KuksaConfig(const std::string &hostname, + const unsigned port, + const std::string &caCert, + const std::string &tlsServerName, + const std::string &authToken) : m_hostname(hostname), m_port(port), - m_clientKey(clientKey), - m_clientCert(clientCert), m_caCert(caCert), + m_tlsServerName(tlsServerName), m_authToken(authToken), - m_verifyPeer(verifyPeer), m_verbose(0), m_valid(true) { // Potentially could do some certificate validation here... } -VisConfig::VisConfig(const std::string &appname) : +KuksaConfig::KuksaConfig(const std::string &appname) : m_valid(false) { std::string config("/etc/xdg/AGL/"); @@ -70,7 +73,7 @@ VisConfig::VisConfig(const std::string &appname) : return; } const property_tree::ptree &settings = - pt.get_child("vis-client", property_tree::ptree()); + pt.get_child("kuksa-client", property_tree::ptree()); m_hostname = settings.get("server", "localhost"); std::stringstream ss; @@ -81,48 +84,12 @@ VisConfig::VisConfig(const std::string &appname) : return; } - m_port = settings.get("port", 8090); + m_port = settings.get("port", 55555); if (m_port == 0) { std::cerr << "Invalid server port" << std::endl; return; } - // Default to disabling peer verification for now to be able - // to use the default upstream KUKSA.val certificates for - // testing. Wrangling server and CA certificate generation - // and management to be able to verify will require further - // investigation. - m_verifyPeer = settings.get("verify-server", false); - - std::string keyFileName = settings.get("key", DEFAULT_CLIENT_KEY_FILE); - std::stringstream().swap(ss); - ss << keyFileName; - ss >> std::quoted(keyFileName); - ss.str(""); - if (keyFileName.empty()) { - std::cerr << "Invalid client key filename" << std::endl; - return; - } - load_string_file(keyFileName, m_clientKey); - if (m_clientKey.empty()) { - std::cerr << "Invalid client key file" << std::endl; - return; - } - - std::string certFileName = settings.get("certificate", DEFAULT_CLIENT_CERT_FILE); - std::stringstream().swap(ss); - ss << certFileName; - ss >> std::quoted(certFileName); - if (certFileName.empty()) { - std::cerr << "Invalid client certificate filename" << std::endl; - return; - } - load_string_file(certFileName, m_clientCert); - if (m_clientCert.empty()) { - std::cerr << "Invalid client certificate file" << std::endl; - return; - } - std::string caCertFileName = settings.get("ca-certificate", DEFAULT_CA_CERT_FILE); std::stringstream().swap(ss); ss << caCertFileName; @@ -131,12 +98,14 @@ VisConfig::VisConfig(const std::string &appname) : std::cerr << "Invalid CA certificate filename" << std::endl; return; } - load_string_file(caCertFileName, m_caCert); + readFile(caCertFileName, m_caCert); if (m_caCert.empty()) { std::cerr << "Invalid CA certificate file" << std::endl; return; } + m_tlsServerName = settings.get("tls-server-name", ""); + std::string authTokenFileName = settings.get("authorization", ""); std::stringstream().swap(ss); ss << authTokenFileName; @@ -145,7 +114,7 @@ VisConfig::VisConfig(const std::string &appname) : std::cerr << "Invalid authorization token filename" << std::endl; return; } - load_string_file(authTokenFileName, m_authToken); + readFile(authTokenFileName, m_authToken); if (m_authToken.empty()) { std::cerr << "Invalid authorization token file" << std::endl; return; @@ -165,3 +134,14 @@ VisConfig::VisConfig(const std::string &appname) : m_valid = true; } + +// Private + +void KuksaConfig::readFile(const std::string &filename, std::string &data) +{ + try { + load_string_file(filename, data); + } catch (const std::exception &e) { + data.clear(); + } +} diff --git a/src/KuksaConfig.h b/src/KuksaConfig.h new file mode 100644 index 0000000..e70385f --- /dev/null +++ b/src/KuksaConfig.h @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2022,2023 Konsulko Group + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef _KUKSA_CONFIG_H +#define _KUKSA_CONFIG_H + +#include <string> + +class KuksaConfig +{ +public: + explicit KuksaConfig(const std::string &hostname, + const unsigned port, + const std::string &caCert, + const std::string &tlsServerName, + const std::string &authToken); + explicit KuksaConfig(const std::string &appname); + ~KuksaConfig() {}; + + std::string hostname() { return m_hostname; }; + unsigned port() { return m_port; }; + std::string caCert() { return m_caCert; }; + std::string tlsServerName() { return m_tlsServerName; }; + std::string authToken() { return m_authToken; }; + bool valid() { return m_valid; }; + unsigned verbose() { return m_verbose; }; + +private: + std::string m_hostname; + unsigned m_port; + std::string m_caCert; + std::string m_tlsServerName; + std::string m_authToken; + unsigned m_verbose; + bool m_valid; + + void readFile(const std::string &filename, std::string &data); +}; + +#endif // _KUKSA_CONFIG_H diff --git a/src/hvac-service.cpp b/src/hvac-service.cpp deleted file mode 100644 index ee05806..0000000 --- a/src/hvac-service.cpp +++ /dev/null @@ -1,85 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -#include "hvac-service.hpp" -#include <iostream> -#include <algorithm> - - -HvacService::HvacService(const VisConfig &config, net::io_context& ioc, ssl::context& ctx) : - VisSession(config, ioc, ctx), - m_can_helper(), - m_led_helper() -{ -} - -void HvacService::handle_authorized_response(void) -{ - subscribe("Vehicle.Cabin.HVAC.Station.Row1.Left.Temperature"); - subscribe("Vehicle.Cabin.HVAC.Station.Row1.Left.FanSpeed"); - subscribe("Vehicle.Cabin.HVAC.Station.Row1.Right.Temperature"); - subscribe("Vehicle.Cabin.HVAC.Station.Row1.Right.FanSpeed"); -} - -void HvacService::handle_get_response(std::string &path, std::string &value, std::string ×tamp) -{ - // Placeholder since no gets are performed ATM -} - -void HvacService::handle_notification(std::string &path, std::string &value, std::string ×tamp) -{ - if (path == "Vehicle.Cabin.HVAC.Station.Row1.Left.Temperature") { - try { - int temp = std::stoi(value); - if (temp >= 0 && temp < 256) - set_left_temperature(temp); - } - catch (std::exception ex) { - // ignore bad value - } - } else if (path == "Vehicle.Cabin.HVAC.Station.Row1.Right.Temperature") { - try { - int temp = std::stoi(value); - if (temp >= 0 && temp < 256) - set_right_temperature(temp); - } - catch (std::exception ex) { - // ignore bad value - } - } else if (path == "Vehicle.Cabin.HVAC.Station.Row1.Left.FanSpeed") { - try { - int speed = std::stoi(value); - if (speed >= 0 && speed < 256) - set_fan_speed(speed); - } - catch (std::exception ex) { - // ignore bad value - } - } else if (path == "Vehicle.Cabin.HVAC.Station.Row1.Right.FanSpeed") { - try { - int speed = std::stoi(value); - if (speed >= 0 && speed < 256) - set_fan_speed(speed); - } - catch (std::exception ex) { - // ignore bad value - } - } - // else ignore -} - -void HvacService::set_left_temperature(uint8_t temp) -{ - m_can_helper.set_left_temperature(temp); - m_led_helper.set_left_temperature(temp); -} - -void HvacService::set_right_temperature(uint8_t temp) -{ - m_can_helper.set_right_temperature(temp); - m_led_helper.set_right_temperature(temp); -} - -void HvacService::set_fan_speed(uint8_t speed) -{ - m_can_helper.set_fan_speed(speed); -} diff --git a/src/hvac-service.hpp b/src/hvac-service.hpp deleted file mode 100644 index 27bfe1a..0000000 --- a/src/hvac-service.hpp +++ /dev/null @@ -1,33 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -#ifndef _HVAC_SERVICE_HPP -#define _HVAC_SERVICE_HPP - -#include "vis-session.hpp" -#include "hvac-can-helper.hpp" -#include "hvac-led-helper.hpp" - -class HvacService : public VisSession -{ -public: - HvacService(const VisConfig &config, net::io_context& ioc, ssl::context& ctx); - -protected: - virtual void handle_authorized_response(void) override; - - virtual void handle_get_response(std::string &path, std::string &value, std::string ×tamp) override; - - virtual void handle_notification(std::string &path, std::string &value, std::string ×tamp) override; - -private: - HvacCanHelper m_can_helper; - HvacLedHelper m_led_helper; - - void set_left_temperature(uint8_t temp); - - void set_right_temperature(uint8_t temp); - - void set_fan_speed(uint8_t temp); -}; - -#endif // _HVAC_SERVICE_HPP diff --git a/src/main.cpp b/src/main.cpp index 6bb165f..2cce0e7 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,34 +1,52 @@ -// SPDX-License-Identifier: Apache-2.0 +/* + * Copyright (C) 2022,2023 Konsulko Group + * + * SPDX-License-Identifier: Apache-2.0 + */ -#include <iostream> -#include <iomanip> -#include <boost/asio/signal_set.hpp> -#include <boost/bind.hpp> -#include "hvac-service.hpp" +#include <glib.h> +#include <glib-unix.h> +#include <systemd/sd-daemon.h> -using work_guard_type = boost::asio::executor_work_guard<boost::asio::io_context::executor_type>; +#include "HvacService.h" + +static gboolean quit_cb(gpointer user_data) +{ + GMainLoop *loop = (GMainLoop*) user_data; + + g_info("Quitting..."); + + if (loop) + g_idle_add(G_SOURCE_FUNC(g_main_loop_quit), loop); + else + exit(0); + + return G_SOURCE_REMOVE; +} int main(int argc, char** argv) { - // The io_context is required for all I/O - net::io_context ioc; + GMainLoop *loop = NULL; + + loop = g_main_loop_new(NULL, FALSE); + if (!loop) { + std::cerr << "Could not create GLib event loop" << std::endl; + exit(1); + } + + KuksaConfig config("agl-service-hvac"); - // Register to stop I/O context on SIGINT and SIGTERM - net::signal_set signals(ioc, SIGINT, SIGTERM); - signals.async_wait(boost::bind(&net::io_context::stop, &ioc)); + g_unix_signal_add(SIGTERM, quit_cb, (gpointer) loop); + g_unix_signal_add(SIGINT, quit_cb, (gpointer) loop); - // The SSL context is required, and holds certificates - ssl::context ctx{ssl::context::tlsv12_client}; + HvacService service(config, loop); - // Launch the asynchronous operation - VisConfig config("agl-service-hvac"); - std::make_shared<HvacService>(config, ioc, ctx)->run(); + sd_notify(0, "READY=1"); - // Ensure I/O context continues running even if there's no work - work_guard_type work_guard(ioc.get_executor()); + g_main_loop_run(loop); - // Run the I/O context - ioc.run(); + // Clean up + g_main_loop_unref(loop); return 0; } diff --git a/src/meson.build b/src/meson.build index 149ec99..40aaec8 100644 --- a/src/meson.build +++ b/src/meson.build @@ -1,19 +1,60 @@ boost_dep = dependency('boost', version : '>=1.72', modules : [ 'thread', 'filesystem', 'program_options', 'log', 'system' ]) -openssl_dep = dependency('openssl') -thread_dep = dependency('threads') -cxx = meson.get_compiler('cpp') -src = [ 'vis-config.cpp', - 'vis-session.cpp', - 'hvac-service.cpp', - 'hvac-can-helper.cpp', - 'hvac-led-helper.cpp', - 'main.cpp' +cpp = meson.get_compiler('cpp') +grpcpp_reflection_dep = cpp.find_library('grpc++_reflection') + +service_dep = [ + boost_dep, + dependency('glib-2.0'), + dependency('openssl'), + dependency('threads'), + dependency('libsystemd'), + dependency('protobuf'), + dependency('grpc'), + dependency('grpc++'), + grpcpp_reflection_dep ] + +protoc = find_program('protoc') +grpc_cpp = find_program('grpc_cpp_plugin') + +protos_base_dir = get_option('protos') +protos_dir = protos_base_dir / 'kuksa/val/v1' +protoc_gen = generator(protoc, \ + output : ['@BASENAME@.pb.cc', '@BASENAME@.pb.h'], + arguments : ['-I=' + protos_base_dir, + '--cpp_out=@BUILD_DIR@', + '@INPUT@']) +generated_protoc_sources = [ \ + protoc_gen.process(protos_dir / 'types.proto', preserve_path_from : protos_base_dir), + protoc_gen.process(protos_dir / 'val.proto', preserve_path_from : protos_base_dir), +] + +grpc_gen = generator(protoc, \ + output : ['@BASENAME@.grpc.pb.cc', '@BASENAME@.grpc.pb.h'], + arguments : ['-I=' + protos_base_dir, + '--grpc_out=@BUILD_DIR@', + '--plugin=protoc-gen-grpc=' + grpc_cpp.path(), + '@INPUT@']) +generated_grpc_sources = [ \ + grpc_gen.process(protos_dir / 'val.proto', preserve_path_from : protos_base_dir), +] + +src = [ + 'KuksaConfig.cpp', + 'KuksaClient.cpp', + 'HvacService.cpp', + 'HvacCanHelper.cpp', + 'HvacLedHelper.cpp', + 'main.cpp', + generated_protoc_sources, + generated_grpc_sources, +] + executable('agl-service-hvac', src, - dependencies: [boost_dep, openssl_dep, thread_dep, systemd_dep], + dependencies: service_dep, install: true, install_dir : get_option('sbindir')) diff --git a/src/vis-config.hpp b/src/vis-config.hpp deleted file mode 100644 index b0f72f9..0000000 --- a/src/vis-config.hpp +++ /dev/null @@ -1,43 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -#ifndef _VIS_CONFIG_HPP -#define _VIS_CONFIG_HPP - -#include <string> - -class VisConfig -{ -public: - explicit VisConfig(const std::string &hostname, - const unsigned port, - const std::string &clientKey, - const std::string &clientCert, - const std::string &caCert, - const std::string &authToken, - bool verifyPeer = true); - explicit VisConfig(const std::string &appname); - ~VisConfig() {}; - - std::string hostname() { return m_hostname; }; - unsigned port() { return m_port; }; - std::string clientKey() { return m_clientKey; }; - std::string clientCert() { return m_clientCert; }; - std::string caCert() { return m_caCert; }; - std::string authToken() { return m_authToken; }; - bool verifyPeer() { return m_verifyPeer; }; - bool valid() { return m_valid; }; - unsigned verbose() { return m_verbose; }; - -private: - std::string m_hostname; - unsigned m_port; - std::string m_clientKey; - std::string m_clientCert; - std::string m_caCert; - std::string m_authToken; - bool m_verifyPeer; - unsigned m_verbose; - bool m_valid; -}; - -#endif // _VIS_CONFIG_HPP diff --git a/src/vis-session.cpp b/src/vis-session.cpp deleted file mode 100644 index 53a554c..0000000 --- a/src/vis-session.cpp +++ /dev/null @@ -1,380 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -#include "vis-session.hpp" -#include <iostream> -#include <sstream> -#include <thread> - - -// Logging helper -static void log_error(beast::error_code error, char const* what) -{ - std::cerr << what << " error: " << error.message() << std::endl; -} - - -// Resolver and socket require an io_context -VisSession::VisSession(const VisConfig &config, net::io_context& ioc, ssl::context& ctx) : - m_config(config), - m_resolver(net::make_strand(ioc)), - m_ws(net::make_strand(ioc), ctx) -{ -} - -// Start the asynchronous operation -void VisSession::run() -{ - if (!m_config.valid()) { - return; - } - - // Start by resolving hostname - m_resolver.async_resolve(m_config.hostname(), - std::to_string(m_config.port()), - beast::bind_front_handler(&VisSession::on_resolve, - shared_from_this())); -} - -void VisSession::on_resolve(beast::error_code error, - tcp::resolver::results_type results) -{ - if(error) { - log_error(error, "resolve"); - return; - } - - // Set a timeout on the connect operation - beast::get_lowest_layer(m_ws).expires_after(std::chrono::seconds(30)); - - // Connect to resolved address - if (m_config.verbose()) - std::cout << "Connecting" << std::endl; - m_results = results; - connect(); -} - -void VisSession::connect() -{ - beast::get_lowest_layer(m_ws).async_connect(m_results, - beast::bind_front_handler(&VisSession::on_connect, - shared_from_this())); -} - -void VisSession::on_connect(beast::error_code error, - tcp::resolver::results_type::endpoint_type endpoint) -{ - if(error) { - // The server can take a while to be ready to accept connections, - // so keep retrying until we hit the timeout. - if (error == net::error::timed_out) { - log_error(error, "connect"); - return; - } - - // Delay 500 ms before retrying - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - - if (m_config.verbose()) - std::cout << "Connecting" << std::endl; - - connect(); - return; - } - - if (m_config.verbose()) - std::cout << "Connected" << std::endl; - - // Set handshake timeout - beast::get_lowest_layer(m_ws).expires_after(std::chrono::seconds(30)); - - // Set SNI Hostname (many hosts need this to handshake successfully) - if(!SSL_set_tlsext_host_name(m_ws.next_layer().native_handle(), - m_config.hostname().c_str())) - { - error = beast::error_code(static_cast<int>(::ERR_get_error()), - net::error::get_ssl_category()); - log_error(error, "connect"); - return; - } - - // Update the hostname. This will provide the value of the - // Host HTTP header during the WebSocket handshake. - // See https://tools.ietf.org/html/rfc7230#section-5.4 - m_hostname = m_config.hostname() + ':' + std::to_string(endpoint.port()); - - if (m_config.verbose()) - std::cout << "Negotiating SSL handshake" << std::endl; - - // Perform the SSL handshake - m_ws.next_layer().async_handshake(ssl::stream_base::client, - beast::bind_front_handler(&VisSession::on_ssl_handshake, - shared_from_this())); -} - -void VisSession::on_ssl_handshake(beast::error_code error) -{ - if(error) { - log_error(error, "SSL handshake"); - return; - } - - // Turn off the timeout on the tcp_stream, because - // the websocket stream has its own timeout system. - beast::get_lowest_layer(m_ws).expires_never(); - - // NOTE: Explicitly not setting websocket stream timeout here, - // as the client is long-running. - - if (m_config.verbose()) - std::cout << "Negotiating WSS handshake" << std::endl; - - // Perform handshake - m_ws.async_handshake(m_hostname, - "/", - beast::bind_front_handler(&VisSession::on_handshake, - shared_from_this())); -} - -void VisSession::on_handshake(beast::error_code error) -{ - if(error) { - log_error(error, "WSS handshake"); - return; - } - - if (m_config.verbose()) - std::cout << "Authorizing" << std::endl; - - // Authorize - json req; - req["requestId"] = std::to_string(m_requestid++); - req["action"]= "authorize"; - req["tokens"] = m_config.authToken(); - - m_ws.async_write(net::buffer(req.dump(4)), - beast::bind_front_handler(&VisSession::on_authorize, - shared_from_this())); -} - -void VisSession::on_authorize(beast::error_code error, std::size_t bytes_transferred) -{ - boost::ignore_unused(bytes_transferred); - - if(error) { - log_error(error, "authorize"); - return; - } - - // Read response - m_ws.async_read(m_buffer, - beast::bind_front_handler(&VisSession::on_read, - shared_from_this())); -} - -// NOTE: Placeholder for now -void VisSession::on_write(beast::error_code error, std::size_t bytes_transferred) -{ - boost::ignore_unused(bytes_transferred); - - if(error) { - log_error(error, "write"); - return; - } - - // Do nothing... -} - -void VisSession::on_read(beast::error_code error, std::size_t bytes_transferred) -{ - boost::ignore_unused(bytes_transferred); - - if(error) { - log_error(error, "read"); - return; - } - - // Handle message - std::string s = beast::buffers_to_string(m_buffer.data()); - json response = json::parse(s, nullptr, false); - if (!response.is_discarded()) { - handle_message(response); - } else { - std::cerr << "json::parse failed? got " << s << std::endl; - } - m_buffer.consume(m_buffer.size()); - - // Read next message - m_ws.async_read(m_buffer, - beast::bind_front_handler(&VisSession::on_read, - shared_from_this())); -} - -void VisSession::get(const std::string &path) -{ - if (!m_config.valid()) { - return; - } - - json req; - req["requestId"] = std::to_string(m_requestid++); - req["action"] = "get"; - req["path"] = path; - req["tokens"] = m_config.authToken(); - - m_ws.write(net::buffer(req.dump(4))); -} - -void VisSession::set(const std::string &path, const std::string &value) -{ - if (!m_config.valid()) { - return; - } - - json req; - req["requestId"] = std::to_string(m_requestid++); - req["action"] = "set"; - req["path"] = path; - req["value"] = value; - req["tokens"] = m_config.authToken(); - - m_ws.write(net::buffer(req.dump(4))); -} - -void VisSession::subscribe(const std::string &path) -{ - if (!m_config.valid()) { - return; - } - - json req; - req["requestId"] = std::to_string(m_requestid++); - req["action"] = "subscribe"; - req["path"] = path; - req["tokens"] = m_config.authToken(); - - m_ws.write(net::buffer(req.dump(4))); -} - -bool VisSession::parseData(const json &message, std::string &path, std::string &value, std::string ×tamp) -{ - if (message.contains("error")) { - std::string error = message["error"]; - return false; - } - - if (!(message.contains("data") && message["data"].is_object())) { - std::cerr << "Malformed message (data missing)" << std::endl; - return false; - } - auto data = message["data"]; - if (!(data.contains("path") && data["path"].is_string())) { - std::cerr << "Malformed message (path missing)" << std::endl; - return false; - } - path = data["path"]; - // Convert '/' to '.' in paths to ensure consistency for clients - std::replace(path.begin(), path.end(), '/', '.'); - - if (!(data.contains("dp") && data["dp"].is_object())) { - std::cerr << "Malformed message (datapoint missing)" << std::endl; - return false; - } - auto dp = data["dp"]; - if (!dp.contains("value")) { - std::cerr << "Malformed message (value missing)" << std::endl; - return false; - } else if (dp["value"].is_string()) { - value = dp["value"]; - } else if (dp["value"].is_number_float()) { - double num = dp["value"]; - value = std::to_string(num); - } else if (dp["value"].is_number_unsigned()) { - unsigned int num = dp["value"]; - value = std::to_string(num); - } else if (dp["value"].is_number_integer()) { - int num = dp["value"]; - value = std::to_string(num); - } else if (dp["value"].is_boolean()) { - value = dp["value"] ? "true" : "false"; - } else { - std::cerr << "Malformed message (unsupported value type)" << std::endl; - return false; - } - - if (!(dp.contains("ts") && dp["ts"].is_string())) { - std::cerr << "Malformed message (timestamp missing)" << std::endl; - return false; - } - timestamp = dp["ts"]; - - return true; -} - -void VisSession::handle_message(const json &message) -{ - if (m_config.verbose() > 1) - std::cout << "VisSession::handle_message: enter, message = " << to_string(message) << std::endl; - - if (!message.contains("action")) { - std::cerr << "Received unknown message (no action), discarding" << std::endl; - return; - } - - std::string action = message["action"]; - if (action == "authorize") { - if (message.contains("error")) { - std::string error = "unknown"; - if (message["error"].is_object() && message["error"].contains("message")) - error = message["error"]["message"]; - std::cerr << "VIS authorization failed: " << error << std::endl; - } else { - if (m_config.verbose() > 1) - std::cout << "authorized" << std::endl; - - handle_authorized_response(); - } - } else if (action == "subscribe") { - if (message.contains("error")) { - std::string error = "unknown"; - if (message["error"].is_object() && message["error"].contains("message")) - error = message["error"]["message"]; - std::cerr << "VIS subscription failed: " << error << std::endl; - } - } else if (action == "get") { - if (message.contains("error")) { - std::string error = "unknown"; - if (message["error"].is_object() && message["error"].contains("message")) - error = message["error"]["message"]; - std::cerr << "VIS get failed: " << error << std::endl; - } else { - std::string path, value, ts; - if (parseData(message, path, value, ts)) { - if (m_config.verbose() > 1) - std::cout << "VisSession::handle_message: got response " << path << " = " << value << std::endl; - - handle_get_response(path, value, ts); - } - } - } else if (action == "set") { - if (message.contains("error")) { - std::string error = "unknown"; - if (message["error"].is_object() && message["error"].contains("message")) - error = message["error"]["message"]; - std::cerr << "VIS set failed: " << error; - } - } else if (action == "subscription") { - std::string path, value, ts; - if (parseData(message, path, value, ts)) { - if (m_config.verbose() > 1) - std::cout << "VisSession::handle_message: got notification " << path << " = " << value << std::endl; - - handle_notification(path, value, ts); - } - } else { - std::cerr << "unhandled VIS response of type: " << action; - } - - if (m_config.verbose() > 1) - std::cout << "VisSession::handle_message: exit" << std::endl; -} - diff --git a/src/vis-session.hpp b/src/vis-session.hpp deleted file mode 100644 index 8c7b0d9..0000000 --- a/src/vis-session.hpp +++ /dev/null @@ -1,78 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -#ifndef _VIS_SESSION_HPP -#define _VIS_SESSION_HPP - -#include "vis-config.hpp" -#include <atomic> -#include <string> -#include <boost/beast/core.hpp> -#include <boost/beast/ssl.hpp> -#include <boost/beast/websocket.hpp> -#include <boost/beast/websocket/ssl.hpp> -#include <boost/asio/strand.hpp> -#include <nlohmann/json.hpp> - -namespace beast = boost::beast; -namespace websocket = beast::websocket; -namespace net = boost::asio; -namespace ssl = boost::asio::ssl; -using tcp = boost::asio::ip::tcp; -using json = nlohmann::json; - - -class VisSession : public std::enable_shared_from_this<VisSession> -{ - //net::io_context m_ioc; - tcp::resolver m_resolver; - tcp::resolver::results_type m_results; - std::string m_hostname; - websocket::stream<beast::ssl_stream<beast::tcp_stream>> m_ws; - beast::flat_buffer m_buffer; - -public: - // Resolver and socket require an io_context - explicit VisSession(const VisConfig &config, net::io_context& ioc, ssl::context& ctx); - - // Start the asynchronous operation - void run(); - -protected: - VisConfig m_config; - std::atomic_uint m_requestid; - - void on_resolve(beast::error_code error, tcp::resolver::results_type results); - - void connect(); - - void on_connect(beast::error_code error, tcp::resolver::results_type::endpoint_type endpoint); - - void on_ssl_handshake(beast::error_code error); - - void on_handshake(beast::error_code error); - - void on_authorize(beast::error_code error, std::size_t bytes_transferred); - - void on_write(beast::error_code error, std::size_t bytes_transferred); - - void on_read(beast::error_code error, std::size_t bytes_transferred); - - void get(const std::string &path); - - void set(const std::string &path, const std::string &value); - - void subscribe(const std::string &path); - - void handle_message(const json &message); - - bool parseData(const json &message, std::string &path, std::string &value, std::string ×tamp); - - virtual void handle_authorized_response(void) = 0; - - virtual void handle_get_response(std::string &path, std::string &value, std::string ×tamp) = 0; - - virtual void handle_notification(std::string &path, std::string &value, std::string ×tamp) = 0; - -}; - -#endif // _VIS_SESSION_HPP |