/*
 * Video On Demand Samples
 *
 * Copyright (C) 2015 Microchip Technology Germany II GmbH & Co. KG
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 * You may also obtain this software under a propriety license from Microchip.
 * Please contact Microchip for further information.
 *
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <assert.h>
#include "Console.h"
#include "MostIpc.h"


#define TCP_BUFFER_SIZE (1024*1024)

CMostIpc::CMostIpc( int port, bool server ) : isServer( server ), allowThreadRun( true ),
    udpRxThreadIsRunning( false ), txThreadIsRunning (false), acceptThreadIsRunning( false )
{
    if( isServer && 0 >= port )
    {
        ConsolePrintf( PRIO_ERROR, RED"CMostIpc: Server port must be greater than 0!"RESETCOLOR"\n" );
        return;
    }
    pthread_mutex_init( &critical_mutex_tcp, NULL );
    sem_init( &semTx, 0, 0 ); //Mutex, initialized to 0 => sem_wait will block instantly

    struct sockaddr_in listenAddress;
    /* Receive address configuration */
    memset( &listenAddress, 0, sizeof( listenAddress ) );
    listenAddress.sin_family = AF_INET;
    listenAddress.sin_addr.s_addr = htonl( INADDR_ANY );
    listenAddress.sin_port = htons( port );

    if( 0 > ( udpSock = socket( PF_INET, SOCK_DGRAM, IPPROTO_UDP ) ) )
    {
        ConsolePrintf( PRIO_ERROR, RED"CMostIpc: Failed to create UDP socket"RESETCOLOR"\n" );
        return;
    }
    if( 0 > ( tcpSock = socket( PF_INET, SOCK_STREAM, 0 ) ) )
    {
        ConsolePrintf( PRIO_ERROR, RED"CMostIpc: Failed to create TCP socket"RESETCOLOR"\n" );
        return;
    }
    
    //Avoid hanging socket after killing application, only works with Linux kernel > 3.9
#if defined (SO_REUSEADDR) && defined (SO_REUSEPORT)
	int one = 1;
    setsockopt(udpSock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &one, sizeof(one));
    setsockopt(tcpSock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &one, sizeof(one));
#endif

    // this call is what allows broadcast UDP packets to be sent:
    int enableSocketFlag = 1;
    if( 0 > setsockopt( udpSock, SOL_SOCKET, SO_BROADCAST, &enableSocketFlag, sizeof( enableSocketFlag ) ) )
    {
        ConsolePrintf(
            PRIO_ERROR, RED"CMostIpc: setsockopt (SO_BROADCAST) failed, tried to set to: %d"RESETCOLOR"\n", enableSocketFlag );
    }
    if( 0 < port )
    {
        if( 0 > bind( udpSock, ( struct sockaddr * )&listenAddress, sizeof listenAddress ) )
        {
            ConsolePrintf( PRIO_ERROR, RED"Failed to bind UDP socket"RESETCOLOR"\n" );
        }
        if( 0 > bind( tcpSock, ( struct sockaddr * )&listenAddress, sizeof listenAddress ) )
        {
            ConsolePrintf( PRIO_ERROR, RED"CMostIpc: Failed to bind TCP socket"RESETCOLOR"\n" );
        }
    }
    if( isServer )
    {
        pthread_t acceptThread;
        pthread_create( &acceptThread, NULL, CMostIpc::TcpAcceptThread, ( void * )this );
    }
    pthread_t udpRxThread, txThread;
    pthread_create( &udpRxThread, NULL, CMostIpc::UdpWorkerRXThread, ( void * )this );
    pthread_create( &txThread, NULL, CMostIpc::TXThread, ( void * )this );
}

