#include "AwsClient.h" #include <filesystem> #include "aws_iot_config.h" #include "aws_iot_mqtt_client_interface.h" #include <nlohmann/json.hpp> #include "CloudType.h" #include "ClientManager.h" #include <glib.h> #include <afb/afb-binding.h> // for AFB_* logger namespace { void aws_subscribe_callback(AWS_IoT_Client *pClient, char *topicName, uint16_t topicNameLen, IoT_Publish_Message_Params *params, void *pData) { (void)pClient; AFB_INFO("AWS subscribe callback: %.*s\t%.*s", topicNameLen, topicName, (int) params->payloadLen, (char *) params->payload); if (!pData) { AFB_INFO("AwsClient is null"); return; } AwsClient* aws = (AwsClient*)pData; std::string topic; if (topicName && topicNameLen > 0) topic.assign(topicName, topicNameLen); if (topic != aws->conf().mqtt_in_topic) { AFB_WARNING("Received message topic is not equal to configured: '%s'", topic.c_str()); return; } if (!params || !params->payload || params->payloadLen <= 0) { AFB_WARNING("Received message doesn't contain payload"); return; } const auto& pmsg = AwsClient::parseMessage(std::string((char*)params->payload, params->payloadLen)); const auto& appid = pmsg.first; const auto& data = pmsg.second; if (appid.empty()) { AFB_ERROR("Can't parse received message"); return; } ClientManager::instance().emitReceivedMessage(appid, CloudType::Aws, data); } void aws_disconnect_callback(AWS_IoT_Client *pClient, void *data) { (void)data; AFB_WARNING("AWS MQTT disconnect"); IoT_Error_t rc = IoT_Error_t::FAILURE; if(pClient) return; if(aws_iot_is_autoreconnect_enabled(pClient)) { AFB_INFO("Auto Reconnect is enabled. Reconnecting attempt will start now"); } else { AFB_WARNING("Auto Reconnect not enabled. Starting manual reconnect..."); rc = aws_iot_mqtt_attempt_reconnect(pClient); if(IoT_Error_t::NETWORK_RECONNECTED == rc) AFB_WARNING("Manual Reconnect Successful"); else AFB_WARNING("Manual Reconnect Failed - %d", rc); } } } // end namespace AwsClient::AwsClient() = default; AwsClient::~AwsClient() { stop(); } // static std::pair<std::string, std::string> AwsClient::parseMessage(const std::string& msg) { std::string appid; std::string data; std::string cloud; try { const nlohmann::json& jmsg = nlohmann::json::parse(msg); appid = jmsg.at("application_id").get<std::string>(); data = jmsg.at("data").get<std::string>(); } catch (const std::exception& ex) { AFB_ERROR("Can't parse message (%s): %s", msg.c_str(), ex.what()); appid.clear(); // indicate error } return {appid, data}; } // static std::string AwsClient::getApplicationId(const std::string& msg) { std::string appid; try { const nlohmann::json& jmsg = nlohmann::json::parse(msg); appid = jmsg.at("application_id").get<std::string>(); } catch (const std::exception& ex) { AFB_ERROR("Can't parse message (%s): %s", msg.c_str(), ex.what()); } return appid; } void AwsClient::main_loop() { IoT_Publish_Message_Params params; params.qos = QoS::QOS1; params.isRetained = 0; while(!m_thread_stop) { IoT_Error_t rc = aws_iot_mqtt_yield(m_aws_client.get(), 100); if(IoT_Error_t::NETWORK_ATTEMPTING_RECONNECT == rc) { // If the client is attempting to reconnect we will skip the rest of the loop. continue; } std::string data; { std::lock_guard<std::mutex> lock(m_mutex); if (m_queue.empty()) continue; data = std::move(m_queue.front()); m_queue.pop_front(); } params.payload = const_cast<char*>(data.c_str()); params.payloadLen = data.length(); std::string appid{getApplicationId(data)}; if (appid.empty()) { AFB_ERROR("Can't obtain application_id from scheduled message (msg will be dropped)"); // ClientManager::instance().emitSendMessageConfirmation(appid, false); continue; } rc = IoT_Error_t::FAILURE; if (!appid.empty()) { rc = aws_iot_mqtt_publish(m_aws_client.get(), m_conf.mqtt_out_topic.c_str(), (uint16_t)m_conf.mqtt_out_topic.length(), ¶ms); if (rc == IoT_Error_t::MQTT_REQUEST_TIMEOUT_ERROR) { AFB_WARNING("Publish ack not received"); //???? rc = IoT_Error_t::SUCCESS; } } ClientManager::instance().emitSendMessageConfirmation(appid, CloudType::Aws, rc == IoT_Error_t::SUCCESS); } } void AwsClient::start() { if (m_started) return; if (!m_aws_client || !m_connected) { AFB_ERROR("Can't start AWS thread: connection is not created"); return; } m_thread_stop = false; m_thread = std::thread(&AwsClient::main_loop, this); m_started = true; } void AwsClient::stop() { if (m_thread.joinable()) { AFB_DEBUG("Wait AWS thread..."); m_thread_stop = true; m_thread.join(); AFB_DEBUG("AWS thread joined"); if (m_connected) { AFB_DEBUG("AWS disconnecting..."); aws_iot_mqtt_unsubscribe(m_aws_client.get(), m_conf.mqtt_in_topic.c_str(), (uint16_t)m_conf.mqtt_in_topic.length()); aws_iot_mqtt_disconnect(m_aws_client.get()); aws_iot_mqtt_yield(m_aws_client.get(), 100); AFB_DEBUG("AWS disconnected"); } } m_started = false; } bool AwsClient::createConnection() { AFB_NOTICE("%s called", __FUNCTION__); if (m_aws_client) { AFB_ERROR("AWS connection already created"); return false; } m_aws_client.reset(new AWS_IoT_Client); IoT_Client_Init_Params mqttInitParams = iotClientInitParamsDefault; IoT_Client_Connect_Params connectParams = iotClientConnectParamsDefault; AFB_DEBUG("rootCA cert %s", m_conf.root_ca_cert_path.c_str()); AFB_DEBUG("client cert %s", m_conf.device_cert_path.c_str()); AFB_DEBUG("client key %s", m_conf.device_priv_key_path.c_str()); mqttInitParams.enableAutoReconnect = false; // We enable this later below mqttInitParams.pHostURL = const_cast<char*>(m_conf.mqtt_host.c_str()); mqttInitParams.port = m_conf.mqtt_port; mqttInitParams.pRootCALocation = const_cast<char*>(m_conf.root_ca_cert_path.c_str()); mqttInitParams.pDeviceCertLocation = const_cast<char*>(m_conf.device_cert_path.c_str()); mqttInitParams.pDevicePrivateKeyLocation = const_cast<char*>(m_conf.device_priv_key_path.c_str()); mqttInitParams.mqttCommandTimeout_ms = 20000; mqttInitParams.tlsHandshakeTimeout_ms = 5000; mqttInitParams.isSSLHostnameVerify = true; mqttInitParams.disconnectHandler = aws_disconnect_callback; mqttInitParams.disconnectHandlerData = nullptr; IoT_Error_t rc = aws_iot_mqtt_init(m_aws_client.get(), &mqttInitParams); if(IoT_Error_t::SUCCESS != rc) { AFB_ERROR("aws_iot_mqtt_init returned error: %d ", rc); return false; } connectParams.keepAliveIntervalInSec = 600; connectParams.isCleanSession = true; connectParams.MQTTVersion = MQTT_Ver_t::MQTT_3_1_1; connectParams.pClientID = const_cast<char*>(m_conf.mqtt_client_id.c_str()); connectParams.clientIDLen = (uint16_t)m_conf.mqtt_client_id.size(); connectParams.isWillMsgPresent = false; AFB_NOTICE("Connecting..."); rc = aws_iot_mqtt_connect(m_aws_client.get(), &connectParams); if(IoT_Error_t::SUCCESS != rc) { AFB_NOTICE("Error(%d) connecting to %s:%d", rc, mqttInitParams.pHostURL, mqttInitParams.port); return false; } AFB_NOTICE("Subscribing..."); rc = aws_iot_mqtt_subscribe(m_aws_client.get(), m_conf.mqtt_in_topic.c_str(), (uint16_t)m_conf.mqtt_in_topic.length(), QOS1, aws_subscribe_callback, this); if(IoT_Error_t::SUCCESS != rc) { AFB_ERROR("Error subscribing: %d ", rc); return false; } /* Enable Auto Reconnect functionality. Minimum and Maximum time of Exponential backoff are set in aws_iot_config.h #AWS_IOT_MQTT_MIN_RECONNECT_WAIT_INTERVAL #AWS_IOT_MQTT_MAX_RECONNECT_WAIT_INTERVAL */ rc = aws_iot_mqtt_autoreconnect_set_status(m_aws_client.get(), true); if(IoT_Error_t::SUCCESS != rc) { AFB_ERROR("Unable to set AutoReconnect to true: %d", rc); return false; } m_connected = true; if (!m_started) start(); return true; } bool AwsClient::sendMessage(const std::string& appid, const std::string& data) { if (!m_aws_client || !m_connected) { AFB_ERROR("AwsClient is not ready for message sending"); return false; } nlohmann::json jmsg{ {"application_id", appid}, {"data", data} }; const std::string& msg = jmsg.dump(); { std::lock_guard<std::mutex> lock(m_mutex); if (m_queue.size() < MAX_QUEUE_SIZE) { m_queue.push_back(msg); AFB_DEBUG("AWS queue size: %lu", m_queue.size()); return true; } else { AFB_ERROR("Can't send message: AWS queue size exceeded: %lu", m_queue.size()); } } return false; } bool AwsClient::enabled() const { return m_conf.enabled; } bool AwsClient::connected() const { return m_connected; } const AwsClientConfig& AwsClient::conf() const { return m_conf; } bool AwsClient::loadConf(GKeyFile* conf_file) { g_autoptr(GError) error = nullptr; auto read_string = [&conf_file](const char* section, const char* key, std::string& set_to) { g_autoptr(GError) error = nullptr; g_autofree gchar *value = g_key_file_get_string(conf_file, section, key, &error); if (value == nullptr) { AFB_ERROR("Can't read %s/%s from config", section, key); return false; } if (!value[0]) { AFB_ERROR("Value %s/%s is empty", section, key); return false; } set_to = value; return true; }; auto check_file = [](const std::string& path) { if (!std::filesystem::exists(path)) { AFB_ERROR("File %s doesn't exists", path.c_str()); return false; } return true; }; //-- AWS parameters: m_conf.enabled = g_key_file_get_boolean(conf_file, "AwsCloudConnection", "Enabled", &error); if (!m_conf.enabled) return true; // don't check other settings in this case if (!read_string("AwsCloudConnection", "MqttHost", m_conf.mqtt_host) || !read_string("AwsCloudConnection", "MqttClientId", m_conf.mqtt_client_id) || !read_string("AwsCloudConnection", "MqttOutTopic", m_conf.mqtt_out_topic) || !read_string("AwsCloudConnection", "MqttInTopic", m_conf.mqtt_in_topic) || !read_string("AwsCloudConnection", "ThingName", m_conf.thing_name) || !read_string("AwsCloudConnection", "RootCaCert", m_conf.root_ca_cert_path) || !read_string("AwsCloudConnection", "DeviceCert", m_conf.device_cert_path) || !read_string("AwsCloudConnection", "DevicePrivKey", m_conf.device_priv_key_path)) return false; m_conf.mqtt_port = (uint16_t)g_key_file_get_uint64(conf_file, "AwsCloudConnection", "MqttPort", &error); if (!m_conf.mqtt_port) { AFB_ERROR("Value AwsCloudConnection/MqttPort is not configured"); return false; } if (!check_file(m_conf.root_ca_cert_path) || !check_file(m_conf.device_cert_path) || !check_file(m_conf.device_priv_key_path)) return false; return true; }