diff options
author | Scott Murray <scott.murray@konsulko.com> | 2023-08-24 15:21:40 -0400 |
---|---|---|
committer | Scott Murray <scott.murray@konsulko.com> | 2023-08-24 15:58:58 -0400 |
commit | e6e998428529bb788e2412e84757ad9a0b71fb32 (patch) | |
tree | 732447f581be177a0b181cb1de00c481b82bbda6 /vehicle-signals/QtKuksaClient.cpp | |
parent | 1234b2771bc45a885df54a779dfb8a125f315f93 (diff) |
Rework vehicle signals support to use KUKSA.val databroker
Rework the VehicleSignals class and its use in the Navigation and
Hvac classes to switch from using the original KUKSA.val server
via WebSockets to the KUKSA.val databroker's gRPC "VAL" API.
Notable changes:
- The VehicleSignals API has changed a bit with respect to setting
signals, callers now need to pass the new value as the type that
matches the signal as opposed to always passing a string, and
optionally indicate if an actuator's target or value is being set.
Subscribe operations now also allow subscribing for either
actuator targets or values.
- It is possible that the values returned by get and subscribe
operations will be changed to using QVariant instead of QStrings
in a future follow up, but that has not been done in these changes.
- The connected signal from VehicleSignals still has roughly the
same meaning, but the authorize function and authorized signals
are to some degree redundant now. They have been kept for
compatibility, but may be removed in a follow up set of changes.
- The section header in the .ini files expected by the
VehicleSignalsConfig class has been changed from "vis-client" to
"kuksa-client" since the databroker is not a VIS server, and to
some degree forcing an update on the part of clients is useful
since their authorization tokens also need to change.
- The client key and certificate support has been removed from the
VehicleSignalsConfig class, as they are no longer used in either
the server or databroker as of KUKSA.val 0.4.0. A new optional
parameter, "tls-server-name", has been added to work with the new
TLS support behavior. It can be used to override the expected
host name for connecting to a non-local databroker instance.
- The Navigation constructor now takes an additional parameter to
indicate whether the instance acts as a router or a client.
The underlying need for this stems from an application acting as
a router needing to subscribe to the destination setting actuator
targets.
Bug-AGL: SPEC-4762
Signed-off-by: Scott Murray <scott.murray@konsulko.com>
Change-Id: I253480ae2abf068dc6e41a495454960ed5c0feaf
Diffstat (limited to 'vehicle-signals/QtKuksaClient.cpp')
-rw-r--r-- | vehicle-signals/QtKuksaClient.cpp | 421 |
1 files changed, 421 insertions, 0 deletions
diff --git a/vehicle-signals/QtKuksaClient.cpp b/vehicle-signals/QtKuksaClient.cpp new file mode 100644 index 0000000..901459d --- /dev/null +++ b/vehicle-signals/QtKuksaClient.cpp @@ -0,0 +1,421 @@ +/* + * Copyright (C) 2023 Konsulko Group + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include <QDebug> +#include <QSettings> +#include <QUrl> +#include <QFile> +#include <QtConcurrent> +#include <mutex> +#include <chrono> + +#include "QtKuksaClient.h" + +using grpc::Channel; +using grpc::ClientContext; +using grpc::ClientReader; +using grpc::Status; + +class QtKuksaClient::SubscribeReader : public grpc::ClientReadReactor<SubscribeResponse> { +public: + SubscribeReader(VAL::Stub *stub, + QtKuksaClient *client, + VehicleSignalsConfig &config, + const QMap<QString, bool> &s, + const SubscribeRequest *request): + client_(client), + config_(config), + signals_(s) { + QString token = config_.authToken(); + if (!token.isEmpty()) { + token.prepend("Bearer "); + context_.AddMetadata(std::string("authorization"), token.toStdString()); + } + stub->async()->Subscribe(&context_, request, this); + StartRead(&response_); + StartCall(); + } + void OnReadDone(bool ok) override { + std::unique_lock<std::mutex> lock(mutex_); + if (ok) { + if (client_) + client_->handleSubscribeResponse(&response_); + StartRead(&response_); + } + } + void OnDone(const Status& s) override { + status_ = s; + if (client_) { + if (config_.verbose() > 1) + qDebug() << "QtKuksaClient::subscribe::Reader done"; + client_->handleSubscribeDone(signals_, status_); + } + + // gRPC engine is done with us, safe to self-delete + delete this; + } + +private: + QtKuksaClient *client_; + VehicleSignalsConfig config_; + QMap<QString, bool> signals_; + + ClientContext context_; + SubscribeResponse response_; + std::mutex mutex_; + Status status_; +}; + +QtKuksaClient::QtKuksaClient(const std::shared_ptr< ::grpc::ChannelInterface>& channel, + const VehicleSignalsConfig &config, + QObject *parent) : + QObject(parent), + m_channel(channel), + m_config(config), + m_connected(false) +{ + m_stub = VAL::NewStub(channel); + +} + +void QtKuksaClient::connect() +{ + // Check for connection in another thread + QFuture<void> future = QtConcurrent::run(this, &QtKuksaClient::waitForConnected); +} + +void QtKuksaClient::get(const QString &path, const bool actuator) +{ + m_connected_mutex.lock(); + if (!m_connected) { + m_connected_mutex.unlock(); + return; + } + m_connected_mutex.unlock(); + + ClientContext *context = new ClientContext(); + if (!context) { + handleCriticalFailure("Could not create ClientContext"); + return; + } + QString token = m_config.authToken(); + if (!token.isEmpty()) { + token.prepend("Bearer "); + context->AddMetadata(std::string("authorization"), token.toStdString()); + } + + GetRequest request; + auto entry = request.add_entries(); + entry->set_path(path.toStdString()); + entry->add_fields(Field::FIELD_PATH); + if (actuator) + entry->add_fields(Field::FIELD_ACTUATOR_TARGET); + else + entry->add_fields(Field::FIELD_VALUE); + + GetResponse *response = new GetResponse(); + if (!response) { + handleCriticalFailure("Could not create GetResponse"); + return; + } + + // NOTE: Using ClientUnaryReactor instead of the shortcut method + // would allow getting detailed errors. + m_stub->async()->Get(context, &request, response, + [this, context, response](Status s) { + if (s.ok()) + handleGetResponse(response); + delete response; + delete context; + }); +} + +// Since a set request needs a Datapoint with the appropriate type value, +// checking the signal metadata to get the type would be a requirement for +// a generic set call that takes a string as argument. For now, assume +// that set with a string is specifically for a signal of string type. + +void QtKuksaClient::set(const QString &path, const QString &value, const bool actuator) +{ + Datapoint dp; + dp.set_string(value.toStdString()); + set(path, dp, actuator); +} + +void QtKuksaClient::set(const QString &path, const qint32 value, const bool actuator) +{ + Datapoint dp; + dp.set_int32(value); + set(path, dp, actuator); +} + +void QtKuksaClient::set(const QString &path, const qint64 value, const bool actuator) +{ + Datapoint dp; + dp.set_int64(value); + set(path, dp, actuator); +} + +void QtKuksaClient::set(const QString &path, const quint32 value, const bool actuator) +{ + Datapoint dp; + dp.set_uint32(value); + set(path, dp, actuator); +} + +void QtKuksaClient::set(const QString &path, const quint64 value, const bool actuator) +{ + Datapoint dp; + dp.set_uint64(value); + set(path, dp, actuator); +} + +void QtKuksaClient::set(const QString &path, const float value, const bool actuator) +{ + Datapoint dp; + dp.set_float_(value); + set(path, dp, actuator); +} + +void QtKuksaClient::set(const QString &path, const double value, const bool actuator) +{ + Datapoint dp; + dp.set_double_(value); + set(path, dp, actuator); +} + +void QtKuksaClient::subscribe(const QString &path, const bool actuator) +{ + m_connected_mutex.lock(); + if (!m_connected) { + m_connected_mutex.unlock(); + return; + } + m_connected_mutex.unlock(); + + QMap<QString, bool> s; + s[path] = actuator; + subscribe(s); +} + +void QtKuksaClient::subscribe(const QMap<QString, bool> &signals_) +{ + m_connected_mutex.lock(); + if (!m_connected) { + m_connected_mutex.unlock(); + return; + } + m_connected_mutex.unlock(); + + SubscribeRequest *request = new SubscribeRequest(); + if (!request) { + handleCriticalFailure("Could not create SubscribeRequest"); + return; + } + + auto it = signals_.constBegin(); + while (it != signals_.constEnd()) { + if (m_config.verbose() > 1) + qDebug() << "QtKuksaClient::subscribe: adding " << it.key() << ", actuator " << it.value(); + auto entry = request->add_entries(); + entry->set_path(it.key().toStdString()); + entry->add_fields(Field::FIELD_PATH); + if (it.value()) + entry->add_fields(Field::FIELD_ACTUATOR_TARGET); + else + entry->add_fields(Field::FIELD_VALUE); + ++it; + } + + SubscribeReader *reader = new SubscribeReader(m_stub.get(), this, m_config, signals_, request); + if (!reader) + handleCriticalFailure("Could not create SubscribeReader"); + + delete request; +} + +// Private + +void QtKuksaClient::waitForConnected() +{ + while (!m_channel->WaitForConnected(std::chrono::system_clock::now() + + std::chrono::milliseconds(500))); + + m_connected_mutex.lock(); + m_connected = true; + m_connected_mutex.unlock(); + + qInfo() << "Databroker gRPC channel ready"; + emit connected(); +} + +void QtKuksaClient::set(const QString &path, const Datapoint &dp, const bool actuator) +{ + m_connected_mutex.lock(); + if (!m_connected) { + m_connected_mutex.unlock(); + return; + } + m_connected_mutex.unlock(); + + ClientContext *context = new ClientContext(); + if (!context) { + handleCriticalFailure("Could not create ClientContext"); + return; + } + QString token = m_config.authToken(); + if (!token.isEmpty()) { + token.prepend("Bearer "); + context->AddMetadata(std::string("authorization"), token.toStdString()); + } + + SetRequest request; + auto update = request.add_updates(); + auto entry = update->mutable_entry(); + entry->set_path(path.toStdString()); + if (actuator) { + auto target = entry->mutable_actuator_target(); + *target = dp; + update->add_fields(Field::FIELD_ACTUATOR_TARGET); + } else { + auto value = entry->mutable_value(); + *value = dp; + update->add_fields(Field::FIELD_VALUE); + } + + SetResponse *response = new SetResponse(); + if (!response) { + handleCriticalFailure("Could not create SetResponse"); + delete context; + return; + } + + // NOTE: Using ClientUnaryReactor instead of the shortcut method + // would allow getting detailed errors. + m_stub->async()->Set(context, &request, response, + [this, context, response](Status s) { + if (s.ok()) + handleSetResponse(response); + delete response; + delete context; + }); +} + +void QtKuksaClient::handleGetResponse(const GetResponse *response) +{ + if (!(response && response->entries_size())) + return; + + for (auto it = response->entries().begin(); it != response->entries().end(); ++it) { + // We expect paths in the response entries + if (!it->path().size()) + continue; + Datapoint dp; + if (it->has_actuator_target()) + dp = it->actuator_target(); + else + dp = it->value(); + + QString path = QString::fromStdString(it->path()); + QString value; + QString timestamp; + if (convertDatapoint(dp, value, timestamp)) { + if (m_config.verbose() > 1) + qDebug() << "QtKuksaClient::handleGetResponse: path = " << path << ", value = " << value; + + emit getResponse(path, value, timestamp); + } + } +} + +void QtKuksaClient::handleSetResponse(const SetResponse *response) +{ + if (!(response && response->errors_size())) + return; + + for (auto it = response->errors().begin(); it != response->errors().end(); ++it) { + QString path = QString::fromStdString(it->path()); + QString error = QString::fromStdString(it->error().reason()); + emit setResponse(path, error); + } + +} + +void QtKuksaClient::handleSubscribeResponse(const SubscribeResponse *response) +{ + if (!(response && response->updates_size())) + return; + + for (auto it = response->updates().begin(); it != response->updates().end(); ++it) { + // We expect entries that have paths in the response + if (!(it->has_entry() && it->entry().path().size())) + continue; + + auto entry = it->entry(); + QString path = QString::fromStdString(entry.path()); + + Datapoint dp; + if (entry.has_actuator_target()) + dp = entry.actuator_target(); + else + dp = entry.value(); + + QString value; + QString timestamp; + if (convertDatapoint(dp, value, timestamp)) { + if (m_config.verbose() > 1) + qDebug() << "QtKuksaClient::handleSubscribeResponse: path = " << path << ", value = " << value; + + emit subscribeResponse(path, value, timestamp); + } + } +} + +void QtKuksaClient::handleSubscribeDone(const QMap<QString, bool> &signals_, const Status &status) +{ + qInfo() << "QtKuksaClient::handleSubscribeDone: enter"; + if (m_config.verbose() > 1) + qDebug() << "Subscribe status = " << status.error_code() << + " (" << QString::fromStdString(status.error_message()) << ")"; + + emit subscribeDone(signals_, status.error_code() == grpc::CANCELLED); +} + +bool QtKuksaClient::convertDatapoint(const Datapoint &dp, QString &value, QString ×tamp) +{ + // NOTE: Currently no array type support + + if (dp.has_string()) + value = QString::fromStdString(dp.string()); + else if (dp.has_int32()) + value = QString::number(dp.int32()); + else if (dp.has_int64()) + value = QString::number(dp.int64()); + else if (dp.has_uint32()) + value = QString::number(dp.uint32()); + else if (dp.has_uint64()) + value = QString::number(dp.uint64()); + else if (dp.has_float_()) + value = QString::number(dp.float_(), 'f', 6); + else if (dp.has_double_()) + value = QString::number(dp.double_(), 'f', 6); + else if (dp.has_bool_()) + value = dp.bool_() ? "true" : "false"; + else { + if (m_config.verbose()) + qWarning() << "Malformed response (unsupported value type)"; + return false; + } + + timestamp = QString::number(dp.timestamp().seconds()) + "." + QString::number(dp.timestamp().nanos()); + return true; +} + +void QtKuksaClient::handleCriticalFailure(const QString &error) +{ + if (error.size()) + qCritical() << error; +} |