#include "AwsClient.h"

#include <filesystem>

#include "aws_iot_config.h"
#include "aws_iot_mqtt_client_interface.h"

#include <nlohmann/json.hpp>

#include "CloudType.h"
#include "ClientManager.h"

#include <glib.h>
#include <afb/afb-binding.h> // for AFB_* logger

namespace
{

/**
 * Handler registered with aws_iot_mqtt_subscribe() for the inbound topic.
 *
 * Logs the message, verifies that it arrived on the configured inbound topic
 * and carries a payload, extracts application_id/data from the JSON payload
 * and forwards them to ClientManager.
 *
 * @param pClient      Client that received the message (unused).
 * @param topicName    Topic the message arrived on; used together with
 *                     topicNameLen, never as a NUL-terminated string.
 * @param topicNameLen Length of topicName in bytes.
 * @param params       Received message (payload pointer and length); may be null.
 * @param pData        User pointer given at subscription time: the owning AwsClient.
 */
void aws_subscribe_callback(AWS_IoT_Client *pClient, char *topicName, uint16_t topicNameLen,
                            IoT_Publish_Message_Params *params, void *pData)
{
    (void)pClient;

    // Evaluate the payload validity up front so that nothing below (including
    // the log line) dereferences a null params/payload pointer.
    const bool has_payload = params && params->payload && params->payloadLen > 0;

    std::string topic;
    if (topicName && topicNameLen > 0)
        topic.assign(topicName, topicNameLen);

    AFB_INFO("AWS subscribe callback: %.*s\t%.*s",
             static_cast<int>(topic.size()), topic.c_str(),
             has_payload ? static_cast<int>(params->payloadLen) : 0,
             has_payload ? static_cast<const char*>(params->payload) : "");

    if (!pData)
    {
        AFB_INFO("AwsClient is null");
        return;
    }

    const AwsClient* aws = static_cast<const AwsClient*>(pData);

    if (topic != aws->conf().mqtt_in_topic)
    {
        AFB_WARNING("Received message topic is not equal to configured: '%s'", topic.c_str());
        return;
    }

    if (!has_payload)
    {
        AFB_WARNING("Received message doesn't contain payload");
        return;
    }

    const std::string payload(static_cast<const char*>(params->payload), params->payloadLen);
    const auto [appid, data] = AwsClient::parseMessage(payload);

    // parseMessage() signals failure with an empty application id.
    if (appid.empty())
    {
        AFB_ERROR("Can't parse received message");
        return;
    }

    ClientManager::instance().emitReceivedMessage(appid, CloudType::Aws, data);
}

/**
 * Disconnect handler registered through IoT_Client_Init_Params::disconnectHandler.
 *
 * If auto-reconnect is enabled on the client nothing else is done here;
 * otherwise a single manual reconnect attempt is made and its outcome is logged.
 *
 * @param pClient Client whose connection was lost; ignored when null.
 * @param data    User pointer registered with the handler (unused).
 */
void aws_disconnect_callback(AWS_IoT_Client *pClient, void *data)
{
    (void)data;
    AFB_WARNING("AWS MQTT disconnect");

    // Nothing can be done without a client. (The original test was inverted:
    // it bailed out for every valid client and went on with a null one.)
    if (!pClient)
        return;

    if (aws_iot_is_autoreconnect_enabled(pClient))
    {
        AFB_INFO("Auto Reconnect is enabled. Reconnecting attempt will start now");
        return;
    }

    AFB_WARNING("Auto Reconnect not enabled. Starting manual reconnect...");
    const IoT_Error_t rc = aws_iot_mqtt_attempt_reconnect(pClient);

    if (IoT_Error_t::NETWORK_RECONNECTED == rc)
        AFB_WARNING("Manual Reconnect Successful");
    else
        AFB_WARNING("Manual Reconnect Failed - %d", rc);
}

} // end namespace

/// Defaulted constructor: does no work of its own; the connection is set up later by createConnection().
AwsClient::AwsClient() = default;

/// Destructor: delegates all shutdown work to stop().
AwsClient::~AwsClient()
{
    stop();
}

// static
/**
 * Extracts the "application_id" and "data" string fields from a JSON message.
 *
 * @param msg JSON text expected to be an object with string members
 *            "application_id" and "data".
 * @return {application_id, data}. On any parse/lookup/type error the failure
 *         is logged and the application id is returned empty.
 */