CMostIpc::~CMostIpc()
{
    allowThreadRun = false;
    close( udpSock );
    close( tcpSock );
    sem_post(&semTx);
    int32_t timeout = 100;
    while( ( udpRxThreadIsRunning || txThreadIsRunning || acceptThreadIsRunning ) && ( --timeout > 0 ) )
        usleep( 10000 );
    pthread_mutex_lock( &critical_mutex_tcp );
    for( uint32_t i = 0; i < m_TcpClientConnections.Size(); i++ )
    {
        TcpClientInfo_t *var = m_TcpClientConnections[i];
        if( NULL != var )
        {
            if( var->clientSock >= 0 )
                close( var->clientSock );
            var->clientSock = -1;
        }
    }
    m_TcpClientConnections.RemoveAll(false);
    pthread_mutex_unlock( &critical_mutex_tcp );
}

void CMostIpc::SendMsg( CMostMsgTx *Msg )
{
    if( NULL != Msg )
    {
        Msg->AddReference();
        m_MsgTxQueue.PushBack( Msg );
        sem_post(&semTx);
    }
}

void CMostIpc::RegisterMessageHandler( CMsgFilter *Filter )
{
    m_MsgFilterQueue.PushBack( Filter );
}

void CMostIpc::UnregisterMessageHandler( CMsgFilter *Filter )
{
    m_MsgFilterQueue.Remove(Filter);
}

void *CMostIpc::TcpWorkerThread( void *pInst )
{
    bool allowWorker = true;
    TcpClientInfo_t *clientInfo = ( TcpClientInfo_t * )pInst;
    if( NULL == clientInfo || NULL == clientInfo->ipc || 0 > clientInfo->clientSock )
    {
        ConsolePrintf( PRIO_ERROR, RED"TcpWorkerThread: Invalid parameters"RESETCOLOR"\n" );
        return NULL;
    }
    uint8_t *buff = ( uint8_t * )malloc( TCP_BUFFER_SIZE );
    if( NULL == buff )
    {
        ConsolePrintf( PRIO_ERROR, RED"TcpWorkerThread: Failed to allocate buffer"RESETCOLOR"\n" );
        return NULL;
    }

    ConsolePrintf( PRIO_MEDIUM, "TcpWorkerThread: Start for IP:%s, Port:%d\n", clientInfo->clientIP, clientInfo->clientPort );

    CMsgAddr *adr = new CMsgAddr( clientInfo->clientIP, clientInfo->clientPort, IpcTcp_V2_0 );
    CMostMsg *msg = NULL;

    while( clientInfo->ipc->allowThreadRun && allowWorker )
    {
        int nRead;
        if( 0 >= ( nRead = read( clientInfo->clientSock, buff, TCP_BUFFER_SIZE ) ) )
        {
            if( clientInfo->ipc->allowThreadRun )
            {
                if( 0 == nRead )
                    ConsolePrintf( PRIO_MEDIUM, GREEN"TcpWorkerThread: TCP Client disconnected (IP:%s)"RESETCOLOR"\n",
                    clientInfo->clientIP );
                else
                    ConsolePrintf( PRIO_ERROR, YELLOW"TcpWorkerThread: Read error (return=%d), aborting .."RESETCOLOR"\n", nRead );
            }
            allowWorker = false;
            break;
        }
        for( int i = 0; i < nRead; i++ )
        {
            if( NULL == msg )
                msg = new CMostMsg();
            if( !msg->Parse( buff[i] ) )
            {
                ConsolePrintf( PRIO_ERROR, RED"TcpWorkerThread: Parse error, aborting.."RESETCOLOR"\n" );
                allowWorker = false;
                break;
            }
            if( msg->IsValid() )
            {
                clientInfo->ipc->OnMsgReceived( adr, msg );
                msg->RemoveReference();
                msg = NULL;
            }
        }
    }
    ConsolePrintf( PRIO_LOW, "TcpWorkerThread: End\n" );
    if( clientInfo->clientSock >= 0 )
        close( clientInfo->clientSock );
    
    pthread_mutex_t *pMutex = &clientInfo->ipc->critical_mutex_tcp;
    pthread_mutex_lock( pMutex );
    clientInfo->ipc->m_TcpClientConnections.Remove(clientInfo);
    free( clientInfo );
    pthread_mutex_unlock( pMutex );
    
    adr->RemoveReference();
    free( buff );
    return NULL;
}

