diff options
Diffstat (limited to 'vehicle-signals/QtKuksaClient.cpp')
-rw-r--r-- | vehicle-signals/QtKuksaClient.cpp | 421 |
1 files changed, 421 insertions, 0 deletions
diff --git a/vehicle-signals/QtKuksaClient.cpp b/vehicle-signals/QtKuksaClient.cpp new file mode 100644 index 0000000..901459d --- /dev/null +++ b/vehicle-signals/QtKuksaClient.cpp @@ -0,0 +1,421 @@ +/* + * Copyright (C) 2023 Konsulko Group + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include <QDebug> +#include <QSettings> +#include <QUrl> +#include <QFile> +#include <QtConcurrent> +#include <mutex> +#include <chrono> + +#include "QtKuksaClient.h" + +using grpc::Channel; +using grpc::ClientContext; +using grpc::ClientReader; +using grpc::Status; + +class QtKuksaClient::SubscribeReader : public grpc::ClientReadReactor<SubscribeResponse> { +public: + SubscribeReader(VAL::Stub *stub, + QtKuksaClient *client, + VehicleSignalsConfig &config, + const QMap<QString, bool> &s, + const SubscribeRequest *request): + client_(client), + config_(config), + signals_(s) { + QString token = config_.authToken(); + if (!token.isEmpty()) { + token.prepend("Bearer "); + context_.AddMetadata(std::string("authorization"), token.toStdString()); + } + stub->async()->Subscribe(&context_, request, this); + StartRead(&response_); + StartCall(); + } + void OnReadDone(bool ok) override { + std::unique_lock<std::mutex> lock(mutex_); + if (ok) { + if (client_) + client_->handleSubscribeResponse(&response_); + StartRead(&response_); + } + } + void OnDone(const Status& s) override { + status_ = s; + if (client_) { + if (config_.verbose() > 1) + qDebug() << "QtKuksaClient::subscribe::Reader done"; + client_->handleSubscribeDone(signals_, status_); + } + + // gRPC engine is done with us, safe to self-delete + delete this; + } + +private: + QtKuksaClient *client_; + VehicleSignalsConfig config_; + QMap<QString, bool> signals_; + + ClientContext context_; + SubscribeResponse response_; + std::mutex mutex_; + Status status_; +}; + +QtKuksaClient::QtKuksaClient(const std::shared_ptr< ::grpc::ChannelInterface>& channel, + const VehicleSignalsConfig &config, + QObject *parent) : + QObject(parent), + m_channel(channel), + m_config(config), + m_connected(false) +{ + m_stub = VAL::NewStub(channel); + +} + +void QtKuksaClient::connect() +{ + // Check for connection in another thread + QFuture<void> future = QtConcurrent::run(this, &QtKuksaClient::waitForConnected); +} + +void QtKuksaClient::get(const QString &path, const bool actuator) +{ + m_connected_mutex.lock(); + if (!m_connected) { + m_connected_mutex.unlock(); + return; + } + m_connected_mutex.unlock(); + + ClientContext *context = new ClientContext(); + if (!context) { + handleCriticalFailure("Could not create ClientContext"); + return; + } + QString token = m_config.authToken(); + if (!token.isEmpty()) { + token.prepend("Bearer "); + context->AddMetadata(std::string("authorization"), token.toStdString()); + } + + GetRequest request; + auto entry = request.add_entries(); + entry->set_path(path.toStdString()); + entry->add_fields(Field::FIELD_PATH); + if (actuator) + entry->add_fields(Field::FIELD_ACTUATOR_TARGET); + else + entry->add_fields(Field::FIELD_VALUE); + + GetResponse *response = new GetResponse(); + if (!response) { + handleCriticalFailure("Could not create GetResponse"); + return; + } + + // NOTE: Using ClientUnaryReactor instead of the shortcut method + // would allow getting detailed errors. + m_stub->async()->Get(context, &request, response, + [this, context, response](Status s) { + if (s.ok()) + handleGetResponse(response); + delete response; + delete context; + }); +} + +// Since a set request needs a Datapoint with the appropriate type value, +// checking the signal metadata to get the type would be a requirement for +// a generic set call that takes a string as argument. For now, assume +// that set with a string is specifically for a signal of string type. + +void QtKuksaClient::set(const QString &path, const QString &value, const bool actuator) +{ + Datapoint dp; + dp.set_string(value.toStdString()); + set(path, dp, actuator); +} + +void QtKuksaClient::set(const QString &path, const qint32 value, const bool actuator) +{ + Datapoint dp; + dp.set_int32(value); + set(path, dp, actuator); +} + +void QtKuksaClient::set(const QString &path, const qint64 value, const bool actuator) +{ + Datapoint dp; + dp.set_int64(value); + set(path, dp, actuator); +} + +void QtKuksaClient::set(const QString &path, const quint32 value, const bool actuator) +{ + Datapoint dp; + dp.set_uint32(value); + set(path, dp, actuator); +} + +void QtKuksaClient::set(const QString &path, const quint64 value, const bool actuator) +{ + Datapoint dp; + dp.set_uint64(value); + set(path, dp, actuator); +} + +void QtKuksaClient::set(const QString &path, const float value, const bool actuator) +{ + Datapoint dp; + dp.set_float_(value); + set(path, dp, actuator); +} + +void QtKuksaClient::set(const QString &path, const double value, const bool actuator) +{ + Datapoint dp; + dp.set_double_(value); + set(path, dp, actuator); +} + +void QtKuksaClient::subscribe(const QString &path, const bool actuator) +{ + m_connected_mutex.lock(); + if (!m_connected) { + m_connected_mutex.unlock(); + return; + } + m_connected_mutex.unlock(); + + QMap<QString, bool> s; + s[path] = actuator; + subscribe(s); +} + +void QtKuksaClient::subscribe(const QMap<QString, bool> &signals_) +{ + m_connected_mutex.lock(); + if (!m_connected) { + m_connected_mutex.unlock(); + return; + } + m_connected_mutex.unlock(); + + SubscribeRequest *request = new SubscribeRequest(); + if (!request) { + handleCriticalFailure("Could not create SubscribeRequest"); + return; + } + + auto it = signals_.constBegin(); + while (it != signals_.constEnd()) { + if (m_config.verbose() > 1) + qDebug() << "QtKuksaClient::subscribe: adding " << it.key() << ", actuator " << it.value(); + auto entry = request->add_entries(); + entry->set_path(it.key().toStdString()); + entry->add_fields(Field::FIELD_PATH); + if (it.value()) + entry->add_fields(Field::FIELD_ACTUATOR_TARGET); + else + entry->add_fields(Field::FIELD_VALUE); + ++it; + } + + SubscribeReader *reader = new SubscribeReader(m_stub.get(), this, m_config, signals_, request); + if (!reader) + handleCriticalFailure("Could not create SubscribeReader"); + + delete request; +} + +// Private + +void QtKuksaClient::waitForConnected() +{ + while (!m_channel->WaitForConnected(std::chrono::system_clock::now() + + std::chrono::milliseconds(500))); + + m_connected_mutex.lock(); + m_connected = true; + m_connected_mutex.unlock(); + + qInfo() << "Databroker gRPC channel ready"; + emit connected(); +} + +void QtKuksaClient::set(const QString &path, const Datapoint &dp, const bool actuator) +{ + m_connected_mutex.lock(); + if (!m_connected) { + m_connected_mutex.unlock(); + return; + } + m_connected_mutex.unlock(); + + ClientContext *context = new ClientContext(); + if (!context) { + handleCriticalFailure("Could not create ClientContext"); + return; + } + QString token = m_config.authToken(); + if (!token.isEmpty()) { + token.prepend("Bearer "); + context->AddMetadata(std::string("authorization"), token.toStdString()); + } + + SetRequest request; + auto update = request.add_updates(); + auto entry = update->mutable_entry(); + entry->set_path(path.toStdString()); + if (actuator) { + auto target = entry->mutable_actuator_target(); + *target = dp; + update->add_fields(Field::FIELD_ACTUATOR_TARGET); + } else { + auto value = entry->mutable_value(); + *value = dp; + update->add_fields(Field::FIELD_VALUE); + } + + SetResponse *response = new SetResponse(); + if (!response) { + handleCriticalFailure("Could not create SetResponse"); + delete context; + return; + } + + // NOTE: Using ClientUnaryReactor instead of the shortcut method + // would allow getting detailed errors. + m_stub->async()->Set(context, &request, response, + [this, context, response](Status s) { + if (s.ok()) + handleSetResponse(response); + delete response; + delete context; + }); +} + +void QtKuksaClient::handleGetResponse(const GetResponse *response) +{ + if (!(response && response->entries_size())) + return; + + for (auto it = response->entries().begin(); it != response->entries().end(); ++it) { + // We expect paths in the response entries + if (!it->path().size()) + continue; + Datapoint dp; + if (it->has_actuator_target()) + dp = it->actuator_target(); + else + dp = it->value(); + + QString path = QString::fromStdString(it->path()); + QString value; + QString timestamp; + if (convertDatapoint(dp, value, timestamp)) { + if (m_config.verbose() > 1) + qDebug() << "QtKuksaClient::handleGetResponse: path = " << path << ", value = " << value; + + emit getResponse(path, value, timestamp); + } + } +} + +void QtKuksaClient::handleSetResponse(const SetResponse *response) +{ + if (!(response && response->errors_size())) + return; + + for (auto it = response->errors().begin(); it != response->errors().end(); ++it) { + QString path = QString::fromStdString(it->path()); + QString error = QString::fromStdString(it->error().reason()); + emit setResponse(path, error); + } + +} + +void QtKuksaClient::handleSubscribeResponse(const SubscribeResponse *response) +{ + if (!(response && response->updates_size())) + return; + + for (auto it = response->updates().begin(); it != response->updates().end(); ++it) { + // We expect entries that have paths in the response + if (!(it->has_entry() && it->entry().path().size())) + continue; + + auto entry = it->entry(); + QString path = QString::fromStdString(entry.path()); + + Datapoint dp; + if (entry.has_actuator_target()) + dp = entry.actuator_target(); + else + dp = entry.value(); + + QString value; + QString timestamp; + if (convertDatapoint(dp, value, timestamp)) { + if (m_config.verbose() > 1) + qDebug() << "QtKuksaClient::handleSubscribeResponse: path = " << path << ", value = " << value; + + emit subscribeResponse(path, value, timestamp); + } + } +} + +void QtKuksaClient::handleSubscribeDone(const QMap<QString, bool> &signals_, const Status &status) +{ + qInfo() << "QtKuksaClient::handleSubscribeDone: enter"; + if (m_config.verbose() > 1) + qDebug() << "Subscribe status = " << status.error_code() << + " (" << QString::fromStdString(status.error_message()) << ")"; + + emit subscribeDone(signals_, status.error_code() == grpc::CANCELLED); +} + +bool QtKuksaClient::convertDatapoint(const Datapoint &dp, QString &value, QString ×tamp) +{ + // NOTE: Currently no array type support + + if (dp.has_string()) + value = QString::fromStdString(dp.string()); + else if (dp.has_int32()) + value = QString::number(dp.int32()); + else if (dp.has_int64()) + value = QString::number(dp.int64()); + else if (dp.has_uint32()) + value = QString::number(dp.uint32()); + else if (dp.has_uint64()) + value = QString::number(dp.uint64()); + else if (dp.has_float_()) + value = QString::number(dp.float_(), 'f', 6); + else if (dp.has_double_()) + value = QString::number(dp.double_(), 'f', 6); + else if (dp.has_bool_()) + value = dp.bool_() ? "true" : "false"; + else { + if (m_config.verbose()) + qWarning() << "Malformed response (unsupported value type)"; + return false; + } + + timestamp = QString::number(dp.timestamp().seconds()) + "." + QString::number(dp.timestamp().nanos()); + return true; +} + +void QtKuksaClient::handleCriticalFailure(const QString &error) +{ + if (error.size()) + qCritical() << error; +} |