summaryrefslogtreecommitdiffstats
path: root/src/KuksaClient.cpp
diff options
context:
space:
mode:
authorScott Murray <scott.murray@konsulko.com>2023-08-24 15:39:24 -0400
committerScott Murray <scott.murray@konsulko.com>2023-08-24 15:42:30 -0400
commit0a1426d097688912188bcb59ff59d9c596e82b4d (patch)
tree8032edef0f8a6c3bbebe8f4382486f679bb2143f /src/KuksaClient.cpp
parentf0ac80936b73a44131564c4f65ecc0c9a9db7d39 (diff)
Rework to switch to using KUKSA.val databroker
Rework to use the "VAL" gRPC API from the KUKSA.val databroker instead of the older server's WebSocket interface. Some source files have been renamed to match the class naming to provide a bit more consistency. Bug-AGL: SPEC-4762 Signed-off-by: Scott Murray <scott.murray@konsulko.com> Change-Id: Ib1ec31af439a9b2d5244e2232ea7be1ed9a2574c
Diffstat (limited to 'src/KuksaClient.cpp')
-rw-r--r--src/KuksaClient.cpp373
1 files changed, 373 insertions, 0 deletions
diff --git a/src/KuksaClient.cpp b/src/KuksaClient.cpp
new file mode 100644
index 0000000..844ce0e
--- /dev/null
+++ b/src/KuksaClient.cpp
@@ -0,0 +1,373 @@
+/*
+ * Copyright (C) 2023 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#include <string>
+#include <regex>
+#include <iterator>
+#include <mutex>
+
+#include "KuksaClient.h"
+
+using grpc::Channel;
+using grpc::ClientContext;
+using grpc::ClientReader;
+using grpc::Status;
+
+KuksaClient::KuksaClient(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const KuksaConfig &config) :
+ m_config(config)
+{
+ m_stub = VAL::NewStub(channel);
+}
+
+void KuksaClient::get(const std::string &path, GetResponseCallback cb, const bool actuator)
+{
+ ClientContext *context = new ClientContext();
+ if (!context) {
+ handleCriticalFailure("Could not create ClientContext");
+ return;
+ }
+ std::string token = m_config.authToken();
+ if (!token.empty()) {
+ token.insert(0, std::string("Bearer "));
+ context->AddMetadata(std::string("authorization"), token);
+ }
+
+ GetRequest request;
+ auto entry = request.add_entries();
+ entry->set_path(path);
+ entry->add_fields(Field::FIELD_PATH);
+ if (actuator)
+ entry->add_fields(Field::FIELD_ACTUATOR_TARGET);
+ else
+ entry->add_fields(Field::FIELD_VALUE);
+
+ GetResponse *response = new GetResponse();
+ if (!response) {
+ handleCriticalFailure("Could not create GetResponse");
+ return;
+ }
+
+ // NOTE: Using ClientUnaryReactor instead of the shortcut method
+ // would allow getting detailed errors.
+ m_stub->async()->Get(context, &request, response,
+ [this, cb, context, response](Status s) {
+ if (s.ok())
+ handleGetResponse(response, cb);
+ delete response;
+ delete context;
+ });
+}
+
+// Since a set request needs a Datapoint with the appropriate type value,
+// checking the signal metadata to get the type would be a requirement for
+// a generic set call that takes a string as argument. For now, assume
+// that set with a string is specifically for a signal of string type.
+
+void KuksaClient::set(const std::string &path, const std::string &value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_string(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const int8_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_int32(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const int16_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_int32(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const int32_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_int32(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const int64_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_int64(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const uint8_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_uint32(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const uint16_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_uint32(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const uint32_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_uint32(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const uint64_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_uint64(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const float value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_float_(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const double value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_double_(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::subscribe(const std::string &path,
+ SubscribeResponseCallback cb,
+ const bool actuator,
+ SubscribeDoneCallback done_cb)
+{
+ SubscribeRequest *request = new SubscribeRequest();
+ if (!request) {
+ handleCriticalFailure("Could not create SubscribeRequest");
+ return;
+ }
+
+ auto entry = request->add_entries();
+ entry->set_path(path);
+ entry->add_fields(Field::FIELD_PATH);
+ if (actuator)
+ entry->add_fields(Field::FIELD_ACTUATOR_TARGET);
+ else
+ entry->add_fields(Field::FIELD_VALUE);
+
+ subscribe(request, cb, done_cb);
+}
+
+void KuksaClient::subscribe(const std::map<std::string, bool> signals,
+ SubscribeResponseCallback cb,
+ SubscribeDoneCallback done_cb)
+{
+ SubscribeRequest *request = new SubscribeRequest();
+ if (!request) {
+ handleCriticalFailure("Could not create SubscribeRequest");
+ return;
+ }
+
+ for(auto it = signals.cbegin(); it != signals.cend(); ++it) {
+ auto entry = request->add_entries();
+ entry->set_path(it->first);
+ entry->add_fields(Field::FIELD_PATH);
+ if (it->second)
+ entry->add_fields(Field::FIELD_ACTUATOR_TARGET);
+ else
+ entry->add_fields(Field::FIELD_VALUE);
+ }
+
+ subscribe(request, cb, done_cb);
+}
+
+void KuksaClient::subscribe(const SubscribeRequest *request,
+ SubscribeResponseCallback cb,
+ SubscribeDoneCallback done_cb)
+{
+ if (!(request && cb))
+ return;
+
+ class Reader : public grpc::ClientReadReactor<SubscribeResponse> {
+ public:
+ Reader(VAL::Stub *stub,
+ KuksaClient *client,
+ KuksaConfig &config,
+ const SubscribeRequest *request,
+ SubscribeResponseCallback cb,
+ SubscribeDoneCallback done_cb):
+ client_(client),
+ config_(config),
+ request_(request),
+ cb_(cb),
+ done_cb_(done_cb) {
+ std::string token = config_.authToken();
+ if (!token.empty()) {
+ token.insert(0, std::string("Bearer "));
+ context_.AddMetadata(std::string("authorization"), token);
+ }
+ stub->async()->Subscribe(&context_, request, this);
+ StartRead(&response_);
+ StartCall();
+ }
+ void OnReadDone(bool ok) override {
+ std::unique_lock<std::mutex> lock(mutex_);
+ if (ok) {
+ if (client_)
+ client_->handleSubscribeResponse(&response_, cb_);
+ StartRead(&response_);
+ }
+ }
+ void OnDone(const Status& s) override {
+ status_ = s;
+ if (client_) {
+ if (config_.verbose() > 1)
+ std::cerr << "KuksaClient::subscribe::Reader done" << std::endl;
+ client_->handleSubscribeDone(request_, status_, done_cb_);
+ }
+
+ // gRPC engine is done with us, safe to self-delete
+ delete request_;
+ delete this;
+ }
+
+ private:
+ KuksaClient *client_;
+ KuksaConfig config_;
+ const SubscribeRequest *request_;
+ SubscribeResponseCallback cb_;
+ SubscribeDoneCallback done_cb_;
+
+ ClientContext context_;
+ SubscribeResponse response_;
+ std::mutex mutex_;
+ Status status_;
+ };
+ Reader *reader = new Reader(m_stub.get(), this, m_config, request, cb, done_cb);
+ if (!reader)
+ handleCriticalFailure("Could not create Subscribe reader");
+}
+
+// Private
+
+void KuksaClient::set(const std::string &path, const Datapoint &dp, SetResponseCallback cb, const bool actuator)
+{
+ ClientContext *context = new ClientContext();
+ if (!context) {
+ handleCriticalFailure("Could not create ClientContext");
+ return;
+ }
+ std::string token = m_config.authToken();
+ if (!token.empty()) {
+ token.insert(0, std::string("Bearer "));
+ context->AddMetadata(std::string("authorization"), token);
+ }
+
+ SetRequest request;
+ auto update = request.add_updates();
+ auto entry = update->mutable_entry();
+ entry->set_path(path);
+ if (actuator) {
+ auto target = entry->mutable_actuator_target();
+ *target = dp;
+ update->add_fields(Field::FIELD_ACTUATOR_TARGET);
+ } else {
+ auto value = entry->mutable_value();
+ *value = dp;
+ update->add_fields(Field::FIELD_VALUE);
+ }
+
+ SetResponse *response = new SetResponse();
+ if (!response) {
+ handleCriticalFailure("Could not create SetResponse");
+ delete context;
+ return;
+ }
+
+ // NOTE: Using ClientUnaryReactor instead of the shortcut method
+ // would allow getting detailed errors.
+ m_stub->async()->Set(context, &request, response,
+ [this, cb, context, response](Status s) {
+ if (s.ok())
+ handleSetResponse(response, cb);
+ delete response;
+ delete context;
+ });
+}
+
+void KuksaClient::handleGetResponse(const GetResponse *response, GetResponseCallback cb)
+{
+ if (!(response && response->entries_size() && cb))
+ return;
+
+ for (auto it = response->entries().begin(); it != response->entries().end(); ++it) {
+ // We expect paths in the response entries
+ if (!it->path().size())
+ continue;
+
+ Datapoint dp;
+ if (it->has_actuator_target())
+ dp = it->actuator_target();
+ else
+ dp = it->value();
+ cb(it->path(), dp);
+ }
+}
+
+void KuksaClient::handleSetResponse(const SetResponse *response, SetResponseCallback cb)
+{
+ if (!(response && response->errors_size() && cb))
+ return;
+
+ for (auto it = response->errors().begin(); it != response->errors().end(); ++it) {
+ cb(it->path(), it->error());
+ }
+
+}
+
+void KuksaClient::handleSubscribeResponse(const SubscribeResponse *response, SubscribeResponseCallback cb)
+{
+ if (!(response && response->updates_size() && cb))
+ return;
+
+ for (auto it = response->updates().begin(); it != response->updates().end(); ++it) {
+ // We expect entries that have paths in the response
+ if (!(it->has_entry() && it->entry().path().size()))
+ continue;
+
+ auto entry = it->entry();
+ if (m_config.verbose())
+ std::cout << "KuksaClient::handleSubscribeResponse: got value for " << entry.path() << std::endl;
+
+ Datapoint dp;
+ if (entry.has_actuator_target())
+ dp = entry.actuator_target();
+ else
+ dp = entry.value();
+
+ cb(entry.path(), dp);
+ }
+}
+
+void KuksaClient::handleSubscribeDone(const SubscribeRequest *request,
+ const Status &status,
+ SubscribeDoneCallback cb)
+{
+ if (cb)
+ cb(request, status);
+}
+
+void KuksaClient::handleCriticalFailure(const std::string &error)
+{
+ if (error.size())
+ std::cerr << error << std::endl;
+ exit(1);
+}
+