void *CMostIpc::TcpAcceptThread( void *pInst )
{
    CMostIpc *ipc = ( CMostIpc * )pInst;
    if( NULL == ipc || 0 > ipc->tcpSock )
    {
        ConsolePrintf( PRIO_ERROR, RED"TcpAcceptThread was called with invalid parameters"RESETCOLOR"\n" );
        return NULL;
    }

    ConsolePrintf( PRIO_LOW, "TcpAcceptThread starts\n" );
    ipc->acceptThreadIsRunning = true;
    while( ipc->allowThreadRun )
    {
        listen( ipc->tcpSock, 5 );
        socklen_t size = sizeof( struct sockaddr_in );
        TcpClientInfo_t *var = ( TcpClientInfo_t * )calloc( 1, sizeof( TcpClientInfo_t ) );
        if( NULL == var )
        {
            ConsolePrintf( PRIO_ERROR, RED"Failed to allocate memory in TCP accept Thread, aborting.."RESETCOLOR"\n" );
            break;
        }
        struct sockaddr_in clientAddr;
        var->clientSock = accept( ipc->tcpSock, ( struct sockaddr * )&clientAddr, &size );
        if( !ipc->allowThreadRun )
            break;
        if( -1 == var->clientSock )
        {
            ConsolePrintf( PRIO_ERROR, RED"Failed to accept connection"RESETCOLOR"\n" );
            free( var );
            continue;
        }
        var->ipc = ipc;
        inet_ntop( AF_INET, ( struct sockaddr_in * )&clientAddr.sin_addr, var->clientIP, sizeof ( var->clientIP ) );
        var->clientPort = ntohs( clientAddr.sin_port );
        ConsolePrintf( PRIO_MEDIUM, GREEN"TCP Client connected: IP=%s, Port=%d"RESETCOLOR"\n",
            var->clientIP, var->clientPort );
        pthread_mutex_lock( &var->ipc->critical_mutex_tcp );
        var->ipc->m_TcpClientConnections.PushBack( var );
        pthread_mutex_unlock( &var->ipc->critical_mutex_tcp );
        pthread_create( &var->workerThread, NULL, CMostIpc::TcpWorkerThread, ( void * )var );
    }
    ConsolePrintf( PRIO_LOW, "TcpAcceptThread ends\n" );
    close( ipc->tcpSock );
    ipc->acceptThreadIsRunning = false;
    return NULL;
}

void CMostIpc::OnMsgReceived( CMsgAddr *Addr, CMostMsg *msg )
{
    if( NULL == Addr || NULL == msg )
    {
        ConsolePrintf(
            PRIO_ERROR, RED"CMostIpc::OnMsgReceived was called with invalid parameters"RESETCOLOR"\n" );
        return;
    }
    ConsolePrintf( PRIO_LOW, "Received MOST-IPC message IP=%s, Protocol:%s, "\
        "FBlock:0x%X, Func:0x%X, OP:0x%X, PL:%d Bytes\n", Addr->GetInetAddress(), ( Addr->GetProtocol() ==
        IpcUdp_V2_0 ? "UDP" : "TCP" ), msg->GetFBlock(), msg->GetFunc(), msg->GetOpType(), msg->GetPayloadLen() );

    for( uint32_t i = 0; i < m_MsgFilterQueue.Size(); i++ )
    {
        m_MsgFilterQueue[i]->Filter( Addr, msg );
    }
}

bool CMostIpc::SendUdp( const char *ipAddress, uint32_t port, const uint8_t *buffer, uint32_t bufferLen )
{
    struct sockaddr_in destination;
    memset( &destination, 0, sizeof ( destination ) );
    destination.sin_family = AF_INET;
    destination.sin_addr.s_addr = inet_addr( ipAddress );
    destination.sin_port = htons( port );
    ssize_t bytesSent = sendto( udpSock, buffer, bufferLen,
        0, ( struct sockaddr * )&destination, sizeof ( destination ) );
    return ( ( uint32_t )bytesSent == bufferLen );
}

