summaryrefslogtreecommitdiffstats
path: root/src/KuksaClient.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/KuksaClient.cpp')
-rw-r--r--src/KuksaClient.cpp373
1 files changed, 373 insertions, 0 deletions
diff --git a/src/KuksaClient.cpp b/src/KuksaClient.cpp
new file mode 100644
index 0000000..844ce0e
--- /dev/null
+++ b/src/KuksaClient.cpp
@@ -0,0 +1,373 @@
+/*
+ * Copyright (C) 2023 Konsulko Group
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#include <string>
+#include <regex>
+#include <iterator>
+#include <mutex>
+
+#include "KuksaClient.h"
+
+using grpc::Channel;
+using grpc::ClientContext;
+using grpc::ClientReader;
+using grpc::Status;
+
+KuksaClient::KuksaClient(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const KuksaConfig &config) :
+ m_config(config)
+{
+ m_stub = VAL::NewStub(channel);
+}
+
+void KuksaClient::get(const std::string &path, GetResponseCallback cb, const bool actuator)
+{
+ ClientContext *context = new ClientContext();
+ if (!context) {
+ handleCriticalFailure("Could not create ClientContext");
+ return;
+ }
+ std::string token = m_config.authToken();
+ if (!token.empty()) {
+ token.insert(0, std::string("Bearer "));
+ context->AddMetadata(std::string("authorization"), token);
+ }
+
+ GetRequest request;
+ auto entry = request.add_entries();
+ entry->set_path(path);
+ entry->add_fields(Field::FIELD_PATH);
+ if (actuator)
+ entry->add_fields(Field::FIELD_ACTUATOR_TARGET);
+ else
+ entry->add_fields(Field::FIELD_VALUE);
+
+ GetResponse *response = new GetResponse();
+ if (!response) {
+ handleCriticalFailure("Could not create GetResponse");
+ return;
+ }
+
+ // NOTE: Using ClientUnaryReactor instead of the shortcut method
+ // would allow getting detailed errors.
+ m_stub->async()->Get(context, &request, response,
+ [this, cb, context, response](Status s) {
+ if (s.ok())
+ handleGetResponse(response, cb);
+ delete response;
+ delete context;
+ });
+}
+
+// Since a set request needs a Datapoint with the appropriate type value,
+// checking the signal metadata to get the type would be a requirement for
+// a generic set call that takes a string as argument. For now, assume
+// that set with a string is specifically for a signal of string type.
+
+void KuksaClient::set(const std::string &path, const std::string &value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_string(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const int8_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_int32(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const int16_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_int32(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const int32_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_int32(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const int64_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_int64(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const uint8_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_uint32(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const uint16_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_uint32(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const uint32_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_uint32(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const uint64_t value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_uint64(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const float value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_float_(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::set(const std::string &path, const double value, SetResponseCallback cb, const bool actuator)
+{
+ Datapoint dp;
+ dp.set_double_(value);
+ set(path, dp, cb, actuator);
+}
+
+void KuksaClient::subscribe(const std::string &path,
+ SubscribeResponseCallback cb,
+ const bool actuator,
+ SubscribeDoneCallback done_cb)
+{
+ SubscribeRequest *request = new SubscribeRequest();
+ if (!request) {
+ handleCriticalFailure("Could not create SubscribeRequest");
+ return;
+ }
+
+ auto entry = request->add_entries();
+ entry->set_path(path);
+ entry->add_fields(Field::FIELD_PATH);
+ if (actuator)
+ entry->add_fields(Field::FIELD_ACTUATOR_TARGET);
+ else
+ entry->add_fields(Field::FIELD_VALUE);
+
+ subscribe(request, cb, done_cb);
+}
+
+void KuksaClient::subscribe(const std::map<std::string, bool> signals,
+ SubscribeResponseCallback cb,
+ SubscribeDoneCallback done_cb)
+{
+ SubscribeRequest *request = new SubscribeRequest();
+ if (!request) {
+ handleCriticalFailure("Could not create SubscribeRequest");
+ return;
+ }
+
+ for(auto it = signals.cbegin(); it != signals.cend(); ++it) {
+ auto entry = request->add_entries();
+ entry->set_path(it->first);
+ entry->add_fields(Field::FIELD_PATH);
+ if (it->second)
+ entry->add_fields(Field::FIELD_ACTUATOR_TARGET);
+ else
+ entry->add_fields(Field::FIELD_VALUE);
+ }
+
+ subscribe(request, cb, done_cb);
+}
+
+void KuksaClient::subscribe(const SubscribeRequest *request,
+ SubscribeResponseCallback cb,
+ SubscribeDoneCallback done_cb)
+{
+ if (!(request && cb))
+ return;
+
+ class Reader : public grpc::ClientReadReactor<SubscribeResponse> {
+ public:
+ Reader(VAL::Stub *stub,
+ KuksaClient *client,
+ KuksaConfig &config,
+ const SubscribeRequest *request,
+ SubscribeResponseCallback cb,
+ SubscribeDoneCallback done_cb):
+ client_(client),
+ config_(config),
+ request_(request),
+ cb_(cb),
+ done_cb_(done_cb) {
+ std::string token = config_.authToken();
+ if (!token.empty()) {
+ token.insert(0, std::string("Bearer "));
+ context_.AddMetadata(std::string("authorization"), token);
+ }
+ stub->async()->Subscribe(&context_, request, this);
+ StartRead(&response_);
+ StartCall();
+ }
+ void OnReadDone(bool ok) override {
+ std::unique_lock<std::mutex> lock(mutex_);
+ if (ok) {
+ if (client_)
+ client_->handleSubscribeResponse(&response_, cb_);
+ StartRead(&response_);
+ }
+ }
+ void OnDone(const Status& s) override {
+ status_ = s;
+ if (client_) {
+ if (config_.verbose() > 1)
+ std::cerr << "KuksaClient::subscribe::Reader done" << std::endl;
+ client_->handleSubscribeDone(request_, status_, done_cb_);
+ }
+
+ // gRPC engine is done with us, safe to self-delete
+ delete request_;
+ delete this;
+ }
+
+ private:
+ KuksaClient *client_;
+ KuksaConfig config_;
+ const SubscribeRequest *request_;
+ SubscribeResponseCallback cb_;
+ SubscribeDoneCallback done_cb_;
+
+ ClientContext context_;
+ SubscribeResponse response_;
+ std::mutex mutex_;
+ Status status_;
+ };
+ Reader *reader = new Reader(m_stub.get(), this, m_config, request, cb, done_cb);
+ if (!reader)
+ handleCriticalFailure("Could not create Subscribe reader");
+}
+
+// Private
+
+void KuksaClient::set(const std::string &path, const Datapoint &dp, SetResponseCallback cb, const bool actuator)
+{
+ ClientContext *context = new ClientContext();
+ if (!context) {
+ handleCriticalFailure("Could not create ClientContext");
+ return;
+ }
+ std::string token = m_config.authToken();
+ if (!token.empty()) {
+ token.insert(0, std::string("Bearer "));
+ context->AddMetadata(std::string("authorization"), token);
+ }
+
+ SetRequest request;
+ auto update = request.add_updates();
+ auto entry = update->mutable_entry();
+ entry->set_path(path);
+ if (actuator) {
+ auto target = entry->mutable_actuator_target();
+ *target = dp;
+ update->add_fields(Field::FIELD_ACTUATOR_TARGET);
+ } else {
+ auto value = entry->mutable_value();
+ *value = dp;
+ update->add_fields(Field::FIELD_VALUE);
+ }
+
+ SetResponse *response = new SetResponse();
+ if (!response) {
+ handleCriticalFailure("Could not create SetResponse");
+ delete context;
+ return;
+ }
+
+ // NOTE: Using ClientUnaryReactor instead of the shortcut method
+ // would allow getting detailed errors.
+ m_stub->async()->Set(context, &request, response,
+ [this, cb, context, response](Status s) {
+ if (s.ok())
+ handleSetResponse(response, cb);
+ delete response;
+ delete context;
+ });
+}
+
+void KuksaClient::handleGetResponse(const GetResponse *response, GetResponseCallback cb)
+{
+ if (!(response && response->entries_size() && cb))
+ return;
+
+ for (auto it = response->entries().begin(); it != response->entries().end(); ++it) {
+ // We expect paths in the response entries
+ if (!it->path().size())
+ continue;
+
+ Datapoint dp;
+ if (it->has_actuator_target())
+ dp = it->actuator_target();
+ else
+ dp = it->value();
+ cb(it->path(), dp);
+ }
+}
+
+void KuksaClient::handleSetResponse(const SetResponse *response, SetResponseCallback cb)
+{
+ if (!(response && response->errors_size() && cb))
+ return;
+
+ for (auto it = response->errors().begin(); it != response->errors().end(); ++it) {
+ cb(it->path(), it->error());
+ }
+
+}
+
+void KuksaClient::handleSubscribeResponse(const SubscribeResponse *response, SubscribeResponseCallback cb)
+{
+ if (!(response && response->updates_size() && cb))
+ return;
+
+ for (auto it = response->updates().begin(); it != response->updates().end(); ++it) {
+ // We expect entries that have paths in the response
+ if (!(it->has_entry() && it->entry().path().size()))
+ continue;
+
+ auto entry = it->entry();
+ if (m_config.verbose())
+ std::cout << "KuksaClient::handleSubscribeResponse: got value for " << entry.path() << std::endl;
+
+ Datapoint dp;
+ if (entry.has_actuator_target())
+ dp = entry.actuator_target();
+ else
+ dp = entry.value();
+
+ cb(entry.path(), dp);
+ }
+}
+
+void KuksaClient::handleSubscribeDone(const SubscribeRequest *request,
+ const Status &status,
+ SubscribeDoneCallback cb)
+{
+ if (cb)
+ cb(request, status);
+}
+
+void KuksaClient::handleCriticalFailure(const std::string &error)
+{
+ if (error.size())
+ std::cerr << error << std::endl;
+ exit(1);
+}
+