summaryrefslogtreecommitdiffstats
path: root/src/AwsClient.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/AwsClient.cpp')
-rw-r--r--src/AwsClient.cpp410
1 files changed, 410 insertions, 0 deletions
diff --git a/src/AwsClient.cpp b/src/AwsClient.cpp
new file mode 100644
index 0000000..856eef0
--- /dev/null
+++ b/src/AwsClient.cpp
@@ -0,0 +1,410 @@
+#include "AwsClient.h"
+
+#include <filesystem>
+
+#include "aws_iot_config.h"
+#include "aws_iot_mqtt_client_interface.h"
+
+#include <nlohmann/json.hpp>
+
+#include "CloudType.h"
+#include "ClientManager.h"
+
+#include <glib.h>
+#include <afb/afb-binding.h> // for AFB_* logger
+
+namespace
+{
+
+void aws_subscribe_callback(AWS_IoT_Client *pClient, char *topicName, uint16_t topicNameLen,
+ IoT_Publish_Message_Params *params, void *pData)
+{
+ (void)pClient;
+ AFB_INFO("AWS subscribe callback: %.*s\t%.*s", topicNameLen, topicName, (int) params->payloadLen, (char *) params->payload);
+
+ if (!pData)
+ {
+ AFB_INFO("AwsClient is null");
+ return;
+ }
+
+ AwsClient* aws = (AwsClient*)pData;
+
+ std::string topic;
+ if (topicName && topicNameLen > 0)
+ topic.assign(topicName, topicNameLen);
+
+ if (topic != aws->conf().mqtt_in_topic)
+ {
+ AFB_WARNING("Received message topic is not equal to configured: '%s'", topic.c_str());
+ return;
+ }
+
+ if (!params || !params->payload || params->payloadLen <= 0)
+ {
+ AFB_WARNING("Received message doesn't contain payload");
+ return;
+ }
+
+ const auto& pmsg = AwsClient::parseMessage(std::string((char*)params->payload, params->payloadLen));
+ const auto& appid = pmsg.first;
+ const auto& data = pmsg.second;
+ if (appid.empty())
+ {
+ AFB_ERROR("Can't parse received message");
+ return;
+ }
+
+ ClientManager::instance().emitReceivedMessage(appid, CloudType::Aws, data);
+}
+
+void aws_disconnect_callback(AWS_IoT_Client *pClient, void *data)
+{
+ (void)data;
+ AFB_WARNING("AWS MQTT disconnect");
+ IoT_Error_t rc = IoT_Error_t::FAILURE;
+
+ if(pClient)
+ return;
+
+ if(aws_iot_is_autoreconnect_enabled(pClient))
+ {
+ AFB_INFO("Auto Reconnect is enabled. Reconnecting attempt will start now");
+ }
+ else
+ {
+ AFB_WARNING("Auto Reconnect not enabled. Starting manual reconnect...");
+ rc = aws_iot_mqtt_attempt_reconnect(pClient);
+
+ if(IoT_Error_t::NETWORK_RECONNECTED == rc)
+ AFB_WARNING("Manual Reconnect Successful");
+ else
+ AFB_WARNING("Manual Reconnect Failed - %d", rc);
+ }
+}
+
+} // end namespace
+
+AwsClient::AwsClient() = default;
+
+AwsClient::~AwsClient()
+{
+ stop();
+}
+
+// static
+std::pair<std::string, std::string> AwsClient::parseMessage(const std::string& msg)
+{
+ std::string appid;
+ std::string data;
+ std::string cloud;
+ try
+ {
+ const nlohmann::json& jmsg = nlohmann::json::parse(msg);
+ appid = jmsg.at("application_id").get<std::string>();
+ data = jmsg.at("data").get<std::string>();
+ }
+ catch (const std::exception& ex)
+ {
+ AFB_ERROR("Can't parse message (%s): %s", msg.c_str(), ex.what());
+ appid.clear(); // indicate error
+ }
+
+ return {appid, data};
+}
+
+// static
+std::string AwsClient::getApplicationId(const std::string& msg)
+{
+ std::string appid;
+ try
+ {
+ const nlohmann::json& jmsg = nlohmann::json::parse(msg);
+ appid = jmsg.at("application_id").get<std::string>();
+ }
+ catch (const std::exception& ex)
+ {
+ AFB_ERROR("Can't parse message (%s): %s", msg.c_str(), ex.what());
+ }
+
+ return appid;
+}
+
+void AwsClient::main_loop()
+{
+ IoT_Publish_Message_Params params;
+ params.qos = QoS::QOS1;
+ params.isRetained = 0;
+
+ while(!m_thread_stop)
+ {
+ IoT_Error_t rc = aws_iot_mqtt_yield(m_aws_client.get(), 100);
+ if(IoT_Error_t::NETWORK_ATTEMPTING_RECONNECT == rc)
+ {
+ // If the client is attempting to reconnect we will skip the rest of the loop.
+ continue;
+ }
+
+ std::string data;
+ {
+ std::lock_guard<std::mutex> lock(m_mutex);
+ if (m_queue.empty())
+ continue;
+
+ data = std::move(m_queue.front());
+ m_queue.pop_front();
+ }
+ params.payload = const_cast<char*>(data.c_str());
+ params.payloadLen = data.length();
+
+
+ std::string appid{getApplicationId(data)};
+ if (appid.empty())
+ {
+ AFB_ERROR("Can't obtain application_id from scheduled message (msg will be dropped)");
+ // ClientManager::instance().emitSendMessageConfirmation(appid, false);
+ continue;
+ }
+
+ rc = IoT_Error_t::FAILURE;
+ if (!appid.empty())
+ {
+ rc = aws_iot_mqtt_publish(m_aws_client.get(), m_conf.mqtt_out_topic.c_str(), (uint16_t)m_conf.mqtt_out_topic.length(), &params);
+ if (rc == IoT_Error_t::MQTT_REQUEST_TIMEOUT_ERROR)
+ {
+ AFB_WARNING("Publish ack not received");
+ //???? rc = IoT_Error_t::SUCCESS;
+ }
+ }
+
+ ClientManager::instance().emitSendMessageConfirmation(appid, CloudType::Aws, rc == IoT_Error_t::SUCCESS);
+ }
+}
+
+void AwsClient::start()
+{
+ if (m_started)
+ return;
+
+ if (!m_aws_client || !m_connected)
+ {
+ AFB_ERROR("Can't start AWS thread: connection is not created");
+ return;
+ }
+
+ m_thread_stop = false;
+ m_thread = std::thread(&AwsClient::main_loop, this);
+ m_started = true;
+}
+
+void AwsClient::stop()
+{
+ if (m_thread.joinable())
+ {
+ AFB_DEBUG("Wait AWS thread...");
+ m_thread_stop = true;
+ m_thread.join();
+ AFB_DEBUG("AWS thread joined");
+
+ if (m_connected)
+ {
+ AFB_DEBUG("AWS disconnecting...");
+ aws_iot_mqtt_unsubscribe(m_aws_client.get(), m_conf.mqtt_in_topic.c_str(), (uint16_t)m_conf.mqtt_in_topic.length());
+ aws_iot_mqtt_disconnect(m_aws_client.get());
+ aws_iot_mqtt_yield(m_aws_client.get(), 100);
+ AFB_DEBUG("AWS disconnected");
+ }
+ }
+
+ m_started = false;
+}
+
+bool AwsClient::createConnection()
+{
+ AFB_NOTICE("%s called", __FUNCTION__);
+
+ if (m_aws_client)
+ {
+ AFB_ERROR("AWS connection already created");
+ return false;
+ }
+
+ m_aws_client.reset(new AWS_IoT_Client);
+
+ IoT_Client_Init_Params mqttInitParams = iotClientInitParamsDefault;
+ IoT_Client_Connect_Params connectParams = iotClientConnectParamsDefault;
+
+ AFB_DEBUG("rootCA cert %s", m_conf.root_ca_cert_path.c_str());
+ AFB_DEBUG("client cert %s", m_conf.device_cert_path.c_str());
+ AFB_DEBUG("client key %s", m_conf.device_priv_key_path.c_str());
+
+ mqttInitParams.enableAutoReconnect = false; // We enable this later below
+ mqttInitParams.pHostURL = const_cast<char*>(m_conf.mqtt_host.c_str());
+ mqttInitParams.port = m_conf.mqtt_port;
+ mqttInitParams.pRootCALocation = const_cast<char*>(m_conf.root_ca_cert_path.c_str());
+ mqttInitParams.pDeviceCertLocation = const_cast<char*>(m_conf.device_cert_path.c_str());
+ mqttInitParams.pDevicePrivateKeyLocation = const_cast<char*>(m_conf.device_priv_key_path.c_str());
+ mqttInitParams.mqttCommandTimeout_ms = 20000;
+ mqttInitParams.tlsHandshakeTimeout_ms = 5000;
+ mqttInitParams.isSSLHostnameVerify = true;
+ mqttInitParams.disconnectHandler = aws_disconnect_callback;
+ mqttInitParams.disconnectHandlerData = nullptr;
+
+ IoT_Error_t rc = aws_iot_mqtt_init(m_aws_client.get(), &mqttInitParams);
+ if(IoT_Error_t::SUCCESS != rc)
+ {
+ AFB_ERROR("aws_iot_mqtt_init returned error: %d ", rc);
+ return false;
+ }
+
+ connectParams.keepAliveIntervalInSec = 600;
+ connectParams.isCleanSession = true;
+ connectParams.MQTTVersion = MQTT_Ver_t::MQTT_3_1_1;
+ connectParams.pClientID = const_cast<char*>(m_conf.mqtt_client_id.c_str());
+ connectParams.clientIDLen = (uint16_t)m_conf.mqtt_client_id.size();
+ connectParams.isWillMsgPresent = false;
+
+ AFB_NOTICE("Connecting...");
+ rc = aws_iot_mqtt_connect(m_aws_client.get(), &connectParams);
+ if(IoT_Error_t::SUCCESS != rc)
+ {
+ AFB_NOTICE("Error(%d) connecting to %s:%d", rc, mqttInitParams.pHostURL, mqttInitParams.port);
+ return false;
+ }
+
+ AFB_NOTICE("Subscribing...");
+ rc = aws_iot_mqtt_subscribe(m_aws_client.get(), m_conf.mqtt_in_topic.c_str(), (uint16_t)m_conf.mqtt_in_topic.length(), QOS1, aws_subscribe_callback, this);
+ if(IoT_Error_t::SUCCESS != rc)
+ {
+ AFB_ERROR("Error subscribing: %d ", rc);
+ return false;
+ }
+
+ /*
+ Enable Auto Reconnect functionality. Minimum and Maximum time of Exponential backoff are set in aws_iot_config.h
+ #AWS_IOT_MQTT_MIN_RECONNECT_WAIT_INTERVAL
+ #AWS_IOT_MQTT_MAX_RECONNECT_WAIT_INTERVAL
+ */
+ rc = aws_iot_mqtt_autoreconnect_set_status(m_aws_client.get(), true);
+ if(IoT_Error_t::SUCCESS != rc)
+ {
+ AFB_ERROR("Unable to set AutoReconnect to true: %d", rc);
+ return false;
+ }
+
+ m_connected = true;
+
+ if (!m_started)
+ start();
+
+ return true;
+}
+
+bool AwsClient::sendMessage(const std::string& appid, const std::string& data)
+{
+ if (!m_aws_client || !m_connected)
+ {
+ AFB_ERROR("AwsClient is not ready for message sending");
+ return false;
+ }
+
+ nlohmann::json jmsg{
+ {"application_id", appid},
+ {"data", data}
+ };
+
+ const std::string& msg = jmsg.dump();
+
+ {
+ std::lock_guard<std::mutex> lock(m_mutex);
+
+ if (m_queue.size() < MAX_QUEUE_SIZE)
+ {
+ m_queue.push_back(msg);
+ AFB_DEBUG("AWS queue size: %lu", m_queue.size());
+ return true;
+ }
+ else
+ {
+ AFB_ERROR("Can't send message: AWS queue size exceeded: %lu", m_queue.size());
+ }
+ }
+
+ return false;
+}
+
+bool AwsClient::enabled() const
+{
+ return m_conf.enabled;
+}
+
+bool AwsClient::connected() const
+{
+ return m_connected;
+}
+
+const AwsClientConfig& AwsClient::conf() const
+{
+ return m_conf;
+}
+
+bool AwsClient::loadConf(GKeyFile* conf_file)
+{
+ g_autoptr(GError) error = nullptr;
+
+ auto read_string = [&conf_file](const char* section, const char* key, std::string& set_to)
+ {
+ g_autoptr(GError) error = nullptr;
+ g_autofree gchar *value = g_key_file_get_string(conf_file, section, key, &error);
+ if (value == nullptr)
+ {
+ AFB_ERROR("Can't read %s/%s from config", section, key);
+ return false;
+ }
+ if (!value[0])
+ {
+ AFB_ERROR("Value %s/%s is empty", section, key);
+ return false;
+ }
+ set_to = value;
+ return true;
+ };
+
+ auto check_file = [](const std::string& path)
+ {
+ if (!std::filesystem::exists(path))
+ {
+ AFB_ERROR("File %s doesn't exists", path.c_str());
+ return false;
+ }
+ return true;
+ };
+
+ //-- AWS parameters:
+ m_conf.enabled = g_key_file_get_boolean(conf_file, "AwsCloudConnection", "Enabled", &error);
+
+ if (!m_conf.enabled)
+ return true; // don't check other settings in this case
+
+ if (!read_string("AwsCloudConnection", "MqttHost", m_conf.mqtt_host) ||
+ !read_string("AwsCloudConnection", "MqttClientId", m_conf.mqtt_client_id) ||
+ !read_string("AwsCloudConnection", "MqttOutTopic", m_conf.mqtt_out_topic) ||
+ !read_string("AwsCloudConnection", "MqttInTopic", m_conf.mqtt_in_topic) ||
+ !read_string("AwsCloudConnection", "ThingName", m_conf.thing_name) ||
+ !read_string("AwsCloudConnection", "RootCaCert", m_conf.root_ca_cert_path) ||
+ !read_string("AwsCloudConnection", "DeviceCert", m_conf.device_cert_path) ||
+ !read_string("AwsCloudConnection", "DevicePrivKey", m_conf.device_priv_key_path))
+ return false;
+
+ m_conf.mqtt_port = (uint16_t)g_key_file_get_uint64(conf_file, "AwsCloudConnection", "MqttPort", &error);
+ if (!m_conf.mqtt_port)
+ {
+ AFB_ERROR("Value AwsCloudConnection/MqttPort is not configured");
+ return false;
+ }
+
+ if (!check_file(m_conf.root_ca_cert_path) || !check_file(m_conf.device_cert_path) || !check_file(m_conf.device_priv_key_path))
+ return false;
+
+ return true;
+}