bool CMostIpc::SendTcp( const char *ipAddress, uint32_t port, const uint8_t *buffer, uint32_t bufferLen )
{
    bool success = false;
    TcpClientInfo_t *tcp = NULL;
    
    //Find the correct client connection
    pthread_mutex_lock( &critical_mutex_tcp );
    for( uint32_t i = 0; i < m_TcpClientConnections.Size(); i++ )
    {
        TcpClientInfo_t *t = m_TcpClientConnections[i];
        if( NULL != t && t->clientPort == port && 0 == strcmp( t->clientIP, ipAddress ) )
        {
            tcp = t;
            break;
        }
    }
    //There is no connection matching to the given address and port, try to connect if we are not master
    if( !isServer && NULL == tcp )
    {
        TcpClientInfo_t *var = ( TcpClientInfo_t * )calloc( 1, sizeof( TcpClientInfo_t ) );
        if( NULL != var )
        {
            var->ipc = this;
            var->clientPort = port;
            var->clientSock = tcpSock;
            strncpy( var->clientIP, ipAddress, sizeof( var->clientIP ) );

            struct hostent *hostp;
            hostp = gethostbyname( var->clientIP );
            if( NULL != hostp )
            {
                struct sockaddr_in addr;
                memset( &addr, 0x00, sizeof( struct sockaddr_in ) );
                addr.sin_family = AF_INET;
                addr.sin_port = htons( var->clientPort );
                memcpy( &addr.sin_addr, hostp->h_addr, sizeof( addr.sin_addr ) );
                if( 0 == connect( tcpSock, ( struct sockaddr * )&addr, sizeof( addr ) ) )
                {
                    tcp = var;
                    m_TcpClientConnections.PushBack( tcp );
                    pthread_create( &tcp->workerThread, NULL, CMostIpc::TcpWorkerThread, ( void * )tcp );
                    ConsolePrintf( PRIO_MEDIUM, GREEN"Connect to TCP host succeeded: IP=%s Port=%d"RESETCOLOR"\n", tcp->clientIP,
                        tcp->clientPort );
                    
                }
                else
                {
                    ConsolePrintf( PRIO_ERROR, RED"Could not connect to host: '%s'"RESETCOLOR"\n", var->clientIP );
                    free( var );
                }
            }
            else
            {
                ConsolePrintf( PRIO_ERROR, RED"Could not find host: '%s'"RESETCOLOR"\n", var->clientIP );
                free( var );
            }
        }
        else
        {
            ConsolePrintf( PRIO_ERROR, RED"Failed to allocate memory in SendTcp method"RESETCOLOR"\n" );
        }
    }
    
    if( NULL != tcp )
    {
        int result;
        uint32_t written = 0;
        success = true;
        while( written < bufferLen )
        {
            result = write( tcp->clientSock, &buffer[written], bufferLen - written );
            if( result < 0 )
            {
                ConsolePrintf( PRIO_ERROR, RED"SendTcp: Write error"RESETCOLOR"\n" );
                success = false;
                break;
            }
            else if( result == 0 )
            {
                //Socket is non blocking, sleep to avoid 100% CPU load
                usleep( 1000 );
            }
            else
            {
                written += result;
            }
        }
        pthread_mutex_unlock( &critical_mutex_tcp );
    }
    return success;
}