std::pair<std::string, std::string> AwsClient::parseMessage(const std::string& msg)
{
    std::string appid;
    std::string data;
    try
    {
        const nlohmann::json jmsg = nlohmann::json::parse(msg);
        appid = jmsg.at("application_id").get<std::string>();
        data = jmsg.at("data").get<std::string>();
    }
    catch (const std::exception& ex)
    {
        AFB_ERROR("Can't parse message (%s): %s", msg.c_str(), ex.what());
        appid.clear(); // indicate error
    }

    // Move the locals into the result instead of copying them.
    return {std::move(appid), std::move(data)};
}

// static
std::string AwsClient::getApplicationId(const std::string& msg)
{
    std::string appid;
    try
    {
        const nlohmann::json& jmsg = nlohmann::json::parse(msg);
        appid = jmsg.at("application_id").get<std::string>();
    }
    catch (const std::exception& ex)
    {
        AFB_ERROR("Can't parse message (%s): %s", msg.c_str(), ex.what());
    }

    return appid;
}

void AwsClient::main_loop()
{
    IoT_Publish_Message_Params params;
    params.qos = QoS::QOS1;
    params.isRetained = 0;

    while(!m_thread_stop)
    {
        IoT_Error_t rc = aws_iot_mqtt_yield(m_aws_client.get(), 100);
        if(IoT_Error_t::NETWORK_ATTEMPTING_RECONNECT == rc)
        {
            // If the client is attempting to reconnect we will skip the rest of the loop.
            continue;
        }

        std::string data;
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            if (m_queue.empty())
                continue;

            data = std::move(m_queue.front());
            m_queue.pop_front();
        }
        params.payload = const_cast<char*>(data.c_str());
        params.payloadLen = data.length();


        std::string appid{getApplicationId(data)};
        if (appid.empty())
        {
            AFB_ERROR("Can't obtain application_id from scheduled message (msg will be dropped)");
            // ClientManager::instance().emitSendMessageConfirmation(appid, false);
            continue;
        }

        rc = IoT_Error_t::FAILURE;
        if (!appid.empty())
        {
            rc = aws_iot_mqtt_publish(m_aws_client.get(), m_conf.mqtt_out_topic.c_str(), (uint16_t)m_conf.mqtt_out_topic.length(), &params);
            if (rc == IoT_Error_t::MQTT_REQUEST_TIMEOUT_ERROR)
            {
                AFB_WARNING("Publish ack not received");
                //???? rc = IoT_Error_t::SUCCESS;
            }
        }

        ClientManager::instance().emitSendMessageConfirmation(appid, CloudType::Aws, rc == IoT_Error_t::SUCCESS);
    }
}

void AwsClient::start()
{
    if (m_started)
        return;

    if (!m_aws_client || !m_connected)
    {
        AFB_ERROR("Can't start AWS thread: connection is not created");
        return;
    }

    m_thread_stop = false;
    m_thread = std::thread(&AwsClient::main_loop, this);
    m_started = true;
}

/**
 * Stops the worker thread and closes the MQTT session.
 *
 * When the worker thread is running it is asked to stop and joined; if a
 * session is open the inbound topic is unsubscribed and the client is
 * disconnected. Afterwards the object no longer reports itself as connected
 * or started. Safe to call when nothing was started.
 */
void AwsClient::stop()
{
    if (m_thread.joinable())
    {
        AFB_DEBUG("Wait AWS thread...");
        m_thread_stop = true;
        m_thread.join();
        AFB_DEBUG("AWS thread joined");

        if (m_connected)
        {
            AFB_DEBUG("AWS disconnecting...");
            aws_iot_mqtt_unsubscribe(m_aws_client.get(), m_conf.mqtt_in_topic.c_str(),
                                     static_cast<uint16_t>(m_conf.mqtt_in_topic.length()));
            aws_iot_mqtt_disconnect(m_aws_client.get());
            aws_iot_mqtt_yield(m_aws_client.get(), 100);

            // The session is gone: without this, connected() keeps returning true
            // and sendMessage() keeps queueing messages no thread will ever publish.
            m_connected = false;
            AFB_DEBUG("AWS disconnected");
        }
    }

    m_started = false;
}

