diff options
Diffstat (limited to 'src/AwsClient.cpp')
-rw-r--r-- | src/AwsClient.cpp | 410 |
1 files changed, 410 insertions, 0 deletions
diff --git a/src/AwsClient.cpp b/src/AwsClient.cpp new file mode 100644 index 0000000..856eef0 --- /dev/null +++ b/src/AwsClient.cpp @@ -0,0 +1,410 @@ +#include "AwsClient.h" + +#include <filesystem> + +#include "aws_iot_config.h" +#include "aws_iot_mqtt_client_interface.h" + +#include <nlohmann/json.hpp> + +#include "CloudType.h" +#include "ClientManager.h" + +#include <glib.h> +#include <afb/afb-binding.h> // for AFB_* logger + +namespace +{ + +void aws_subscribe_callback(AWS_IoT_Client *pClient, char *topicName, uint16_t topicNameLen, + IoT_Publish_Message_Params *params, void *pData) +{ + (void)pClient; + AFB_INFO("AWS subscribe callback: %.*s\t%.*s", topicNameLen, topicName, (int) params->payloadLen, (char *) params->payload); + + if (!pData) + { + AFB_INFO("AwsClient is null"); + return; + } + + AwsClient* aws = (AwsClient*)pData; + + std::string topic; + if (topicName && topicNameLen > 0) + topic.assign(topicName, topicNameLen); + + if (topic != aws->conf().mqtt_in_topic) + { + AFB_WARNING("Received message topic is not equal to configured: '%s'", topic.c_str()); + return; + } + + if (!params || !params->payload || params->payloadLen <= 0) + { + AFB_WARNING("Received message doesn't contain payload"); + return; + } + + const auto& pmsg = AwsClient::parseMessage(std::string((char*)params->payload, params->payloadLen)); + const auto& appid = pmsg.first; + const auto& data = pmsg.second; + if (appid.empty()) + { + AFB_ERROR("Can't parse received message"); + return; + } + + ClientManager::instance().emitReceivedMessage(appid, CloudType::Aws, data); +} + +void aws_disconnect_callback(AWS_IoT_Client *pClient, void *data) +{ + (void)data; + AFB_WARNING("AWS MQTT disconnect"); + IoT_Error_t rc = IoT_Error_t::FAILURE; + + if(pClient) + return; + + if(aws_iot_is_autoreconnect_enabled(pClient)) + { + AFB_INFO("Auto Reconnect is enabled. Reconnecting attempt will start now"); + } + else + { + AFB_WARNING("Auto Reconnect not enabled. Starting manual reconnect..."); + rc = aws_iot_mqtt_attempt_reconnect(pClient); + + if(IoT_Error_t::NETWORK_RECONNECTED == rc) + AFB_WARNING("Manual Reconnect Successful"); + else + AFB_WARNING("Manual Reconnect Failed - %d", rc); + } +} + +} // end namespace + +AwsClient::AwsClient() = default; + +AwsClient::~AwsClient() +{ + stop(); +} + +// static +std::pair<std::string, std::string> AwsClient::parseMessage(const std::string& msg) +{ + std::string appid; + std::string data; + std::string cloud; + try + { + const nlohmann::json& jmsg = nlohmann::json::parse(msg); + appid = jmsg.at("application_id").get<std::string>(); + data = jmsg.at("data").get<std::string>(); + } + catch (const std::exception& ex) + { + AFB_ERROR("Can't parse message (%s): %s", msg.c_str(), ex.what()); + appid.clear(); // indicate error + } + + return {appid, data}; +} + +// static +std::string AwsClient::getApplicationId(const std::string& msg) +{ + std::string appid; + try + { + const nlohmann::json& jmsg = nlohmann::json::parse(msg); + appid = jmsg.at("application_id").get<std::string>(); + } + catch (const std::exception& ex) + { + AFB_ERROR("Can't parse message (%s): %s", msg.c_str(), ex.what()); + } + + return appid; +} + +void AwsClient::main_loop() +{ + IoT_Publish_Message_Params params; + params.qos = QoS::QOS1; + params.isRetained = 0; + + while(!m_thread_stop) + { + IoT_Error_t rc = aws_iot_mqtt_yield(m_aws_client.get(), 100); + if(IoT_Error_t::NETWORK_ATTEMPTING_RECONNECT == rc) + { + // If the client is attempting to reconnect we will skip the rest of the loop. + continue; + } + + std::string data; + { + std::lock_guard<std::mutex> lock(m_mutex); + if (m_queue.empty()) + continue; + + data = std::move(m_queue.front()); + m_queue.pop_front(); + } + params.payload = const_cast<char*>(data.c_str()); + params.payloadLen = data.length(); + + + std::string appid{getApplicationId(data)}; + if (appid.empty()) + { + AFB_ERROR("Can't obtain application_id from scheduled message (msg will be dropped)"); + // ClientManager::instance().emitSendMessageConfirmation(appid, false); + continue; + } + + rc = IoT_Error_t::FAILURE; + if (!appid.empty()) + { + rc = aws_iot_mqtt_publish(m_aws_client.get(), m_conf.mqtt_out_topic.c_str(), (uint16_t)m_conf.mqtt_out_topic.length(), ¶ms); + if (rc == IoT_Error_t::MQTT_REQUEST_TIMEOUT_ERROR) + { + AFB_WARNING("Publish ack not received"); + //???? rc = IoT_Error_t::SUCCESS; + } + } + + ClientManager::instance().emitSendMessageConfirmation(appid, CloudType::Aws, rc == IoT_Error_t::SUCCESS); + } +} + +void AwsClient::start() +{ + if (m_started) + return; + + if (!m_aws_client || !m_connected) + { + AFB_ERROR("Can't start AWS thread: connection is not created"); + return; + } + + m_thread_stop = false; + m_thread = std::thread(&AwsClient::main_loop, this); + m_started = true; +} + +void AwsClient::stop() +{ + if (m_thread.joinable()) + { + AFB_DEBUG("Wait AWS thread..."); + m_thread_stop = true; + m_thread.join(); + AFB_DEBUG("AWS thread joined"); + + if (m_connected) + { + AFB_DEBUG("AWS disconnecting..."); + aws_iot_mqtt_unsubscribe(m_aws_client.get(), m_conf.mqtt_in_topic.c_str(), (uint16_t)m_conf.mqtt_in_topic.length()); + aws_iot_mqtt_disconnect(m_aws_client.get()); + aws_iot_mqtt_yield(m_aws_client.get(), 100); + AFB_DEBUG("AWS disconnected"); + } + } + + m_started = false; +} + +bool AwsClient::createConnection() +{ + AFB_NOTICE("%s called", __FUNCTION__); + + if (m_aws_client) + { + AFB_ERROR("AWS connection already created"); + return false; + } + + m_aws_client.reset(new AWS_IoT_Client); + + IoT_Client_Init_Params mqttInitParams = iotClientInitParamsDefault; + IoT_Client_Connect_Params connectParams = iotClientConnectParamsDefault; + + AFB_DEBUG("rootCA cert %s", m_conf.root_ca_cert_path.c_str()); + AFB_DEBUG("client cert %s", m_conf.device_cert_path.c_str()); + AFB_DEBUG("client key %s", m_conf.device_priv_key_path.c_str()); + + mqttInitParams.enableAutoReconnect = false; // We enable this later below + mqttInitParams.pHostURL = const_cast<char*>(m_conf.mqtt_host.c_str()); + mqttInitParams.port = m_conf.mqtt_port; + mqttInitParams.pRootCALocation = const_cast<char*>(m_conf.root_ca_cert_path.c_str()); + mqttInitParams.pDeviceCertLocation = const_cast<char*>(m_conf.device_cert_path.c_str()); + mqttInitParams.pDevicePrivateKeyLocation = const_cast<char*>(m_conf.device_priv_key_path.c_str()); + mqttInitParams.mqttCommandTimeout_ms = 20000; + mqttInitParams.tlsHandshakeTimeout_ms = 5000; + mqttInitParams.isSSLHostnameVerify = true; + mqttInitParams.disconnectHandler = aws_disconnect_callback; + mqttInitParams.disconnectHandlerData = nullptr; + + IoT_Error_t rc = aws_iot_mqtt_init(m_aws_client.get(), &mqttInitParams); + if(IoT_Error_t::SUCCESS != rc) + { + AFB_ERROR("aws_iot_mqtt_init returned error: %d ", rc); + return false; + } + + connectParams.keepAliveIntervalInSec = 600; + connectParams.isCleanSession = true; + connectParams.MQTTVersion = MQTT_Ver_t::MQTT_3_1_1; + connectParams.pClientID = const_cast<char*>(m_conf.mqtt_client_id.c_str()); + connectParams.clientIDLen = (uint16_t)m_conf.mqtt_client_id.size(); + connectParams.isWillMsgPresent = false; + + AFB_NOTICE("Connecting..."); + rc = aws_iot_mqtt_connect(m_aws_client.get(), &connectParams); + if(IoT_Error_t::SUCCESS != rc) + { + AFB_NOTICE("Error(%d) connecting to %s:%d", rc, mqttInitParams.pHostURL, mqttInitParams.port); + return false; + } + + AFB_NOTICE("Subscribing..."); + rc = aws_iot_mqtt_subscribe(m_aws_client.get(), m_conf.mqtt_in_topic.c_str(), (uint16_t)m_conf.mqtt_in_topic.length(), QOS1, aws_subscribe_callback, this); + if(IoT_Error_t::SUCCESS != rc) + { + AFB_ERROR("Error subscribing: %d ", rc); + return false; + } + + /* + Enable Auto Reconnect functionality. Minimum and Maximum time of Exponential backoff are set in aws_iot_config.h + #AWS_IOT_MQTT_MIN_RECONNECT_WAIT_INTERVAL + #AWS_IOT_MQTT_MAX_RECONNECT_WAIT_INTERVAL + */ + rc = aws_iot_mqtt_autoreconnect_set_status(m_aws_client.get(), true); + if(IoT_Error_t::SUCCESS != rc) + { + AFB_ERROR("Unable to set AutoReconnect to true: %d", rc); + return false; + } + + m_connected = true; + + if (!m_started) + start(); + + return true; +} + +bool AwsClient::sendMessage(const std::string& appid, const std::string& data) +{ + if (!m_aws_client || !m_connected) + { + AFB_ERROR("AwsClient is not ready for message sending"); + return false; + } + + nlohmann::json jmsg{ + {"application_id", appid}, + {"data", data} + }; + + const std::string& msg = jmsg.dump(); + + { + std::lock_guard<std::mutex> lock(m_mutex); + + if (m_queue.size() < MAX_QUEUE_SIZE) + { + m_queue.push_back(msg); + AFB_DEBUG("AWS queue size: %lu", m_queue.size()); + return true; + } + else + { + AFB_ERROR("Can't send message: AWS queue size exceeded: %lu", m_queue.size()); + } + } + + return false; +} + +bool AwsClient::enabled() const +{ + return m_conf.enabled; +} + +bool AwsClient::connected() const +{ + return m_connected; +} + +const AwsClientConfig& AwsClient::conf() const +{ + return m_conf; +} + +bool AwsClient::loadConf(GKeyFile* conf_file) +{ + g_autoptr(GError) error = nullptr; + + auto read_string = [&conf_file](const char* section, const char* key, std::string& set_to) + { + g_autoptr(GError) error = nullptr; + g_autofree gchar *value = g_key_file_get_string(conf_file, section, key, &error); + if (value == nullptr) + { + AFB_ERROR("Can't read %s/%s from config", section, key); + return false; + } + if (!value[0]) + { + AFB_ERROR("Value %s/%s is empty", section, key); + return false; + } + set_to = value; + return true; + }; + + auto check_file = [](const std::string& path) + { + if (!std::filesystem::exists(path)) + { + AFB_ERROR("File %s doesn't exists", path.c_str()); + return false; + } + return true; + }; + + //-- AWS parameters: + m_conf.enabled = g_key_file_get_boolean(conf_file, "AwsCloudConnection", "Enabled", &error); + + if (!m_conf.enabled) + return true; // don't check other settings in this case + + if (!read_string("AwsCloudConnection", "MqttHost", m_conf.mqtt_host) || + !read_string("AwsCloudConnection", "MqttClientId", m_conf.mqtt_client_id) || + !read_string("AwsCloudConnection", "MqttOutTopic", m_conf.mqtt_out_topic) || + !read_string("AwsCloudConnection", "MqttInTopic", m_conf.mqtt_in_topic) || + !read_string("AwsCloudConnection", "ThingName", m_conf.thing_name) || + !read_string("AwsCloudConnection", "RootCaCert", m_conf.root_ca_cert_path) || + !read_string("AwsCloudConnection", "DeviceCert", m_conf.device_cert_path) || + !read_string("AwsCloudConnection", "DevicePrivKey", m_conf.device_priv_key_path)) + return false; + + m_conf.mqtt_port = (uint16_t)g_key_file_get_uint64(conf_file, "AwsCloudConnection", "MqttPort", &error); + if (!m_conf.mqtt_port) + { + AFB_ERROR("Value AwsCloudConnection/MqttPort is not configured"); + return false; + } + + if (!check_file(m_conf.root_ca_cert_path) || !check_file(m_conf.device_cert_path) || !check_file(m_conf.device_priv_key_path)) + return false; + + return true; +} |