void *CMostIpc::UdpWorkerRXThread( void *pInst )
{
    CMostIpc *ipc = ( CMostIpc * )pInst;
    if( NULL == ipc || 0 > ipc->udpSock )
    {
        ConsolePrintf( PRIO_ERROR, RED"UdpWorkerRXThread was called with invalid parameters"RESETCOLOR"\n" );
        return NULL;
    }
    ipc->udpRxThreadIsRunning = true;
    while( ipc->allowThreadRun )
    {
        //Receive
        if( ipc->allowThreadRun )
        {
            uint8_t tempBuffer[8*1024];
            char remoteIP[INET6_ADDRSTRLEN];
            uint32_t port;
            struct sockaddr_in fromAddr;
            socklen_t fromAddrLen = sizeof( fromAddr );
            ssize_t receivedLength;
            receivedLength = recvfrom
                ( ipc->udpSock, tempBuffer, sizeof( tempBuffer ),
                0, ( struct sockaddr * )&fromAddr, &fromAddrLen );
            if( receivedLength > 0 )
            {
                inet_ntop( AF_INET, ( struct sockaddr_in * )&fromAddr.sin_addr, remoteIP, sizeof( remoteIP ) );
                port = ntohs( fromAddr.sin_port );
                CMsgAddr *adr = new CMsgAddr( remoteIP, port );
                CMostMsg *msg = new CMostMsg();
                for( int32_t i = 0; i < receivedLength; i++ )
                {
                    if( !msg->Parse( tempBuffer[i] ) )
                    {
                        ConsolePrintf( PRIO_ERROR, RED"CMostIpc UDP received unparsable message"RESETCOLOR"\n" );
                        break;
                    }
                }
                if( msg->IsValid() )
                    ipc->OnMsgReceived( adr, msg );
                adr->RemoveReference();
                msg->RemoveReference();
            }
        }
    }
    ipc->udpRxThreadIsRunning = false;
    return NULL;
}

void *CMostIpc::TXThread( void *pInst )
{
    CMostIpc *ipc = ( CMostIpc * )pInst;
    if( NULL == ipc || 0 > ipc->udpSock )
    {
        ConsolePrintf( PRIO_ERROR, RED"TXThread was called with invalid parameters"RESETCOLOR"\n" );
        return NULL;
    }
    ipc->txThreadIsRunning = true;
    
    while( ipc->allowThreadRun )
    {
        sem_wait(&ipc->semTx);
        if (!ipc->allowThreadRun)
            break;
        CMostMsgTx *MsgTx = ipc->m_MsgTxQueue.PopFront();
        assert (NULL != MsgTx);
        if( NULL != MsgTx )
        {
            uint8_t *tempBuffer = NULL;
            uint32_t bytesUsed = MsgTx->ToByteArray( &tempBuffer );
            if( NULL == tempBuffer || 0 == bytesUsed )
            {
                ConsolePrintf( PRIO_ERROR, RED"MOST-IP: Failed to serialize message"RESETCOLOR"\n" );
                if (NULL != tempBuffer)
                    free(tempBuffer);
                MsgTx->MsgSent( false );
                continue;
            }
            CMsgAddr *addr = MsgTx->GetAddr();
            if( 0 != bytesUsed && NULL != addr )
            {
                char *ipAddress = addr->GetInetAddress();
                uint32_t port = addr->GetPort();
                if( NULL != ipAddress )
                {
                    bool success = false;
                    switch( addr->GetProtocol() )
                    {
                    case IpcUdp_V2_0:
                        success = ipc->SendUdp( ipAddress, port, tempBuffer, bytesUsed );
                        break;
                    case IpcTcp_V2_0:
                        success = ipc->SendTcp( ipAddress, port, tempBuffer, bytesUsed );
                        break;
                    default:
                        ConsolePrintf( PRIO_ERROR, RED"MOST-IP: Unknown Protocol"RESETCOLOR"\n" );
                        break;
                    }
                    MsgTx->MsgSent( success );
                }
                else
                {
                    ConsolePrintf( PRIO_ERROR, RED"MOST-IP: Can not resolve IP address"RESETCOLOR"\n" );
                    MsgTx->MsgSent( false );
                }
            }
            else
            {
                ConsolePrintf( PRIO_ERROR, RED"Error while sending in MOST IPC."RESETCOLOR"\n" );
                MsgTx->MsgSent( false );
            }
            if (NULL != tempBuffer)
                free( tempBuffer );
            MsgTx->RemoveReference();
        }
    }
    ipc->txThreadIsRunning = false;
    return NULL;
}