bool AwsClient::createConnection()
{
    AFB_NOTICE("%s called", __FUNCTION__);

    if (m_aws_client)
    {
        AFB_ERROR("AWS connection already created");
        return false;
    }

    m_aws_client.reset(new AWS_IoT_Client);

    IoT_Client_Init_Params mqttInitParams = iotClientInitParamsDefault;
    IoT_Client_Connect_Params connectParams = iotClientConnectParamsDefault;

    AFB_DEBUG("rootCA cert %s", m_conf.root_ca_cert_path.c_str());
    AFB_DEBUG("client cert %s", m_conf.device_cert_path.c_str());
    AFB_DEBUG("client key %s", m_conf.device_priv_key_path.c_str());

    mqttInitParams.enableAutoReconnect = false; // We enable this later below
    mqttInitParams.pHostURL = const_cast<char*>(m_conf.mqtt_host.c_str());
    mqttInitParams.port = m_conf.mqtt_port;
    mqttInitParams.pRootCALocation = const_cast<char*>(m_conf.root_ca_cert_path.c_str());
    mqttInitParams.pDeviceCertLocation = const_cast<char*>(m_conf.device_cert_path.c_str());
    mqttInitParams.pDevicePrivateKeyLocation = const_cast<char*>(m_conf.device_priv_key_path.c_str());
    mqttInitParams.mqttCommandTimeout_ms = 20000;
    mqttInitParams.tlsHandshakeTimeout_ms = 5000;
    mqttInitParams.isSSLHostnameVerify = true;
    mqttInitParams.disconnectHandler = aws_disconnect_callback;
    mqttInitParams.disconnectHandlerData = nullptr;

    IoT_Error_t rc = aws_iot_mqtt_init(m_aws_client.get(), &mqttInitParams);
    if(IoT_Error_t::SUCCESS != rc)
    {
        AFB_ERROR("aws_iot_mqtt_init returned error: %d ", rc);
        return false;
    }

    connectParams.keepAliveIntervalInSec = 600;
    connectParams.isCleanSession = true;
    connectParams.MQTTVersion = MQTT_Ver_t::MQTT_3_1_1;
    connectParams.pClientID = const_cast<char*>(m_conf.mqtt_client_id.c_str());
    connectParams.clientIDLen = (uint16_t)m_conf.mqtt_client_id.size();
    connectParams.isWillMsgPresent = false;

    AFB_NOTICE("Connecting...");
    rc = aws_iot_mqtt_connect(m_aws_client.get(), &connectParams);
    if(IoT_Error_t::SUCCESS != rc)
    {
        AFB_NOTICE("Error(%d) connecting to %s:%d", rc, mqttInitParams.pHostURL, mqttInitParams.port);
        return false;
    }

    AFB_NOTICE("Subscribing...");
    rc = aws_iot_mqtt_subscribe(m_aws_client.get(), m_conf.mqtt_in_topic.c_str(), (uint16_t)m_conf.mqtt_in_topic.length(), QOS1, aws_subscribe_callback, this);
    if(IoT_Error_t::SUCCESS != rc)
    {
        AFB_ERROR("Error subscribing: %d ", rc);
        return false;
    }

    /*
      Enable Auto Reconnect functionality. Minimum and Maximum time of Exponential backoff are set in aws_iot_config.h
       #AWS_IOT_MQTT_MIN_RECONNECT_WAIT_INTERVAL
       #AWS_IOT_MQTT_MAX_RECONNECT_WAIT_INTERVAL
    */
    rc = aws_iot_mqtt_autoreconnect_set_status(m_aws_client.get(), true);
    if(IoT_Error_t::SUCCESS != rc)
    {
        AFB_ERROR("Unable to set AutoReconnect to true: %d", rc);
        return false;
    }

    m_connected = true;

    if (!m_started)
        start();

    return true;
}

/**
 * Schedules a message for publishing by the worker thread.
 *
 * The application id and data are wrapped into a JSON object
 * ({"application_id": ..., "data": ...}) and appended to the outgoing queue;
 * the actual publish happens in main_loop().
 *
 * @param appid Application id stored in the message.
 * @param data  Payload stored in the message.
 * @return true when the message was queued; false when the client isn't
 *         connected or the queue already holds MAX_QUEUE_SIZE messages.
 */
bool AwsClient::sendMessage(const std::string& appid, const std::string& data)
{
    if (!m_aws_client || !m_connected)
    {
        AFB_ERROR("AwsClient is not ready for message sending");
        return false;
    }

    nlohmann::json jmsg{
        {"application_id", appid},
        {"data", data}
    };

    // Serialize outside the lock; the string is then moved into the queue.
    std::string msg = jmsg.dump();

    std::lock_guard<std::mutex> lock(m_mutex);

    if (m_queue.size() >= MAX_QUEUE_SIZE)
    {
        // %zu is the portable conversion for size_t (%lu is wrong on 32-bit targets).
        AFB_ERROR("Can't send message: AWS queue size exceeded: %zu", m_queue.size());
        return false;
    }

    m_queue.push_back(std::move(msg));
    AFB_DEBUG("AWS queue size: %zu", m_queue.size());
    return true;
}

