diff options
author | Scott Murray <scott.murray@konsulko.com> | 2023-08-24 15:43:08 -0400 |
---|---|---|
committer | Scott Murray <scott.murray@konsulko.com> | 2023-08-24 15:43:28 -0400 |
commit | 82c1c0ab04219f9453f1b3a14a9754068e360583 (patch) | |
tree | c8ad28a5b7deba660dbddc7de86109d998eaf2e8 /src/KuksaClient.cpp | |
parent | fdd9d0964a0fe7aadfcef33c9e9c1f183ca10820 (diff) |
Rework to switch to using KUKSA.val databroker
Rework to use the "VAL" gRPC API from the KUKSA.val databroker
instead of the older server's WebSocket interface. Some source
files have been renamed to match the class naming to provide
a bit more consistency.
Bug-AGL: SPEC-4762
Signed-off-by: Scott Murray <scott.murray@konsulko.com>
Change-Id: I5ded74cfbd6987cd045b7b142fd9f38971aaef66
Diffstat (limited to 'src/KuksaClient.cpp')
-rw-r--r-- | src/KuksaClient.cpp | 373 |
1 files changed, 373 insertions, 0 deletions
diff --git a/src/KuksaClient.cpp b/src/KuksaClient.cpp new file mode 100644 index 0000000..844ce0e --- /dev/null +++ b/src/KuksaClient.cpp @@ -0,0 +1,373 @@ +/* + * Copyright (C) 2023 Konsulko Group + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include <string> +#include <regex> +#include <iterator> +#include <mutex> + +#include "KuksaClient.h" + +using grpc::Channel; +using grpc::ClientContext; +using grpc::ClientReader; +using grpc::Status; + +KuksaClient::KuksaClient(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const KuksaConfig &config) : + m_config(config) +{ + m_stub = VAL::NewStub(channel); +} + +void KuksaClient::get(const std::string &path, GetResponseCallback cb, const bool actuator) +{ + ClientContext *context = new ClientContext(); + if (!context) { + handleCriticalFailure("Could not create ClientContext"); + return; + } + std::string token = m_config.authToken(); + if (!token.empty()) { + token.insert(0, std::string("Bearer ")); + context->AddMetadata(std::string("authorization"), token); + } + + GetRequest request; + auto entry = request.add_entries(); + entry->set_path(path); + entry->add_fields(Field::FIELD_PATH); + if (actuator) + entry->add_fields(Field::FIELD_ACTUATOR_TARGET); + else + entry->add_fields(Field::FIELD_VALUE); + + GetResponse *response = new GetResponse(); + if (!response) { + handleCriticalFailure("Could not create GetResponse"); + return; + } + + // NOTE: Using ClientUnaryReactor instead of the shortcut method + // would allow getting detailed errors. + m_stub->async()->Get(context, &request, response, + [this, cb, context, response](Status s) { + if (s.ok()) + handleGetResponse(response, cb); + delete response; + delete context; + }); +} + +// Since a set request needs a Datapoint with the appropriate type value, +// checking the signal metadata to get the type would be a requirement for +// a generic set call that takes a string as argument. For now, assume +// that set with a string is specifically for a signal of string type. + +void KuksaClient::set(const std::string &path, const std::string &value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_string(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const int8_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_int32(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const int16_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_int32(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const int32_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_int32(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const int64_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_int64(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const uint8_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_uint32(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const uint16_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_uint32(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const uint32_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_uint32(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const uint64_t value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_uint64(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const float value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_float_(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::set(const std::string &path, const double value, SetResponseCallback cb, const bool actuator) +{ + Datapoint dp; + dp.set_double_(value); + set(path, dp, cb, actuator); +} + +void KuksaClient::subscribe(const std::string &path, + SubscribeResponseCallback cb, + const bool actuator, + SubscribeDoneCallback done_cb) +{ + SubscribeRequest *request = new SubscribeRequest(); + if (!request) { + handleCriticalFailure("Could not create SubscribeRequest"); + return; + } + + auto entry = request->add_entries(); + entry->set_path(path); + entry->add_fields(Field::FIELD_PATH); + if (actuator) + entry->add_fields(Field::FIELD_ACTUATOR_TARGET); + else + entry->add_fields(Field::FIELD_VALUE); + + subscribe(request, cb, done_cb); +} + +void KuksaClient::subscribe(const std::map<std::string, bool> signals, + SubscribeResponseCallback cb, + SubscribeDoneCallback done_cb) +{ + SubscribeRequest *request = new SubscribeRequest(); + if (!request) { + handleCriticalFailure("Could not create SubscribeRequest"); + return; + } + + for(auto it = signals.cbegin(); it != signals.cend(); ++it) { + auto entry = request->add_entries(); + entry->set_path(it->first); + entry->add_fields(Field::FIELD_PATH); + if (it->second) + entry->add_fields(Field::FIELD_ACTUATOR_TARGET); + else + entry->add_fields(Field::FIELD_VALUE); + } + + subscribe(request, cb, done_cb); +} + +void KuksaClient::subscribe(const SubscribeRequest *request, + SubscribeResponseCallback cb, + SubscribeDoneCallback done_cb) +{ + if (!(request && cb)) + return; + + class Reader : public grpc::ClientReadReactor<SubscribeResponse> { + public: + Reader(VAL::Stub *stub, + KuksaClient *client, + KuksaConfig &config, + const SubscribeRequest *request, + SubscribeResponseCallback cb, + SubscribeDoneCallback done_cb): + client_(client), + config_(config), + request_(request), + cb_(cb), + done_cb_(done_cb) { + std::string token = config_.authToken(); + if (!token.empty()) { + token.insert(0, std::string("Bearer ")); + context_.AddMetadata(std::string("authorization"), token); + } + stub->async()->Subscribe(&context_, request, this); + StartRead(&response_); + StartCall(); + } + void OnReadDone(bool ok) override { + std::unique_lock<std::mutex> lock(mutex_); + if (ok) { + if (client_) + client_->handleSubscribeResponse(&response_, cb_); + StartRead(&response_); + } + } + void OnDone(const Status& s) override { + status_ = s; + if (client_) { + if (config_.verbose() > 1) + std::cerr << "KuksaClient::subscribe::Reader done" << std::endl; + client_->handleSubscribeDone(request_, status_, done_cb_); + } + + // gRPC engine is done with us, safe to self-delete + delete request_; + delete this; + } + + private: + KuksaClient *client_; + KuksaConfig config_; + const SubscribeRequest *request_; + SubscribeResponseCallback cb_; + SubscribeDoneCallback done_cb_; + + ClientContext context_; + SubscribeResponse response_; + std::mutex mutex_; + Status status_; + }; + Reader *reader = new Reader(m_stub.get(), this, m_config, request, cb, done_cb); + if (!reader) + handleCriticalFailure("Could not create Subscribe reader"); +} + +// Private + +void KuksaClient::set(const std::string &path, const Datapoint &dp, SetResponseCallback cb, const bool actuator) +{ + ClientContext *context = new ClientContext(); + if (!context) { + handleCriticalFailure("Could not create ClientContext"); + return; + } + std::string token = m_config.authToken(); + if (!token.empty()) { + token.insert(0, std::string("Bearer ")); + context->AddMetadata(std::string("authorization"), token); + } + + SetRequest request; + auto update = request.add_updates(); + auto entry = update->mutable_entry(); + entry->set_path(path); + if (actuator) { + auto target = entry->mutable_actuator_target(); + *target = dp; + update->add_fields(Field::FIELD_ACTUATOR_TARGET); + } else { + auto value = entry->mutable_value(); + *value = dp; + update->add_fields(Field::FIELD_VALUE); + } + + SetResponse *response = new SetResponse(); + if (!response) { + handleCriticalFailure("Could not create SetResponse"); + delete context; + return; + } + + // NOTE: Using ClientUnaryReactor instead of the shortcut method + // would allow getting detailed errors. + m_stub->async()->Set(context, &request, response, + [this, cb, context, response](Status s) { + if (s.ok()) + handleSetResponse(response, cb); + delete response; + delete context; + }); +} + +void KuksaClient::handleGetResponse(const GetResponse *response, GetResponseCallback cb) +{ + if (!(response && response->entries_size() && cb)) + return; + + for (auto it = response->entries().begin(); it != response->entries().end(); ++it) { + // We expect paths in the response entries + if (!it->path().size()) + continue; + + Datapoint dp; + if (it->has_actuator_target()) + dp = it->actuator_target(); + else + dp = it->value(); + cb(it->path(), dp); + } +} + +void KuksaClient::handleSetResponse(const SetResponse *response, SetResponseCallback cb) +{ + if (!(response && response->errors_size() && cb)) + return; + + for (auto it = response->errors().begin(); it != response->errors().end(); ++it) { + cb(it->path(), it->error()); + } + +} + +void KuksaClient::handleSubscribeResponse(const SubscribeResponse *response, SubscribeResponseCallback cb) +{ + if (!(response && response->updates_size() && cb)) + return; + + for (auto it = response->updates().begin(); it != response->updates().end(); ++it) { + // We expect entries that have paths in the response + if (!(it->has_entry() && it->entry().path().size())) + continue; + + auto entry = it->entry(); + if (m_config.verbose()) + std::cout << "KuksaClient::handleSubscribeResponse: got value for " << entry.path() << std::endl; + + Datapoint dp; + if (entry.has_actuator_target()) + dp = entry.actuator_target(); + else + dp = entry.value(); + + cb(entry.path(), dp); + } +} + +void KuksaClient::handleSubscribeDone(const SubscribeRequest *request, + const Status &status, + SubscribeDoneCallback cb) +{ + if (cb) + cb(request, status); +} + +void KuksaClient::handleCriticalFailure(const std::string &error) +{ + if (error.size()) + std::cerr << error << std::endl; + exit(1); +} + |