aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorScott Murray <scott.murray@konsulko.com>2024-04-01 19:47:05 -0400
committerScott Murray <scott.murray@konsulko.com>2024-04-04 05:14:20 -0400
commite437300d0b5c20114ea1843dbf00815304af3f27 (patch)
tree34e7d1630569f4706a1c4fd389ba03d520641f35
parent52dbc88ee006d4c7e9cab625d4a976919b86e4ac (diff)
Initial check-in of proxy implementation. See README.md for build and configuration information, as well as feature implementation status (TODOs section). Bug-AGL: SPEC-5109 Change-Id: I681c693a73f29e284670be977e1a460883f27769 Signed-off-by: Scott Murray <scott.murray@konsulko.com>
-rw-r--r--.gitreview5
-rw-r--r--LICENSE54
-rw-r--r--README.md67
-rw-r--r--meson.build8
-rw-r--r--meson_options.txt1
-rw-r--r--protos/vss-notification.proto91
-rw-r--r--src/GlobalConfig.h35
-rw-r--r--src/KuksaClient.cpp384
-rw-r--r--src/KuksaClient.h80
-rw-r--r--src/KuksaConfig.cpp43
-rw-r--r--src/KuksaConfig.h46
-rw-r--r--src/MqttClient.cpp171
-rw-r--r--src/MqttClient.h46
-rw-r--r--src/MqttConfig.cpp71
-rw-r--r--src/MqttConfig.h77
-rw-r--r--src/ParseConfig.cpp353
-rw-r--r--src/ParseConfig.h23
-rw-r--r--src/ProxyService.cpp164
-rw-r--r--src/ProxyService.h62
-rw-r--r--src/SignalUpdateHandler.cpp65
-rw-r--r--src/SignalUpdateHandler.h42
-rw-r--r--src/SignalUpdateHandlers.cpp54
-rw-r--r--src/SignalUpdateHandlers.h36
-rw-r--r--src/config.yaml.example90
-rw-r--r--src/config.yaml.minimal2
-rw-r--r--src/config.yaml.test13
-rw-r--r--src/main.cpp87
-rw-r--r--src/meson.build67
-rw-r--r--systemd/agl-vss-proxy.service11
-rw-r--r--systemd/meson.build5
30 files changed, 2253 insertions, 0 deletions
diff --git a/.gitreview b/.gitreview
new file mode 100644
index 0000000..1a450e0
--- /dev/null
+++ b/.gitreview
@@ -0,0 +1,5 @@
+[gerrit]
+host=gerrit.automotivelinux.org
+port=29418
+project=src/agl-vss-proxy
+defaultbranch=master
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..31c692a
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,54 @@
+Apache License
+
+Version 2.0, January 2004
+
+http://www.apache.org/licenses/
+
+TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+1. Definitions.
+
+"License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document.
+
+"Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License.
+
+"Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity.
+
+"You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License.
+
+"Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files.
+
+"Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types.
+
+"Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below).
+
+"Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof.
+
+"Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution."
+
+"Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work.
+
+2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form.
+
+3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed.
+
+4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions:
+
+ You must give any other recipients of the Work or Derivative Works a copy of this License; and
+ You must cause any modified files to carry prominent notices stating that You changed the files; and
+ You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and
+ If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License.
+
+5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions.
+
+6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file.
+
+7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License.
+
+8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages.
+
+9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability.
+
+END OF TERMS AND CONDITIONS
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..555af37
--- /dev/null
+++ b/README.md
@@ -0,0 +1,67 @@
+# Introduction
+
+A simple Vehicle Signal Specification (VSS) signal to MQTT proxy, using the KUKSA.val databroker as VSS source. The MQTT payloads are protobufs loosely derived from the update entries in the KUKSA.val gRPC VAL API. See `protos/vss-notification.proto` for the protobuf definition.
+
+# Support
+
+Support options include:
+- #automotive IRC channel on Libera.chat, see https://libera.chat/guides/ for guidance on using IRC.
+- AGL Vehicle to Cloud (V2C) expert group (EG) Zoom call every second Monday, see calendar at https://lists.automotivelinux.org/g/agl-dev-community/calendar.
+- AGL community mailing list, see https://lists.automotivelinux.org/g/agl-dev-community for subscription information and archives.
+
+# Building
+
+Required dependencies:
+- g++
+- protoc
+- grpc
+- grpc++
+- glib 2.0
+- openssl
+- libmosquitto
+- yaml-cpp
+
+The development packages for the above need to be available in your build environment.
+
+Compilation also requires the KUKSA.val VAL API protobuf definitions. To get them, clone the KUKSA.val repository from https://github.com/eclipse/kuksa.val.git. A build can then be configued by running:
+```
+meson -Dprotos=<kuksa.val/proto> [build directory]
+```
+where `kuksa.val` is the path to your clone of the kuksa.val repository, and `build directory` is an optional build directory that meson will configure (default is `build` in the current directory). The proxy can then be built by running `ninja` in the created build directory.
+
+# Configuration
+
+The proxy requires a configuation file in YAML syntax. If run with no arguments, it will look for it in `/etc/agl-vss-proxy/config.yaml`. Alternatively, a different file can be specified on the command-line with the `--config-file` option. Specifying signals to proxy is the only mandatory requirement in the configuration file. An example of a minimal configuration file is:
+```
+signals:
+ - Vehicle.Speed
+ ```
+ This will result in attempting to use local instances of the KUKSA.val databroker and a MQTT broker with no TLS and default values for the payload client ID, MQTT ID, and MQTT topic. See `src/config.yaml.example` for a full example configuration file that documents the various options.
+
+# Source Provenance
+
+Some source code from previous AGL projects has been adapted for the proxy, it is documented below to clarify copyrights.
+
+From agl-service-hvac (https://git.automotivelinux.org/apps/agl-service-hvac) as of commit 6ebb449:
+- meson.build
+- meson_options.txt
+- src/meson.build
+- src/main.cpp
+- src/HvacService.* -> src/ProxyService.*
+- src/KuksaClient.*
+- src/KuksaConfig.*
+
+From agl-telematics-demo-recorder (https://git.automotivelinux.org/apps/agl-telematics-demo-recorder) as of commit a69d2e0b:
+- app/mqttclient.* -> src/MqttClient.*
+
+# TODOs
+
+Planned features that are currently not implemented:
+- Signal rate-limiting with frequency/period option
+- Per-signal conditions to control when signals are proxied.
+- Capturing/snapshotting signal state when a condition is triggered.
+
+Other potential features:
+- MQTT v5 message properties support
+- Pre-shared key support
+- VIS client support to support other VSS server implementations \ No newline at end of file
diff --git a/meson.build b/meson.build
new file mode 100644
index 0000000..ccc4964
--- /dev/null
+++ b/meson.build
@@ -0,0 +1,8 @@
+project('agl-vss-proxy',
+ 'cpp',
+ license : 'Apache-2.0',
+ default_options : ['c_std=c17', 'cpp_std=c++17'])
+
+subdir('src')
+subdir('systemd')
+
diff --git a/meson_options.txt b/meson_options.txt
new file mode 100644
index 0000000..1fe219e
--- /dev/null
+++ b/meson_options.txt
@@ -0,0 +1 @@
+option('protos', type : 'string', value : '/usr/include', description : 'Include directory for .proto files')
diff --git a/protos/vss-notification.proto b/protos/vss-notification.proto
new file mode 100644
index 0000000..6a40d7a
--- /dev/null
+++ b/protos/vss-notification.proto
@@ -0,0 +1,91 @@
+/********************************************************************************
+ * Copyright (c) 2022 Contributors to the Eclipse Foundation
+ * Copyright (c) 2024 Konsulko Group
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information regarding copyright ownership.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Apache License 2.0 which is available at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ ********************************************************************************/
+
+syntax = "proto3";
+
+package agl;
+
+import "google/protobuf/timestamp.proto";
+
+message SignalUpdateNotification {
+ string clientId = 1;
+ repeated SignalUpdateEntry signals = 2;
+}
+
+/*
+ * Derived from KUKSA.val's Datapoint
+ *
+ * Type conversions from VSS types follow the rules outlined in:
+ * https://github.com/eclipse/kuksa.val/blob/master/kuksa_databroker/doc/TYPES.md
+ */
+
+message SignalUpdateEntry {
+ oneof signal {
+ string path = 1;
+ string uuid = 2;
+ }
+
+ google.protobuf.Timestamp timestamp = 3;
+
+ oneof value {
+ string string = 11;
+ bool bool = 12;
+ sint32 int32 = 13;
+ sint64 int64 = 14;
+ uint32 uint32 = 15;
+ uint64 uint64 = 16;
+ float float = 17;
+ double double = 18;
+ StringArray string_array = 21;
+ BoolArray bool_array = 22;
+ Int32Array int32_array = 23;
+ Int64Array int64_array = 24;
+ Uint32Array uint32_array = 25;
+ Uint64Array uint64_array = 26;
+ FloatArray float_array = 27;
+ DoubleArray double_array = 28;
+ }
+}
+
+message StringArray {
+ repeated string values = 1;
+}
+
+message BoolArray {
+ repeated bool values = 1;
+}
+
+message Int32Array {
+ repeated sint32 values = 1;
+}
+
+message Int64Array {
+ repeated sint64 values = 1;
+}
+
+message Uint32Array {
+ repeated uint32 values = 1;
+}
+
+message Uint64Array {
+ repeated uint64 values = 1;
+}
+
+message FloatArray {
+ repeated float values = 1;
+}
+
+message DoubleArray {
+ repeated double values = 1;
+}
diff --git a/src/GlobalConfig.h b/src/GlobalConfig.h
new file mode 100644
index 0000000..9f1b76e
--- /dev/null
+++ b/src/GlobalConfig.h
@@ -0,0 +1,35 @@
+/*
+ * Copyright (C) 2024 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#ifndef _GLOBAL_CONFIG_H
+#define _GLOBAL_CONFIG_H
+
+#include <string>
+
+#define DEFAULT_CLIENT_ID "AGL-V2C-932581de-7091-43bb-b8b2-e33437a5945f"
+
+class GlobalConfig
+{
+public:
+ GlobalConfig() {};
+ explicit GlobalConfig(const std::string &clientId,
+ const unsigned verbose) :
+ m_clientId(clientId),
+ m_verbose(verbose) {};
+ ~GlobalConfig(){};
+
+ std::string clientId() { return m_clientId; };
+ unsigned verbose() { return m_verbose; };
+ void setVerbose(const unsigned verbose) { m_verbose = verbose; };
+
+private:
+ std::string m_clientId;
+ unsigned m_verbose = 0;
+};
+
+extern GlobalConfig g_config;
+
+#endif // _GLOBAL_CONFIG_H
diff --git a/src/KuksaClient.cpp b/src/KuksaClient.cpp
new file mode 100644
index 0000000..b741df4
--- /dev/null
+++ b/src/KuksaClient.cpp
@@ -0,0 +1,384 @@
+/*
+ * Copyright (C) 2023 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#include <string>
+#include <regex>
+#include <iterator>
+#include <mutex>
+
+#include "KuksaClient.h"
+#include "GlobalConfig.h"
+
+using grpc::Channel;
+using grpc::ClientContext;
+using grpc::ClientReader;
+using grpc::Status;
+
+KuksaClient::KuksaClient(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const KuksaConfig &config) :
+ m_config(config)
+{
+ m_stub = VAL::NewStub(channel);
+}
+
+void KuksaClient::get(const std::string &path, GetResponseCallback cb, const bool actuator)
+{
+ ClientContext *context = new ClientContext();
+ if (!context) {
+ handleCriticalFailure("Could not create ClientContext");
+ return;
+ }
+ std::string token = m_config.authToken();
+ if (!token.empty()) {
+ token.insert(0, std::string("Bearer "));
+ context->AddMetadata(std::string("authorization"), token);
+ }
+
+ GetRequest request;
+ auto entry = request.add_entries();
+ entry->set_path(path);
+ entry->add_fields(Field::FIELD_PATH);
+ if (actuator)
+ entry->add_fields(Field::FIELD_ACTUATOR_TARGET);
+ else
+ entry->add_fields(Field::FIELD_VALUE);
+
+ GetResponse *response = new GetResponse();
+ if (!response) {
+ handleCriticalFailure("Could not create GetResponse");
+ return;
+ }
+
+ // NOTE: Using ClientUnaryReactor instead of the shortcut method
+ // would allow getting detailed errors.
+ m_stub->async()->Get(context, &request, response,
+ [this, cb, context, response](Status s) {
+ if (s.ok())
+ handleGetResponse(response, cb);
+ delete response;
+ delete context;
+ });
+}
+
+// Since a set request needs a Datapoint with the appropriate type value,
+// checking the signal metadata to get the type would be a requirement for
+// a generic set call that takes a string as argument. For now, assume
+// that set with a string is specifically for a signal of string type.
+
+void KuksaClient::set(const std::string &path, const std::string &value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_string(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const bool value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_bool_(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const int8_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_int32(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const int16_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_int32(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const int32_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_int32(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const int64_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_int64(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const uint8_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_uint32(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const uint16_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_uint32(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const uint32_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_uint32(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const uint64_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_uint64(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const float value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_float_(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const double value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_double_(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::subscribe(const std::string &path,
+ SubscribeResponseCallback cb,
+ const bool actuator,
+ SubscribeDoneCallback done_cb)
+{
+ SubscribeRequest *request = new SubscribeRequest();
+ if (!request) {
+ handleCriticalFailure("Could not create SubscribeRequest");
+ return;
+ }
+
+ auto entry = request->add_entries();
+ entry->set_path(path);
+ entry->add_fields(Field::FIELD_PATH);
+ if (actuator)
+ entry->add_fields(Field::FIELD_ACTUATOR_TARGET);
+ else
+ entry->add_fields(Field::FIELD_VALUE);
+
+ subscribe(request, cb, done_cb);
+}
+
+void KuksaClient::subscribe(const std::map<std::string, bool> signals,
+ SubscribeResponseCallback cb,
+ SubscribeDoneCallback done_cb)
+{
+ SubscribeRequest *request = new SubscribeRequest();
+ if (!request) {
+ handleCriticalFailure("Could not create SubscribeRequest");
+ return;
+ }
+
+ for(auto it = signals.cbegin(); it != signals.cend(); ++it) {
+ auto entry = request->add_entries();
+ entry->set_path(it->first);
+ entry->add_fields(Field::FIELD_PATH);
+ if (it->second)
+ entry->add_fields(Field::FIELD_ACTUATOR_TARGET);
+ else
+ entry->add_fields(Field::FIELD_VALUE);
+ }
+
+ subscribe(request, cb, done_cb);
+}
+
+void KuksaClient::subscribe(const SubscribeRequest *request,
+ SubscribeResponseCallback cb,
+ SubscribeDoneCallback done_cb)
+{
+ if (!(request && cb))
+ return;
+
+ class Reader : public grpc::ClientReadReactor<SubscribeResponse> {
+ public:
+ Reader(VAL::Stub *stub,
+ KuksaClient *client,
+ KuksaConfig &config,
+ GlobalConfig &g_config,
+ const SubscribeRequest *request,
+ SubscribeResponseCallback cb,
+ SubscribeDoneCallback done_cb):
+ client_(client),
+ config_(config),
+ g_config_(g_config),
+ request_(request),
+ cb_(cb),
+ done_cb_(done_cb) {
+ std::string token = config_.authToken();
+ if (!token.empty()) {
+ token.insert(0, std::string("Bearer "));
+ context_.AddMetadata(std::string("authorization"), token);
+ }
+ stub->async()->Subscribe(&context_, request, this);
+ StartRead(&response_);
+ StartCall();
+ }
+ void OnReadDone(bool ok) override {
+ std::unique_lock<std::mutex> lock(mutex_);
+ if (ok) {
+ if (client_)
+ client_->handleSubscribeResponse(&response_, cb_);
+ StartRead(&response_);
+ }
+ }
+ void OnDone(const Status& s) override {
+ status_ = s;
+ if (client_) {
+ if (g_config_.verbose() > 1)
+ std::cerr << "KuksaClient::subscribe::Reader done" << std::endl;
+ client_->handleSubscribeDone(request_, status_, done_cb_);
+ }
+
+ // gRPC engine is done with us, safe to self-delete
+ delete request_;
+ delete this;
+ }
+
+ private:
+ KuksaClient *client_;
+ KuksaConfig config_;
+ GlobalConfig g_config_;
+ const SubscribeRequest *request_;
+ SubscribeResponseCallback cb_;
+ SubscribeDoneCallback done_cb_;
+
+ ClientContext context_;
+ SubscribeResponse response_;
+ std::mutex mutex_;
+ Status status_;
+ };
+ Reader *reader = new Reader(m_stub.get(), this, m_config, g_config, request, cb, done_cb);
+ if (!reader)
+ handleCriticalFailure("Could not create Subscribe reader");
+}
+
+// Private
+
+void KuksaClient::set(const std::string &path, const Datapoint &dp, SetResponseCallback cb, const bool actuator)
+{
+ ClientContext *context = new ClientContext();
+ if (!context) {
+ handleCriticalFailure("Could not create ClientContext");
+ return;
+ }
+ std::string token = m_config.authToken();
+ if (!token.empty()) {
+ token.insert(0, std::string("Bearer "));
+ context->AddMetadata(std::string("authorization"), token);
+ }
+
+ SetRequest request;
+ auto update = request.add_updates();
+ auto entry = update->mutable_entry();
+ entry->set_path(path);
+ if (actuator) {
+ auto target = entry->mutable_actuator_target();
+ *target = dp;
+ update->add_fields(Field::FIELD_ACTUATOR_TARGET);
+ } else {
+ auto value = entry->mutable_value();
+ *value = dp;
+ update->add_fields(Field::FIELD_VALUE);
+ }
+
+ SetResponse *response = new SetResponse();
+ if (!response) {
+ handleCriticalFailure("Could not create SetResponse");
+ delete context;
+ return;
+ }
+
+ // NOTE: Using ClientUnaryReactor instead of the shortcut method
+ // would allow getting detailed errors.
+ m_stub->async()->Set(context, &request, response,
+ [this, cb, context, response](Status s) {
+ if (s.ok())
+ handleSetResponse(response, cb);
+ delete response;
+ delete context;
+ });
+}
+
+void KuksaClient::handleGetResponse(const GetResponse *response, GetResponseCallback cb)
+{
+ if (!(response && response->entries_size() && cb))
+ return;
+
+ for (auto it = response->entries().begin(); it != response->entries().end(); ++it) {
+ // We expect paths in the response entries
+ if (!it->path().size())
+ continue;
+
+ Datapoint dp;
+ if (it->has_actuator_target())
+ dp = it->actuator_target();
+ else
+ dp = it->value();
+ cb(it->path(), dp);
+ }
+}
+
+void KuksaClient::handleSetResponse(const SetResponse *response, SetResponseCallback cb)
+{
+ if (!(response && response->errors_size() && cb))
+ return;
+
+ for (auto it = response->errors().begin(); it != response->errors().end(); ++it) {
+ cb(it->path(), it->error());
+ }
+
+}
+
+void KuksaClient::handleSubscribeResponse(const SubscribeResponse *response, SubscribeResponseCallback cb)
+{
+ if (!(response && response->updates_size() && cb))
+ return;
+
+ for (auto it = response->updates().begin(); it != response->updates().end(); ++it) {
+ // We expect entries that have paths in the response
+ if (!(it->has_entry() && it->entry().path().size()))
+ continue;
+
+ auto entry = it->entry();
+ if (g_config.verbose() > 1)
+ std::cout << "KuksaClient::handleSubscribeResponse: got value for " << entry.path() << std::endl;
+
+ Datapoint dp;
+ if (entry.has_actuator_target())
+ dp = entry.actuator_target();
+ else
+ dp = entry.value();
+
+ cb(entry.path(), dp);
+ }
+}
+
+void KuksaClient::handleSubscribeDone(const SubscribeRequest *request,
+ const Status &status,
+ SubscribeDoneCallback cb)
+{
+ if (cb)
+ cb(request, status);
+}
+
+void KuksaClient::handleCriticalFailure(const std::string &error)
+{
+ if (error.size())
+ std::cerr << error << std::endl;
+ exit(1);
+}
+
diff --git a/src/KuksaClient.h b/src/KuksaClient.h
new file mode 100644
index 0000000..a5f901e
--- /dev/null
+++ b/src/KuksaClient.h
@@ -0,0 +1,80 @@
+/*
+ * Copyright (C) 2023 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#ifndef KUKSA_CLIENT_H
+#define KUKSA_CLIENT_H
+
+#include <string>
+#include <map>
+#include <grpcpp/grpcpp.h>
+#include "kuksa/val/v1/val.grpc.pb.h"
+
+// Just pull in the whole namespace since Datapoint contains a lot of
+// definitions that may potentially be needed.
+using namespace kuksa::val::v1;
+
+using grpc::Status;
+
+#include "KuksaConfig.h"
+
+// API response callback types
+typedef std::function<void(const std::string &path, const Datapoint &dp)> GetResponseCallback;
+typedef std::function<void(const std::string &path, const Error &error)> SetResponseCallback;
+typedef std::function<void(const std::string &path, const Datapoint &dp)> SubscribeResponseCallback;
+typedef std::function<void(const SubscribeRequest *request, const Status &status)> SubscribeDoneCallback;
+
+// KUKSA.val databroker "VAL" gRPC API client class
+
+class KuksaClient
+{
+public:
+ explicit KuksaClient(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const KuksaConfig &config);
+
+ void get(const std::string &path, GetResponseCallback cb, const bool actuator = false);
+
+ void set(const std::string &path, const std::string &value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const bool value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const int8_t value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const int16_t value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const int32_t value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const int64_t value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const uint8_t value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const uint16_t value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const uint32_t value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const uint64_t value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const float value, SetResponseCallback cb, const bool actuator = false);
+ void set(const std::string &path, const double value, SetResponseCallback cb, const bool actuator = false);
+
+ void subscribe(const std::string &path,
+ SubscribeResponseCallback cb,
+ const bool actuator = false,
+ SubscribeDoneCallback done_cb = nullptr);
+ void subscribe(const std::map<std::string, bool> signals,
+ SubscribeResponseCallback cb,
+ SubscribeDoneCallback done_cb = nullptr);
+ void subscribe(const SubscribeRequest *request,
+ SubscribeResponseCallback cb,
+ SubscribeDoneCallback done_cb = nullptr);
+
+private:
+ KuksaConfig m_config;
+ std::shared_ptr<VAL::Stub> m_stub;
+
+ void set(const std::string &path, const Datapoint &dp, SetResponseCallback cb, const bool actuator);
+
+ void handleGetResponse(const GetResponse *response, GetResponseCallback cb);
+
+ void handleSetResponse(const SetResponse *response, SetResponseCallback cb);
+
+ void handleSubscribeResponse(const SubscribeResponse *response, SubscribeResponseCallback cb);
+
+ void handleSubscribeDone(const SubscribeRequest *request, const Status &status, SubscribeDoneCallback cb);
+
+ void handleCriticalFailure(const std::string &error);
+ void handleCriticalFailure(const char *error) { handleCriticalFailure(std::string(error)); };
+};
+
+#endif // KUKSA_CLIENT_H
diff --git a/src/KuksaConfig.cpp b/src/KuksaConfig.cpp
new file mode 100644
index 0000000..c409902
--- /dev/null
+++ b/src/KuksaConfig.cpp
@@ -0,0 +1,43 @@
+/*
+ * Copyright (C) 2022-2024 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#include <iostream>
+#include <iomanip>
+#include <sstream>
+#include <exception>
+#include "KuksaConfig.h"
+
+KuksaConfig::KuksaConfig() :
+ m_hostname("localhost"),
+ m_port(55555),
+ m_useTls(false),
+ m_caCert(""),
+ m_tlsServerName(""),
+ m_authToken("")
+
+{
+ m_valid = true;
+}
+
+KuksaConfig::KuksaConfig(const std::string &hostname,
+ const unsigned port,
+ const bool useTls,
+ const std::string &caCert,
+ const std::string &tlsServerName,
+ const std::string &authToken) :
+ m_hostname(hostname),
+ m_port(port),
+ m_useTls(useTls),
+ m_caCert(caCert),
+ m_tlsServerName(tlsServerName),
+ m_authToken(authToken)
+{
+ if (m_hostname.size() && m_port)
+ m_valid = true;
+ if (m_useTls && m_caCert.empty())
+ m_valid = false;
+
+}
diff --git a/src/KuksaConfig.h b/src/KuksaConfig.h
new file mode 100644
index 0000000..874888c
--- /dev/null
+++ b/src/KuksaConfig.h
@@ -0,0 +1,46 @@
+/*
+ * Copyright (C) 2022-2024 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#ifndef _KUKSA_CONFIG_H
+#define _KUKSA_CONFIG_H
+
+#include <string>
+
+#define DEFAULT_KUKSA_PORT 55555
+#define DEFAULT_KUKSA_CA_CERT_FILE "/etc/kuksa-val/CA.pem"
+
+class KuksaConfig
+{
+public:
+ KuksaConfig();
+
+ explicit KuksaConfig(const std::string &hostname,
+ const unsigned port,
+ const bool useTls,
+ const std::string &caCert,
+ const std::string &tlsServerName,
+ const std::string &authToken);
+ ~KuksaConfig(){};
+
+ std::string hostname() { return m_hostname; };
+ unsigned port() { return m_port; };
+ bool useTls() { return m_useTls; };
+ std::string caCert() { return m_caCert; };
+ std::string tlsServerName() { return m_tlsServerName; };
+ std::string authToken() { return m_authToken; };
+ bool valid() { return m_valid; };
+
+private:
+ std::string m_hostname;
+ unsigned m_port;
+ bool m_useTls;
+ std::string m_caCert;
+ std::string m_tlsServerName;
+ std::string m_authToken;
+ bool m_valid = false;
+};
+
+#endif // _KUKSA_CONFIG_H
diff --git a/src/MqttClient.cpp b/src/MqttClient.cpp
new file mode 100644
index 0000000..38c8ec6
--- /dev/null
+++ b/src/MqttClient.cpp
@@ -0,0 +1,171 @@
+/*
+ * Copyright (C) 2019,2024 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#include <iostream>
+#include <cstring>
+
+#include "MqttClient.h"
+#include "GlobalConfig.h"
+
+MqttClient::MqttClient(const MqttConfig &config) :
+ m_config(config)
+{
+ mosquitto_lib_init();
+ m_mosq = mosquitto_new(m_config.clientId().c_str(), m_config.cleanOnDisconnect(), this);
+
+ mosquitto_connect_callback_set(m_mosq, onConnect);
+ mosquitto_disconnect_callback_set(m_mosq, onDisconnect);
+
+ if(m_config.username().size()) {
+ mosquitto_username_pw_set(m_mosq,
+ m_config.username().c_str(),
+ m_config.password().c_str());
+ }
+
+ m_loop = mosquitto_loop_start(m_mosq);
+ if(m_loop != MOSQ_ERR_SUCCESS){
+ std::cerr << __FUNCTION__ << ": Unable to start loop, error = " << m_loop << std::endl;
+ }
+}
+
+bool MqttClient::start()
+{
+ if (!(m_mosq && m_loop == MOSQ_ERR_SUCCESS))
+ return false;
+
+ if (!(m_config.username().empty() || m_config.password().empty())) {
+ std::cout << "Using MQTT username & password" << std::endl;
+ if (mosquitto_username_pw_set(m_mosq,
+ m_config.username().c_str(),
+ m_config.password().c_str()) != MOSQ_ERR_SUCCESS) {
+ return false;
+ }
+ }
+
+ if (m_config.useTls() && !m_config.caCertFile().empty()) {
+ std::cout << "Using MQTT TLS server certificate" << std::endl;
+ char *clientCertFile = NULL;
+ char *clientKeyFile = NULL;
+ if (!(m_config.clientCertFile().empty() || m_config.clientKeyFile().empty())) {
+ clientCertFile = strdup(m_config.clientCertFile().c_str());
+ clientKeyFile = strdup(m_config.clientKeyFile().c_str());
+ std::cout << "Using MQTT TLS client certificate" << std::endl;
+ }
+ if (mosquitto_tls_set(m_mosq,
+ m_config.caCertFile().c_str(),
+ NULL,
+ clientCertFile,
+ clientKeyFile,
+ NULL) != MOSQ_ERR_SUCCESS) {
+ std::cerr << "Error configuring MQTT TLS support" << std::endl;
+ free(clientCertFile);
+ free(clientKeyFile);
+ return false;
+ }
+
+ if (!m_config.verifyServerHostname()) {
+ if (mosquitto_tls_insecure_set(m_mosq, true) != MOSQ_ERR_SUCCESS) {
+ return false;
+ }
+ }
+ }
+
+ bool rc = true;
+ std::cout << "Using MQTT server " << m_config.hostname() << ":" << m_config.port() << std::endl;
+ if(mosquitto_connect_async(m_mosq,
+ m_config.hostname().c_str(),
+ m_config.port(),
+ m_config.keepalive())) {
+ std::cerr << "Unable to start MQTT connection to " << m_config.hostname() << std::endl;
+ rc = false;
+ }
+
+ return rc;
+}
+
+MqttClient::~MqttClient(void)
+{
+ mosquitto_disconnect(m_mosq);
+ mosquitto_loop_stop(m_mosq, true);
+ mosquitto_destroy(m_mosq);
+ mosquitto_lib_cleanup();
+}
+
+int MqttClient::publish(const std::string &topic, const char *data, const int len, const unsigned qos, const bool retain)
+{
+ if (len <= 0 || qos > 2)
+ return -1;
+
+ return mosquitto_publish(m_mosq,
+ NULL,
+ topic.c_str(),
+ len,
+ data,
+ qos,
+ retain);
+}
+
+int MqttClient::publish(const std::string &topic, const char *data, const int len)
+{
+ if (len <= 0)
+ return -1;
+
+ return mosquitto_publish(m_mosq,
+ NULL,
+ topic.c_str(),
+ len,
+ data,
+ m_config.qos(),
+ m_config.retain());
+}
+
+int MqttClient::publish(const char *data, const int len)
+{
+ if (len <= 0)
+ return -1;
+
+ return mosquitto_publish(m_mosq,
+ NULL,
+ m_config.topic().c_str(),
+ len,
+ data,
+ m_config.qos(),
+ m_config.retain());
+}
+
+// Private
+
+void MqttClient::onConnect(struct mosquitto *mosq, void *obj, int rc)
+{
+ if (!obj)
+ return;
+
+ MqttClient *p = (MqttClient*) obj;
+ p->handleConnect(rc);
+}
+
+void MqttClient::onDisconnect(struct mosquitto *mosq, void *obj, int rc)
+{
+ if (!obj)
+ return;
+
+ MqttClient *p = (MqttClient*) obj;
+ p->handleDisconnect(rc);
+}
+
+void MqttClient::handleConnect(int rc)
+{
+ m_connected = true;
+ if (g_config.verbose())
+ std::cerr << "MQTT Connected, rc = " << rc << std::endl;
+}
+
+void MqttClient::handleDisconnect(int rc)
+{
+ m_connected = false;
+ if (g_config.verbose())
+ std::cerr << "MQTT Disconnected, rc = " << rc << std::endl;
+}
diff --git a/src/MqttClient.h b/src/MqttClient.h
new file mode 100644
index 0000000..174c3e4
--- /dev/null
+++ b/src/MqttClient.h
@@ -0,0 +1,46 @@
+/*
+ * Copyright (C) 2019,2024 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#ifndef MQTTCLIENT_H
+#define MQTTCLIENT_H
+
+#include <string>
+#include <atomic>
+#include <mosquitto.h>
+
+#include "MqttConfig.h"
+
+class MqttClient
+{
+public:
+ MqttClient(const MqttConfig &config);
+ ~MqttClient();
+
+ bool start();
+
+ bool connected() {
+ bool connected;
+ connected = m_connected.load();
+ return connected;
+ }
+
+ int publish(const std::string &topic, const char *data, const int len, const unsigned qos, const bool retain);
+ int publish(const std::string &topic, const char *data, const int len);
+ int publish(const char *data, const int len);
+
+private:
+ static void onConnect(struct mosquitto *mosq, void *obj, int rc);
+ static void onDisconnect(struct mosquitto *mosq, void *obj, int rc);
+ void handleConnect(int rc);
+ void handleDisconnect(int rc);
+
+ MqttConfig m_config;
+ mosquitto *m_mosq = NULL;
+ int m_loop = -1;
+ std::atomic<bool> m_connected{false};
+};
+
+#endif // MQTTCLIENT_H
diff --git a/src/MqttConfig.cpp b/src/MqttConfig.cpp
new file mode 100644
index 0000000..8ac8c5d
--- /dev/null
+++ b/src/MqttConfig.cpp
@@ -0,0 +1,71 @@
+/*
+ * Copyright (C) 2022-2024 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#include <iostream>
+#include <iomanip>
+#include <sstream>
+#include <exception>
+
+#include "MqttConfig.h"
+
+MqttConfig::MqttConfig() :
+ m_hostname("localhost"),
+ m_port(DEFAULT_MQTT_PORT),
+ m_keepalive(DEFAULT_MQTT_KEEPALIVE),
+ m_username(""),
+ m_password(""),
+ m_clientId(DEFAULT_MQTT_CLIENT_ID),
+ m_cleanOnDisconnect(false),
+ m_topic(DEFAULT_MQTT_TOPIC),
+ m_qos(0),
+ m_retain(true),
+ m_useTls(false),
+ m_caCertFile(""),
+ m_verifyServerHostname(""),
+ m_clientCertFile(""),
+ m_clientKeyFile("")
+{
+ m_valid = true;
+}
+
+MqttConfig::MqttConfig(const std::string &hostname,
+ const unsigned port,
+ const unsigned keepalive,
+ const std::string &username,
+ const std::string &password,
+ const std::string &clientId,
+ const bool cleanOnDisconnect,
+ const std::string &topic,
+ const unsigned qos,
+ const bool retain,
+ const bool useTls,
+ const std::string &caCertFile,
+ const bool &verifyServerHostname,
+ const std::string &clientCertFile,
+ const std::string &clientKeyFile) :
+ m_hostname(hostname),
+ m_port(port),
+ m_keepalive(keepalive),
+ m_username(username),
+ m_password(password),
+ m_clientId(clientId),
+ m_cleanOnDisconnect(cleanOnDisconnect),
+ m_topic(topic),
+ m_qos(qos),
+ m_retain(retain),
+ m_useTls(useTls),
+ m_caCertFile(caCertFile),
+ m_verifyServerHostname(verifyServerHostname),
+ m_clientCertFile(clientCertFile),
+ m_clientKeyFile(clientKeyFile)
+{
+ if (m_hostname.size() && m_port)
+ m_valid = true;
+ if ((m_useTls && m_caCertFile.empty()) ||
+ m_topic.empty() ||
+ m_qos > 2)
+ m_valid = false;
+}
diff --git a/src/MqttConfig.h b/src/MqttConfig.h
new file mode 100644
index 0000000..6ed6cdf
--- /dev/null
+++ b/src/MqttConfig.h
@@ -0,0 +1,77 @@
+/*
+ * Copyright (C) 2024 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#ifndef _MQTT_CONFIG_H
+#define _MQTT_CONFIG_H
+
+#include <string>
+
+#define DEFAULT_MQTT_PORT 1883
+#define DEFAULT_MQTT_TLS_PORT 8883
+#define DEFAULT_MQTT_KEEPALIVE 60
+#define DEFAULT_MQTT_CLIENT_ID "AGL-DEMO-Kdxm0Bzhaqe7c"
+#define DEFAULT_MQTT_TOPIC "AGL-V2C-TOPIC-ab9d6672-fe61-45b1-b027-867583fc8307"
+
+class MqttConfig
+{
+public:
+ MqttConfig();
+
+ explicit MqttConfig(const std::string &hostname,
+ const unsigned port,
+ const unsigned keepalive,
+ const std::string &username,
+ const std::string &password,
+ const std::string &clientId,
+ const bool cleanOnDisconnect,
+ const std::string &topic,
+ const unsigned qos,
+ const bool retain,
+ const bool useTls,
+ const std::string &caCertFile,
+ const bool &verifyServerHostname,
+ const std::string &clientCertFile,
+ const std::string &clientKeyFile);
+
+ ~MqttConfig(){};
+
+ std::string hostname() { return m_hostname; };
+ unsigned port() { return m_port; };
+ unsigned keepalive() { return m_keepalive; };
+ std::string username() { return m_username; };
+ std::string password() { return m_password; };
+ std::string clientId() { return m_clientId; };
+ bool cleanOnDisconnect() { return m_cleanOnDisconnect; };
+ std::string topic() { return m_topic; };
+ unsigned qos() { return m_qos; };
+ bool retain() { return m_retain; };
+ bool useTls() { return m_useTls; };
+ std::string caCertFile() { return m_caCertFile; };
+ bool verifyServerHostname() { return m_verifyServerHostname; };
+ std::string clientCertFile() { return m_clientCertFile; };
+ std::string clientKeyFile() { return m_clientKeyFile; };
+ bool valid() { return m_valid; };
+
+private:
+ std::string m_hostname;
+ unsigned m_port;
+ unsigned m_keepalive;
+ std::string m_username;
+ std::string m_password;
+ std::string m_clientId;
+ bool m_cleanOnDisconnect;
+ std::string m_topic;
+ unsigned m_qos;
+ bool m_retain;
+ bool m_useTls;
+ std::string m_caCertFile;
+ bool m_verifyServerHostname;
+ std::string m_clientCertFile;
+ std::string m_clientKeyFile;
+ bool m_valid = false;
+};
+
+#endif // _MQTT_CONFIG_H
diff --git a/src/ParseConfig.cpp b/src/ParseConfig.cpp
new file mode 100644
index 0000000..e0e1a7d
--- /dev/null
+++ b/src/ParseConfig.cpp
@@ -0,0 +1,353 @@
+/*
+ * Copyright (C) 2024 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#include <iostream>
+#include <fstream>
+#include <sstream>
+#include <yaml-cpp/yaml.h>
+
+#include "ParseConfig.h"
+
+void readFile(const std::string &filename, std::string &data)
+{
+ data.clear();
+ try {
+ std::ifstream file(filename);
+ std::stringstream buffer;
+ buffer << file.rdbuf();
+ data = buffer.str();
+ } catch (const std::exception &e) {
+ data.clear();
+ }
+}
+
+bool ParseGlobalConfig(const YAML::Node &globalConfigYaml, GlobalConfig &globalConfig)
+{
+ std::string clientId = DEFAULT_CLIENT_ID;
+ if (globalConfigYaml["client-id"]) {
+ clientId = globalConfigYaml["client-id"].as<std::string>();
+ if (clientId.empty()) {
+ std::cerr << "Invalid client ID" << std::endl;
+ return false;
+ }
+ }
+
+ unsigned verbose = 0;
+ std::string verboseStr;
+ if (globalConfigYaml["verbose"])
+ verboseStr = globalConfigYaml["verbose"].as<std::string>();
+ if (!verboseStr.empty()) {
+ if (verboseStr == "true" || verboseStr == "1")
+ verbose = 1;
+ if (verboseStr == "2")
+ verbose = 2;
+ }
+
+ globalConfig = GlobalConfig(clientId, verbose);
+
+ return true;
+}
+
+bool ParseKuksaConfig(const YAML::Node &kuksaConfigYaml, KuksaConfig &kuksaConfig)
+{
+ std::string hostname = "localhost";
+ if (kuksaConfigYaml["hostname"]) {
+ hostname = kuksaConfigYaml["hostname"].as<std::string>();
+ if (hostname.empty()) {
+ std::cerr << "Invalid server hostname" << std::endl;
+ return false;
+ }
+ }
+
+ unsigned port = DEFAULT_KUKSA_PORT;
+ if (kuksaConfigYaml["port"]) {
+ port = kuksaConfigYaml["port"].as<unsigned>();
+ if (port == 0) {
+ std::cerr << "Invalid server port" << std::endl;
+ return false;
+ }
+ }
+
+ bool useTls = false;
+ if (kuksaConfigYaml["use-tls"]) {
+ useTls = kuksaConfigYaml["use-tls"].as<bool>();
+ }
+
+ std::string caCert;
+ std::string tlsServerName;
+ if (useTls) {
+ std::string caCertFile = DEFAULT_KUKSA_CA_CERT_FILE;
+ if (kuksaConfigYaml["ca-certificate"]) {
+ caCertFile = kuksaConfigYaml["ca-certificate"].as<std::string>();
+ if (caCertFile.empty()) {
+ std::cerr << "Invalid CA certificate filename" << std::endl;
+ return false;
+ }
+ }
+
+ if (!caCertFile.empty()) {
+ readFile(caCertFile, caCert);
+ if (caCert.empty()) {
+ std::cerr << "Invalid CA certificate file" << std::endl;
+ return false;
+ }
+ }
+
+ if (kuksaConfigYaml["tls-server-name"]) {
+ tlsServerName = kuksaConfigYaml["tls-server-name"].as<std::string>();
+ if (tlsServerName.empty()) {
+ std::cerr << "Invalid TLS server name" << std::endl;
+ return false;
+ }
+ }
+ }
+
+ std::string authTokenFilename;
+ if (kuksaConfigYaml["authorization"]) {
+ authTokenFilename = kuksaConfigYaml["authorization"].as<std::string>();
+ if (authTokenFilename.empty()) {
+ std::cerr << "Invalid authorization token filename" << std::endl;
+ return false;
+ }
+ }
+ std::string authToken;
+ if (!authTokenFilename.empty()) {
+ readFile(authTokenFilename, authToken);
+ if (authToken.empty()) {
+ std::cerr << "Invalid authorization token file" << std::endl;
+ return false;
+ }
+ }
+
+ kuksaConfig = KuksaConfig(hostname,
+ port,
+ useTls,
+ caCert,
+ tlsServerName,
+ authToken);
+
+ return true;
+}
+
+bool ParseMqttConfig(const YAML::Node &mqttConfigYaml, MqttConfig &mqttConfig)
+{
+ std::string hostname = "localhost";
+ if (mqttConfigYaml["hostname"]) {
+ hostname = mqttConfigYaml["hostname"].as<std::string>();
+ if (hostname.empty()) {
+ std::cerr << "Invalid MQTT server hostname" << std::endl;
+ return false;
+ }
+ }
+
+ unsigned port = DEFAULT_MQTT_PORT;
+ bool defaultPort = true;
+ if (mqttConfigYaml["port"]) {
+ port = mqttConfigYaml["port"].as<unsigned>();
+ if (port == 0) {
+ std::cerr << "Invalid MQTT server port" << std::endl;
+ return false;
+ }
+ defaultPort = false;
+ }
+
+ unsigned keepalive = DEFAULT_MQTT_KEEPALIVE;
+ if (mqttConfigYaml["keepalive"]) {
+ port = mqttConfigYaml["keepalive"].as<unsigned>();
+ }
+
+ std::string username;
+ if (mqttConfigYaml["username"]) {
+ username = mqttConfigYaml["username"].as<std::string>();
+ if (username.empty()) {
+ std::cerr << "Invalid MQTT username" << std::endl;
+ return false;
+ }
+ }
+
+ std::string password;
+ if (mqttConfigYaml["password"]) {
+ password = mqttConfigYaml["password"].as<std::string>();
+ if (password.empty()) {
+ std::cerr << "Invalid MQTT password" << std::endl;
+ return false;
+ }
+ }
+
+ std::string clientId = DEFAULT_MQTT_CLIENT_ID;
+ if (mqttConfigYaml["client-id"]) {
+ clientId = mqttConfigYaml["client-id"].as<std::string>();
+ if (clientId.empty()) {
+ std::cerr << "Invalid MQTT client ID" << std::endl;
+ return false;
+ }
+ }
+
+ bool cleanOnDisconnect = false;
+ if (mqttConfigYaml["clean-on-disconnect"]) {
+ cleanOnDisconnect = mqttConfigYaml["clean-on-disconnect"].as<bool>();
+ }
+
+ std::string topic = DEFAULT_MQTT_TOPIC;
+ if (mqttConfigYaml["topic"]) {
+ topic = mqttConfigYaml["topic"].as<std::string>();
+ if (topic.empty()) {
+ std::cerr << "Invalid MQTT topic" << std::endl;
+ return false;
+ }
+ }
+
+ unsigned qos = 0;
+ if (mqttConfigYaml["qos"]) {
+ qos = mqttConfigYaml["qos"].as<unsigned>();
+ if (qos > 2) {
+ std::cerr << "Invalid MQTT QoS" << std::endl;
+ return false;
+ }
+ }
+
+ bool retain = true;
+ if (mqttConfigYaml["retain"]) {
+ retain = mqttConfigYaml["retain"].as<bool>();
+ }
+
+ bool useTls = false;
+ if (mqttConfigYaml["use-tls"]) {
+ useTls = mqttConfigYaml["use-tls"].as<bool>();
+ }
+
+ std::string caCertFile;
+ bool verifyServerHostname = true;
+ std::string clientCertFile;
+ std::string clientKeyFile;
+ if (useTls) {
+ if (defaultPort)
+ port = DEFAULT_MQTT_TLS_PORT;
+
+ if (mqttConfigYaml["ca-certificate"]) {
+ caCertFile = mqttConfigYaml["ca-certificate"].as<std::string>();
+ if (caCertFile.empty()) {
+ std::cerr << "Invalid MQTT CA certificate filename" << std::endl;
+ return false;
+ }
+ }
+ // Mosquitto takes the cert filename as opposed to the
+ // certificate itself, so no need to read it in here.
+
+ if (mqttConfigYaml["verify-server-hostname"]) {
+ verifyServerHostname = mqttConfigYaml["verify-server-hostname"].as<bool>();
+ }
+
+ if (mqttConfigYaml["client-certificate"]) {
+ clientCertFile = mqttConfigYaml["client-certificate"].as<std::string>();
+ if (clientCertFile.empty()) {
+ std::cerr << "Invalid MQTT client certificate filename" << std::endl;
+ return false;
+ }
+ }
+
+ if (mqttConfigYaml["client-key"]) {
+ clientKeyFile = mqttConfigYaml["client-key"].as<std::string>();
+ if (clientKeyFile.empty()) {
+ std::cerr << "Invalid MQTT client key filename" << std::endl;
+ return false;
+ }
+ }
+ }
+
+ mqttConfig = MqttConfig(hostname,
+ port,
+ keepalive,
+ username,
+ password,
+ clientId,
+ cleanOnDisconnect,
+ topic,
+ qos,
+ retain,
+ useTls,
+ caCertFile,
+ verifyServerHostname,
+ clientCertFile,
+ clientKeyFile);
+
+ return true;
+}
+
+bool ParseSignalsConfig(const YAML::Node &signalsConfigYaml, SignalUpdateHandlers &signalHandlers)
+{
+ if (!signalsConfigYaml.IsSequence())
+ return false;
+
+ bool rc = true;
+ for (std::size_t i = 0; i < signalsConfigYaml.size(); i++) {
+ YAML::Node signalYaml = signalsConfigYaml[i];
+ if (!signalYaml.IsMap()) {
+ rc = false;
+ break;
+ }
+ if (signalYaml["signal"]) {
+ std::string path = signalYaml["signal"].as<std::string>();
+ if (!path.empty()) {
+ std::shared_ptr<SignalUpdateHandler> handler =
+ std::make_shared<PassthroughSignalUpdateHandler>(path);
+ signalHandlers.addSignalUpdateHandler(path, handler);
+ } else {
+ rc = false;
+ break;
+ }
+ }
+ }
+
+ return rc;
+}
+
+bool ParseConfig(const std::string &configFile,
+ GlobalConfig &globalConfig,
+ KuksaConfig &kuksaConfig,
+ MqttConfig &mqttConfig,
+ SignalUpdateHandlers &signalHandlers)
+{
+ bool rc = true;
+
+ YAML::Node tmpYaml;
+ try {
+ tmpYaml = YAML::LoadFile(configFile);
+ } catch (const std::exception &e) {
+ std::cerr << "Invalid configuration " << configFile << std::endl;
+ return false;
+ }
+ std::cout << "Reading configuration " << configFile << std::endl;
+ const YAML::Node &configYaml = tmpYaml;
+
+ rc = ParseGlobalConfig(configYaml, globalConfig);
+ if (!rc)
+ goto error;
+
+ if (configYaml["kuksa"]) {
+ rc = ParseKuksaConfig(configYaml["kuksa"], kuksaConfig);
+ if (!rc)
+ goto error;
+ } else {
+ kuksaConfig = KuksaConfig();
+ }
+ if (configYaml["mqtt"]) {
+ rc = ParseMqttConfig(configYaml["mqtt"], mqttConfig);
+ if (!rc)
+ goto error;
+ } else {
+ mqttConfig = MqttConfig();
+ }
+ if (configYaml["signals"]) {
+ rc = ParseSignalsConfig(configYaml["signals"], signalHandlers);
+ } else {
+ std::cerr << "Missing signals configuration " << std::endl;
+ rc = false;
+ }
+error:
+ return rc;
+}
+
diff --git a/src/ParseConfig.h b/src/ParseConfig.h
new file mode 100644
index 0000000..9e5bda8
--- /dev/null
+++ b/src/ParseConfig.h
@@ -0,0 +1,23 @@
+/*
+ * Copyright (C) 2024 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#ifndef _PARSE_CONFIG_H
+#define _PARSE_CONFIG_H
+
+#include <string>
+
+#include "GlobalConfig.h"
+#include "KuksaConfig.h"
+#include "MqttConfig.h"
+#include "SignalUpdateHandlers.h"
+
+bool ParseConfig(const std::string &configFile,
+ GlobalConfig &globalConfig,
+ KuksaConfig &kuksaConfig,
+ MqttConfig &mqttConfig,
+ SignalUpdateHandlers &signalHandlers);
+
+#endif // _PARSE_CONFIG_H
diff --git a/src/ProxyService.cpp b/src/ProxyService.cpp
new file mode 100644
index 0000000..110d4de
--- /dev/null
+++ b/src/ProxyService.cpp
@@ -0,0 +1,164 @@
+/*
+ * Copyright (C) 2022-2024 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#include <string>
+#include <sstream>
+#include <iostream>
+#include <algorithm>
+
+#include "ProxyService.h"
+#include "GlobalConfig.h"
+
+ProxyService::ProxyService(const KuksaConfig &kuksaConfig,
+ const MqttConfig &mqttConfig,
+ const SignalUpdateHandlers &signalUpdateHandlers,
+ GMainLoop *loop) :
+ m_loop(loop),
+ m_kuksaConfig(kuksaConfig),
+ m_mqttConfig(mqttConfig),
+ m_signalUpdateHandlers(signalUpdateHandlers)
+{
+ // Create MQTT client
+ m_mqttClient = new MqttClient(m_mqttConfig);
+ if (m_mqttClient)
+ m_signalUpdateHandlers.setClient(m_mqttClient);
+}
+
+ProxyService::~ProxyService()
+{
+ delete m_kuksaClient;
+ delete m_mqttClient;
+}
+
+bool ProxyService::start() {
+ if (!m_mqttClient)
+ return false;
+
+ if (!m_mqttClient->start())
+ return false;
+
+ // Create gRPC channel
+ std::string host = m_kuksaConfig.hostname();
+ host += ":";
+ std::stringstream ss;
+ ss << m_kuksaConfig.port();
+ host += ss.str();
+
+ std::cout << "Using KUKSA.val databroker " << host << std::endl;
+ std::shared_ptr<grpc::Channel> channel;
+ if (m_kuksaConfig.useTls() && !m_kuksaConfig.caCert().empty()) {
+ std::cout << "Using TLS" << std::endl;
+ grpc::SslCredentialsOptions options;
+ options.pem_root_certs = m_kuksaConfig.caCert();
+ if (!m_kuksaConfig.tlsServerName().empty()) {
+ grpc::ChannelArguments args;
+ auto target = m_kuksaConfig.tlsServerName();
+ std::cout << "Overriding TLS target name with " << target << std::endl;
+ args.SetString(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG, target);
+ channel = grpc::CreateCustomChannel(host, grpc::SslCredentials(options), args);
+ } else {
+ channel = grpc::CreateChannel(host, grpc::SslCredentials(options));
+ }
+ } else {
+ channel = grpc::CreateChannel(host, grpc::InsecureChannelCredentials());
+ }
+
+ // Wait for the channel to be ready
+ std::cout << "Waiting for databroker gRPC channel" << std::endl;
+ while (!channel->WaitForConnected(std::chrono::system_clock::now() +
+ std::chrono::milliseconds(500))) ;
+ std::cout << "Databroker gRPC channel ready" << std::endl;
+
+ // Create Kuksa client
+ bool rc = false;
+ m_kuksaClient = new KuksaClient(channel, m_kuksaConfig);
+ if (m_kuksaClient) {
+ // Listen to signal updates
+ std::map<std::string, bool> signals;
+ auto handledSignals = m_signalUpdateHandlers.getSignals();
+ for (auto it = handledSignals.begin(); it != handledSignals.end(); ++it) {
+ std::cout << "Subscribing to " << *it << std::endl;
+ signals[*it] = false;
+ }
+ m_kuksaClient->subscribe(signals,
+ [this](const std::string &path, const Datapoint &dp) {
+ HandleSignalChange(path, dp);
+ },
+ [this](const SubscribeRequest *request, const Status &s) {
+ HandleSubscribeDone(request, s);
+ });
+ rc = true;
+ }
+ return rc;
+}
+
+// Private
+
+void ProxyService::HandleSignalChange(const std::string &path, const Datapoint &dp)
+{
+ if (g_config.verbose() > 1)
+ std::cout << "ProxyService::HandleSignalChange: Value received for " << path << std::endl;
+
+ m_signalUpdateHandlers.processSignalUpdate(path, dp);
+}
+
+void ProxyService::HandleSignalSetError(const std::string &path, const Error &error)
+{
+ std::cerr << "Error setting " << path << ": " << error.code() << " - " << error.reason() << std::endl;
+}
+
+void ProxyService::HandleSubscribeDone(const SubscribeRequest *request, const Status &status)
+{
+ if (g_config.verbose())
+ std::cout << "Subscribe status = " << status.error_code() <<
+ " (" << status.error_message() << ")" << std::endl;
+
+ if (status.error_code() == grpc::CANCELLED) {
+ if (g_config.verbose())
+ std::cerr << "Subscribe canceled, assuming shutdown" << std::endl;
+ return;
+ }
+
+ // Queue up a resubcribe via the GLib event loop callback
+ struct resubscribe_data *data = new (struct resubscribe_data);
+ if (!data) {
+ std::cerr << "Could not create resubcribe_data" << std::endl;
+ exit(1);
+ }
+ data->self = this;
+ // Need to copy request since the one we have been handed is from the
+ // finished subscribe and will be going away.
+ data->request = new SubscribeRequest(*request);
+ if (!data->request) {
+ std::cerr << "Could not create resubscribe SubscribeRequest" << std::endl;
+ exit(1);
+ }
+
+ // NOTE: Waiting 100 milliseconds for now; it is possible that some
+ // randomization and/or back-off may need to be added if many
+ // subscribes are active, or switching to some other resubscribe
+ // scheme altogether (e.g. post subscribes to a thread that waits
+ // for the channel to become connected again).
+ g_timeout_add_full(G_PRIORITY_DEFAULT,
+ 100,
+ resubscribe_cb,
+ data,
+ NULL);
+}
+
+void ProxyService::Resubscribe(const SubscribeRequest *request)
+{
+ if (!(m_kuksaClient && request))
+ return;
+
+ m_kuksaClient->subscribe(request,
+ [this](const std::string &path, const Datapoint &dp) {
+ HandleSignalChange(path, dp);
+ },
+ [this](const SubscribeRequest *request, const Status &s) {
+ HandleSubscribeDone(request, s);
+ });
+}
diff --git a/src/ProxyService.h b/src/ProxyService.h
new file mode 100644
index 0000000..8e2cf66
--- /dev/null
+++ b/src/ProxyService.h
@@ -0,0 +1,62 @@
+/*
+ * Copyright (C) 2022,2023 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#ifndef _PROXY_SERVICE_H
+#define _PROXY_SERVICE_H
+
+#include <mutex>
+#include <glib.h>
+
+#include "KuksaConfig.h"
+#include "KuksaClient.h"
+#include "MqttConfig.h"
+#include "MqttClient.h"
+#include "SignalUpdateHandlers.h"
+
+class ProxyService
+{
+public:
+ ProxyService(const KuksaConfig &config,
+ const MqttConfig &mqttConfig,
+ const SignalUpdateHandlers &signalUpdateHandlers,
+ GMainLoop *loop = NULL);
+
+ ~ProxyService();
+
+ bool start();
+
+ // Callback for KuksaClient subscribe API reconnect
+ static gboolean resubscribe_cb(gpointer data) {
+ struct resubscribe_data *d = (struct resubscribe_data*) data;
+ if (d && d->self) {
+ ((ProxyService*) d->self)->Resubscribe(d->request);
+ }
+ return FALSE;
+ }
+
+private:
+ struct resubscribe_data {
+ ProxyService *self;
+ const SubscribeRequest *request;
+ };
+
+ GMainLoop *m_loop;
+ KuksaConfig m_kuksaConfig;
+ KuksaClient *m_kuksaClient;
+ MqttConfig m_mqttConfig;
+ MqttClient *m_mqttClient;
+ SignalUpdateHandlers m_signalUpdateHandlers;
+
+ void HandleSignalChange(const std::string &path, const Datapoint &dp);
+
+ void HandleSignalSetError(const std::string &path, const Error &error);
+
+ void HandleSubscribeDone(const SubscribeRequest *request, const Status &status);
+
+ void Resubscribe(const SubscribeRequest *request);
+};
+
+#endif // _PROXY_SERVICE_H
diff --git a/src/SignalUpdateHandler.cpp b/src/SignalUpdateHandler.cpp
new file mode 100644
index 0000000..6f03c85
--- /dev/null
+++ b/src/SignalUpdateHandler.cpp
@@ -0,0 +1,65 @@
+/*
+ * Copyright (C) 2024 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#include <iostream>
+
+#include "SignalUpdateHandler.h"
+#include "vss-notification.pb.h"
+#include "GlobalConfig.h"
+
+void PassthroughSignalUpdateHandler::handleUpdate(const Datapoint &dp)
+{
+ if (!m_mqttClient)
+ return;
+
+ if (!m_mqttClient->connected()) {
+ if (g_config.verbose() > 1)
+ std::cerr << "Not connected, dropping signal update" << std::endl;
+ return;
+ }
+
+ agl::SignalUpdateNotification notification;
+ notification.set_clientid(g_config.clientId());
+ auto entry = notification.add_signals();
+ entry->set_path(m_path);
+ if (dp.has_timestamp()) {
+ auto timestamp = new google::protobuf::Timestamp(dp.timestamp());
+ entry->set_allocated_timestamp(timestamp);
+ }
+
+ bool handled = true;
+ if (dp.has_int32()) {
+ entry->set_int32(dp.int32());
+ } else if (dp.has_uint32()) {
+ entry->set_uint32(dp.uint32());
+ } else if (dp.has_int64()) {
+ entry->set_int64(dp.int64());
+ } else if (dp.has_uint64()) {
+ entry->set_uint64(dp.uint64());
+ } else if (dp.has_bool_()) {
+ entry->set_bool_(dp.bool_());
+ } else if (dp.has_float_()) {
+ entry->set_float_(dp.float_());
+ } else if (dp.has_double_()) {
+ entry->set_double_(dp.double_());
+ } else if (dp.has_string()) {
+ entry->set_string(dp.string());
+ } else {
+ // Array value types not handled at present
+ handled = false;
+ }
+
+ if (handled) {
+ if (g_config.verbose() > 1)
+ std::cout << "Passing though signal " << m_path << " update" << std::endl;
+ std::string bytes = notification.SerializeAsString();
+ const char* array = bytes.data();
+ int size = bytes.size();
+ int rc = m_mqttClient->publish(array, size);
+ if (g_config.verbose() > 1)
+ std::cout << "MQTT publish returned " << rc << std::endl;
+ }
+}
diff --git a/src/SignalUpdateHandler.h b/src/SignalUpdateHandler.h
new file mode 100644
index 0000000..c597e7f
--- /dev/null
+++ b/src/SignalUpdateHandler.h
@@ -0,0 +1,42 @@
+/*
+ * Copyright (C) 2024 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#ifndef _SIGNAL_UPDATE_HANDLER_H
+#define _SIGNAL_UPDATE_HANDLER_H
+
+#include <string>
+
+#include "MqttClient.h"
+#include "kuksa/val/v1/val.grpc.pb.h"
+
+// Just pull in the whole namespace since Datapoint contains a lot of
+// definitions that may potentially be needed.
+using namespace kuksa::val::v1;
+
+class SignalUpdateHandler
+{
+public:
+ SignalUpdateHandler(const std::string &path) : m_path(path) {};
+ virtual ~SignalUpdateHandler() {};
+
+ void setClient(MqttClient *mqttClient) { m_mqttClient = mqttClient; };
+ virtual void handleUpdate(const Datapoint &dp) = 0;
+
+protected:
+ std::string m_path;
+ MqttClient *m_mqttClient = nullptr;
+};
+
+class PassthroughSignalUpdateHandler : public SignalUpdateHandler
+{
+public:
+ PassthroughSignalUpdateHandler(const std::string &path) : SignalUpdateHandler(path) {};
+ ~PassthroughSignalUpdateHandler() {};
+
+ void handleUpdate(const Datapoint &dp) override;
+};
+
+#endif // _SIGNAL_UPDATE_HANDLER_H
diff --git a/src/SignalUpdateHandlers.cpp b/src/SignalUpdateHandlers.cpp
new file mode 100644
index 0000000..34aab04
--- /dev/null
+++ b/src/SignalUpdateHandlers.cpp
@@ -0,0 +1,54 @@
+/*
+ * Copyright (C) 2024 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#include "SignalUpdateHandlers.h"
+
+void SignalUpdateHandlers::addSignalUpdateHandler(const std::string &path, std::shared_ptr<SignalUpdateHandler> &handler)
+{
+ auto it = m_handlers.find(path);
+ if (it == m_handlers.end()) {
+ std::vector<std::shared_ptr<SignalUpdateHandler>> v;
+ v.push_back(handler);
+ m_handlers[path] = v;
+ } else {
+ m_handlers[path].push_back(handler);
+ }
+}
+
+std::vector<std::string> SignalUpdateHandlers::getSignals()
+{
+ std::vector<std::string> signals;
+ for (auto it = m_handlers.begin(); it != m_handlers.end(); ++it) {
+ signals.push_back(it->first);
+ }
+ return signals;
+}
+
+void SignalUpdateHandlers::setClient(MqttClient *mqttClient)
+{
+ for (auto it = m_handlers.begin(); it != m_handlers.end(); ++it) {
+ for (auto it2 = it->second.begin(); it2 != it->second.end(); ++it2) {
+ std::shared_ptr<SignalUpdateHandler> handler = *it2;
+ if (handler)
+ handler->setClient(mqttClient);
+ }
+ }
+}
+
+void SignalUpdateHandlers::processSignalUpdate(const std::string &path, const Datapoint &dp)
+{
+ auto it = m_handlers.find(path);
+ if (it == m_handlers.end())
+ return;
+
+ for (auto it2 = m_handlers[path].begin();
+ it2 != m_handlers[path].end();
+ ++it2) {
+ std::shared_ptr<SignalUpdateHandler> handler = *it2;
+ if (handler)
+ handler->handleUpdate(dp);
+ }
+}
diff --git a/src/SignalUpdateHandlers.h b/src/SignalUpdateHandlers.h
new file mode 100644
index 0000000..0bae826
--- /dev/null
+++ b/src/SignalUpdateHandlers.h
@@ -0,0 +1,36 @@
+/*
+ * Copyright (C) 2024 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#ifndef _SIGNAL_UPDATE_HANDLERS_H
+#define _SIGNAL_UPDATE_HANDLERS_H
+
+#include <string>
+#include <map>
+#include <vector>
+
+#include "kuksa/val/v1/val.grpc.pb.h"
+#include "SignalUpdateHandler.h"
+
+// Just pull in the whole namespace since Datapoint contains a lot of
+// definitions that may potentially be needed.
+using namespace kuksa::val::v1;
+
+class SignalUpdateHandlers
+{
+public:
+ SignalUpdateHandlers() {};
+ ~SignalUpdateHandlers() {};
+
+ void addSignalUpdateHandler(const std::string &path, std::shared_ptr<SignalUpdateHandler> &handler);
+ std::vector<std::string> getSignals();
+ void setClient(MqttClient *mqttClient);
+ void processSignalUpdate(const std::string &path, const Datapoint &dp);
+
+private:
+ std::map<std::string, std::vector<std::shared_ptr<SignalUpdateHandler>>> m_handlers;
+};
+
+#endif // _SIGNAL_UPDATE_HANDLERS_H
diff --git a/src/config.yaml.example b/src/config.yaml.example
new file mode 100644
index 0000000..c25bd79
--- /dev/null
+++ b/src/config.yaml.example
@@ -0,0 +1,90 @@
+#
+# Global options
+#
+# Proxy client ID for MQTT payloads
+#client-id:
+#
+# Logging verbosity [0-2]
+#verbose: 0
+
+#
+# KUKSA.val options
+#
+#kuksa:
+# Hostname of databroker server
+# hostname: localhost
+#
+# Databroker port number
+# port: 55555
+#
+# Authorization token
+# authorization:
+#
+# TLS support
+# use-tls: false
+#
+# TLS CA certificate file
+# ca-certificate:
+#
+# Optional TLS server name override
+# tls-server-name:
+
+#
+# MQTT options
+#
+#mqtt:
+# MQTT broker server hostname
+# hostname: localhost
+#
+# MQTT broker port (defaults to 8883 if TLS is enabled)
+# port: 1883
+#
+# MQTT connection keepalive setting (seconds)
+# keepalive: 60
+#
+# Optional MQTT broker username
+# username:
+#
+# Optional MQTT broker password
+# password:
+#
+# MQTT connection client ID
+# client-id:
+#
+# MQTT clean on disconnect setting
+# clean-on-disconnect: false
+#
+# MQTT message topic
+# topic:
+#
+# MQTT message Quality of Service (QoS), 0-2
+# qos: 1
+#
+# MQTT message retain setting
+# retain: true
+#
+# TLS support
+# use-tls: false
+#
+# TLS CA certificate file
+# ca-certificate:
+#
+# TLS server hostname verification switch
+# verify-server-hostname: true
+#
+# TLS client certificate file (required if key is configured)
+# client-certificate:
+#
+# TLS client key (required if certificate is configured)
+# client-key:
+#
+
+#
+# VSS signal configuration
+#
+signals:
+- signal: Vehicle.Speed
+# Not yet implemented:
+# condition:
+# period:
+# capture:
diff --git a/src/config.yaml.minimal b/src/config.yaml.minimal
new file mode 100644
index 0000000..ec96aea
--- /dev/null
+++ b/src/config.yaml.minimal
@@ -0,0 +1,2 @@
+signals:
+- signal: Vehicle.Speed
diff --git a/src/config.yaml.test b/src/config.yaml.test
new file mode 100644
index 0000000..b456453
--- /dev/null
+++ b/src/config.yaml.test
@@ -0,0 +1,13 @@
+kuksa:
+ authorization: /etc/agl-vss-proxy/agl-vss-proxy.token
+ use-tls: true
+mqtt:
+ hostname:
+ port:
+ #username:
+ #password:
+ #use-tls:
+ #ca-certificate:
+ #verify-server-hostname:
+signals:
+- signal: Vehicle.Speed
diff --git a/src/main.cpp b/src/main.cpp
new file mode 100644
index 0000000..228690e
--- /dev/null
+++ b/src/main.cpp
@@ -0,0 +1,87 @@
+/*
+ * Copyright (C) 2022,2023 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#include <getopt.h>
+#include <glib.h>
+#include <glib-unix.h>
+
+#include "ParseConfig.h"
+#include "ProxyService.h"
+
+#define DEFAULT_CONFIG "/etc/agl-vss-proxy/config.yaml"
+
+GlobalConfig g_config;
+
+static gboolean quit_cb(gpointer user_data)
+{
+ GMainLoop *loop = (GMainLoop*) user_data;
+
+ g_info("Quitting...");
+
+ if (loop)
+ g_idle_add(G_SOURCE_FUNC(g_main_loop_quit), loop);
+ else
+ exit(0);
+
+ return G_SOURCE_REMOVE;
+}
+
+int main(int argc, char **argv)
+{
+ GMainLoop *loop = NULL;
+
+ loop = g_main_loop_new(NULL, FALSE);
+ if (!loop) {
+ std::cerr << "Could not create GLib event loop" << std::endl;
+ exit(2);
+ }
+
+ std::string configFile = DEFAULT_CONFIG;
+ unsigned verbose = 0;
+ struct option options[] = {
+ {"config", required_argument, 0, 'c' },
+ {"verbose", no_argument, 0, 'v' },
+ {0, 0, 0, 0 }
+ };
+ int opt = 0;
+ while ((opt = getopt_long(argc, argv,"c:v", options, NULL)) != -1) {
+ switch (opt) {
+ case 'c':
+ configFile = optarg;
+ break;
+ case 'v':
+ verbose++;
+ break;
+ default:
+ std::cerr << "Invalid option" << std::endl;
+ exit(1);
+ break;
+ }
+ }
+
+ KuksaConfig kuksaConfig;
+ MqttConfig mqttConfig;
+ SignalUpdateHandlers signalUpdateHandlers;
+ if (!ParseConfig(configFile, g_config, kuksaConfig, mqttConfig, signalUpdateHandlers)) {
+ std::cerr << "Could not parse configuration" << std::endl;
+ exit(1);
+ }
+ if (verbose > 0)
+ g_config.setVerbose(verbose);
+
+ g_unix_signal_add(SIGTERM, quit_cb, (gpointer) loop);
+ g_unix_signal_add(SIGINT, quit_cb, (gpointer) loop);
+
+ ProxyService service(kuksaConfig, mqttConfig, signalUpdateHandlers, loop);
+ if (service.start()) {
+ g_main_loop_run(loop);
+ }
+
+ // Clean up
+ g_main_loop_unref(loop);
+
+ return 0;
+}
diff --git a/src/meson.build b/src/meson.build
new file mode 100644
index 0000000..fce3bb3
--- /dev/null
+++ b/src/meson.build
@@ -0,0 +1,67 @@
+cpp = meson.get_compiler('cpp')
+grpcpp_reflection_dep = cpp.find_library('grpc++_reflection')
+
+proxy_dep = [
+ dependency('glib-2.0'),
+ dependency('openssl'),
+ dependency('threads'),
+ dependency('yaml-cpp'),
+ dependency('libmosquitto'),
+ dependency('protobuf'),
+ dependency('grpc'),
+ dependency('grpc++'),
+ grpcpp_reflection_dep
+]
+
+protoc = find_program('protoc')
+grpc_cpp = find_program('grpc_cpp_plugin')
+
+kuksa_protos_base_dir = get_option('protos')
+kuksa_protos_dir = kuksa_protos_base_dir / 'kuksa/val/v1'
+kuksa_protoc_gen = generator(protoc, \
+ output : ['@BASENAME@.pb.cc', '@BASENAME@.pb.h'],
+ arguments : ['-I=' + kuksa_protos_base_dir,
+ '--cpp_out=@BUILD_DIR@',
+ '@INPUT@'])
+
+protoc_gen = generator(protoc, \
+ output : ['@BASENAME@.pb.cc', '@BASENAME@.pb.h'],
+ arguments : ['--proto_path=@CURRENT_SOURCE_DIR@/../protos',
+ '--cpp_out=@BUILD_DIR@',
+ '@INPUT@'])
+
+generated_protoc_sources = [ \
+ kuksa_protoc_gen.process(kuksa_protos_dir / 'types.proto', preserve_path_from : kuksa_protos_base_dir),
+ kuksa_protoc_gen.process(kuksa_protos_dir / 'val.proto', preserve_path_from : kuksa_protos_base_dir),
+ protoc_gen.process('../protos/vss-notification.proto'),
+]
+
+kuksa_grpc_gen = generator(protoc, \
+ output : ['@BASENAME@.grpc.pb.cc', '@BASENAME@.grpc.pb.h'],
+ arguments : ['-I=' + kuksa_protos_base_dir,
+ '--grpc_out=@BUILD_DIR@',
+ '--plugin=protoc-gen-grpc=' + grpc_cpp.path(),
+ '@INPUT@'])
+generated_grpc_sources = [ \
+ kuksa_grpc_gen.process(kuksa_protos_dir / 'val.proto', preserve_path_from : kuksa_protos_base_dir),
+]
+
+src = [
+ 'KuksaConfig.cpp',
+ 'KuksaClient.cpp',
+ 'MqttConfig.cpp',
+ 'MqttClient.cpp',
+ 'SignalUpdateHandler.cpp',
+ 'SignalUpdateHandlers.cpp',
+ 'ParseConfig.cpp',
+ 'ProxyService.cpp',
+ 'main.cpp',
+ generated_protoc_sources,
+ generated_grpc_sources,
+]
+
+executable('agl-vss-proxy',
+ src,
+ dependencies: proxy_dep,
+ install: true,
+ install_dir : get_option('sbindir'))
diff --git a/systemd/agl-vss-proxy.service b/systemd/agl-vss-proxy.service
new file mode 100644
index 0000000..1839543
--- /dev/null
+++ b/systemd/agl-vss-proxy.service
@@ -0,0 +1,11 @@
+[Unit]
+Requires=kuksa-databroker.service
+After=kuksa-databroker.service
+
+[Service]
+Type=simple
+ExecStart=/usr/sbin/agl-vss-proxy
+Restart=on-failure
+
+[Install]
+WantedBy=default.target
diff --git a/systemd/meson.build b/systemd/meson.build
new file mode 100644
index 0000000..e299741
--- /dev/null
+++ b/systemd/meson.build
@@ -0,0 +1,5 @@
+systemd_dep = dependency('systemd')
+
+systemd_system_unit_dir = systemd_dep.get_pkgconfig_variable('systemdsystemunitdir')
+
+install_data('agl-vss-proxy.service', install_dir : systemd_system_unit_dir)