/// Returns the `enabled` flag of the configuration (set by loadConf() from AwsCloudConnection/Enabled).
bool AwsClient::enabled() const
{
    return m_conf.enabled;
}

/// Returns the connection flag, which createConnection() sets once connect, subscribe and auto-reconnect setup all succeeded.
bool AwsClient::connected() const
{
    return m_connected;
}

/// Read-only access to the configuration held by this client.
const AwsClientConfig& AwsClient::conf() const
{
    return m_conf;
}

/**
 * Loads the [AwsCloudConnection] section of the key file into m_conf.
 *
 * When the connection is not enabled nothing else is read or validated.
 * Otherwise every string setting must be present and non-empty, MqttPort must
 * be in 1..65535 and the three certificate/key files must exist.
 *
 * @param conf_file Parsed GLib key file to read from.
 * @return true when the connection is disabled or fully and validly
 *         configured; false on the first missing or invalid setting
 *         (the reason is logged).
 */
bool AwsClient::loadConf(GKeyFile* conf_file)
{
    static constexpr const char* kSection = "AwsCloudConnection";

    // Reads a mandatory, non-empty string value into set_to.
    auto read_string = [conf_file](const char* section, const char* key, std::string& set_to)
    {
        g_autofree gchar *value = g_key_file_get_string(conf_file, section, key, nullptr);
        if (value == nullptr)
        {
            AFB_ERROR("Can't read %s/%s from config", section, key);
            return false;
        }
        if (!value[0])
        {
            AFB_ERROR("Value %s/%s is empty", section, key);
            return false;
        }
        set_to = value;
        return true;
    };

    // Checks that a file exists; a filesystem error (e.g. an unreadable parent
    // directory) is reported as failure instead of escaping as an exception.
    auto check_file = [](const std::string& path)
    {
        try
        {
            if (std::filesystem::exists(path))
                return true;

            AFB_ERROR("File %s doesn't exists", path.c_str());
        }
        catch (const std::filesystem::filesystem_error& ex)
        {
            AFB_ERROR("Can't check file %s: %s", path.c_str(), ex.what());
        }
        return false;
    };

    //-- AWS parameters:
    g_autoptr(GError) error = nullptr;
    m_conf.enabled = g_key_file_get_boolean(conf_file, kSection, "Enabled", &error);

    if (!m_conf.enabled)
    {
        // A missing key simply means "disabled"; a malformed value is worth a hint.
        if (error && g_error_matches(error, G_KEY_FILE_ERROR, G_KEY_FILE_ERROR_INVALID_VALUE))
            AFB_WARNING("Value AwsCloudConnection/Enabled is invalid (%s), AWS connection is disabled", error->message);

        return true; // don't check other settings in this case
    }

    if (!read_string(kSection, "MqttHost", m_conf.mqtt_host) ||
        !read_string(kSection, "MqttClientId", m_conf.mqtt_client_id) ||
        !read_string(kSection, "MqttOutTopic", m_conf.mqtt_out_topic) ||
        !read_string(kSection, "MqttInTopic", m_conf.mqtt_in_topic) ||
        !read_string(kSection, "ThingName", m_conf.thing_name) ||
        !read_string(kSection, "RootCaCert", m_conf.root_ca_cert_path) ||
        !read_string(kSection, "DeviceCert", m_conf.device_cert_path) ||
        !read_string(kSection, "DevicePrivKey", m_conf.device_priv_key_path))
    {
        return false;
    }

    // Validate the full 64-bit value before narrowing: a plain cast would turn
    // e.g. 65536 into 0 and 70000 into 4464 without any diagnostics.
    const guint64 port = g_key_file_get_uint64(conf_file, kSection, "MqttPort", nullptr);
    if (port == 0)
    {
        AFB_ERROR("Value AwsCloudConnection/MqttPort is not configured");
        return false;
    }
    if (port > G_MAXUINT16)
    {
        AFB_ERROR("Value AwsCloudConnection/MqttPort is out of range: %llu", static_cast<unsigned long long>(port));
        return false;
    }
    m_conf.mqtt_port = static_cast<uint16_t>(port);

    return check_file(m_conf.root_ca_cert_path) &&
           check_file(m_conf.device_cert_path) &&
           check_file(m_conf.device_priv_key_path);
}