/*
* Video On Demand Samples
*
* Copyright (C) 2015 Microchip Technology Germany II GmbH & Co. KG
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*
* You may also obtain this software under a propriety license from Microchip.
* Please contact Microchip for further information.
*
*/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "Console.h"
#include "MostIpc.h"
#define TCP_BUFFER_SIZE (1024*1024)
CMostIpc::CMostIpc( int port, bool server ) : isServer( server ), allowThreadRun( true ),
udpRxThreadIsRunning( false ), txThreadIsRunning (false), acceptThreadIsRunning( false )
{
if( isServer && 0 >= port )
{
ConsolePrintf( PRIO_ERROR, RED"CMostIpc: Server port must be greater than 0!"RESETCOLOR"\n" );
return;
}
pthread_mutex_init( &critical_mutex_tcp, NULL );
sem_init( &semTx, 0, 0 ); //Mutex, initialized to 0 => sem_wait will block instantly
struct sockaddr_in listenAddress;
/* Receive address configuration */
memset( &listenAddress, 0, sizeof( listenAddress ) );
listenAddress.sin_family = AF_INET;
listenAddress.sin_addr.s_addr = htonl( INADDR_ANY );
listenAddress.sin_port = htons( port );
if( 0 > ( udpSock = socket( PF_INET, SOCK_DGRAM, IPPROTO_UDP ) ) )
{
ConsolePrintf( PRIO_ERROR, RED"CMostIpc: Failed to create UDP socket"RESETCOLOR"\n" );
return;
}
if( 0 > ( tcpSock = socket( PF_INET, SOCK_STREAM, 0 ) ) )
{
ConsolePrintf( PRIO_ERROR, RED"CMostIpc: Failed to create TCP socket"RESETCOLOR"\n" );
return;
}
//Avoid hanging socket after killing application, only works with Linux kernel > 3.9
#if defined (SO_REUSEADDR) && defined (SO_REUSEPORT)
int one = 1;
setsockopt(udpSock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &one, sizeof(one));
setsockopt(tcpSock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &one, sizeof(one));
#endif
// this call is what allows broadcast UDP packets to be sent:
int enableSocketFlag = 1;
if( 0 > setsockopt( udpSock, SOL_SOCKET, SO_BROADCAST, &enableSocketFlag, sizeof( enableSocketFlag ) ) )
{
ConsolePrintf(
PRIO_ERROR, RED"CMostIpc: setsockopt (SO_BROADCAST) failed, tried to set to: %d"RESETCOLOR"\n", enableSocketFlag );
}
if( 0 < port )
{
if( 0 > bind( udpSock, ( struct sockaddr * )&listenAddress, sizeof listenAddress ) )
{
ConsolePrintf( PRIO_ERROR, RED"Failed to bind UDP socket"RESETCOLOR"\n" );
}
if( 0 > bind( tcpSock, ( struct sockaddr * )&listenAddress, sizeof listenAddress ) )
{
ConsolePrintf( PRIO_ERROR, RED"CMostIpc: Failed to bind TCP socket"RESETCOLOR"\n" );
}
}
if( isServer )
{
pthread_t acceptThread;
pthread_create( &acceptThread, NULL, CMostIpc::TcpAcceptThread, ( void * )this );
}
pthread_t udpRxThread, txThread;
pthread_create( &udpRxThread, NULL, CMostIpc::UdpWorkerRXThread, ( void * )this );
pthread_create( &txThread, NULL, CMostIpc::TXThread, ( void * )this );
}
CMostIpc::~CMostIpc()
{
allowThreadRun = false;
close( udpSock );
close( tcpSock );
sem_post(&semTx);
int32_t timeout = 100;
while( ( udpRxThreadIsRunning || txThreadIsRunning || acceptThreadIsRunning ) && ( --timeout > 0 ) )
usleep( 10000 );
pthread_mutex_lock( &critical_mutex_tcp );
for( uint32_t i = 0; i < m_TcpClientConnections.Size(); i++ )
{
TcpClientInfo_t *var = m_TcpClientConnections[i];
if( NULL != var )
{
if( var->clientSock >= 0 )
close( var->clientSock );
var->clientSock = -1;
}
}
m_TcpClientConnections.RemoveAll(false);
pthread_mutex_unlock( &critical_mutex_tcp );
}
void CMostIpc::SendMsg( CMostMsgTx *Msg )
{
if( NULL != Msg )
{
Msg->AddReference();
m_MsgTxQueue.PushBack( Msg );
sem_post(&semTx);
}
}
void CMostIpc::RegisterMessageHandler( CMsgFilter *Filter )
{
m_MsgFilterQueue.PushBack( Filter );
}
void CMostIpc::UnregisterMessageHandler( CMsgFilter *Filter )
{
m_MsgFilterQueue.Remove(Filter);
}
void *CMostIpc::TcpWorkerThread( void *pInst )
{
bool allowWorker = true;
TcpClientInfo_t *clientInfo = ( TcpClientInfo_t * )pInst;
if( NULL == clientInfo || NULL == clientInfo->ipc || 0 > clientInfo->clientSock )
{
ConsolePrintf( PRIO_ERROR, RED"TcpWorkerThread: Invalid parameters"RESETCOLOR"\n" );
return NULL;
}
uint8_t *buff = ( uint8_t * )malloc( TCP_BUFFER_SIZE );
if( NULL == buff )
{
ConsolePrintf( PRIO_ERROR, RED"TcpWorkerThread: Failed to allocate buffer"RESETCOLOR"\n" );
return NULL;
}
ConsolePrintf( PRIO_MEDIUM, "TcpWorkerThread: Start for IP:%s, Port:%d\n", clientInfo->clientIP, clientInfo->clientPort );
CMsgAddr *adr = new CMsgAddr( clientInfo->clientIP, clientInfo->clientPort, IpcTcp_V2_0 );
CMostMsg *msg = NULL;
while( clientInfo->ipc->allowThreadRun && allowWorker )
{
int nRead;
if( 0 >= ( nRead = read( clientInfo->clientSock, buff, TCP_BUFFER_SIZE ) ) )
{
if( clientInfo->ipc->allowThreadRun )
{
if( 0 == nRead )
ConsolePrintf( PRIO_MEDIUM, GREEN"TcpWorkerThread: TCP Client disconnected (IP:%s)"RESETCOLOR"\n",
clientInfo->clientIP );
else
ConsolePrintf( PRIO_ERROR, YELLOW"TcpWorkerThread: Read error (return=%d), aborting .."RESETCOLOR"\n", nRead );
}
allowWorker = false;
break;
}
for( int i = 0; i < nRead; i++ )
{
if( NULL == msg )
msg = new CMostMsg();
if( !msg->Parse( buff[i] ) )
{
ConsolePrintf( PRIO_ERROR, RED"TcpWorkerThread: Parse error, aborting.."RESETCOLOR"\n" );
allowWorker = false;
break;
}
if( msg->IsValid() )
{
clientInfo->ipc->OnMsgReceived( adr, msg );
msg->RemoveReference();
msg = NULL;
}
}
}
ConsolePrintf( PRIO_LOW, "TcpWorkerThread: End\n" );
if( clientInfo->clientSock >= 0 )
close( clientInfo->clientSock );
pthread_mutex_t *pMutex = &clientInfo->ipc->critical_mutex_tcp;
pthread_mutex_lock( pMutex );
clientInfo->ipc->m_TcpClientConnections.Remove(clientInfo);
free( clientInfo );
pthread_mutex_unlock( pMutex );
adr->RemoveReference();
free( buff );
return NULL;
}
void *CMostIpc::TcpAcceptThread( void *pInst )
{
CMostIpc *ipc = ( CMostIpc * )pInst;
if( NULL == ipc || 0 > ipc->tcpSock )
{
ConsolePrintf( PRIO_ERROR, RED"TcpAcceptThread was called with invalid parameters"RESETCOLOR"\n" );
return NULL;
}
ConsolePrintf( PRIO_LOW, "TcpAcceptThread starts\n" );
ipc->acceptThreadIsRunning = true;
while( ipc->allowThreadRun )
{
listen( ipc->tcpSock, 5 );
socklen_t size = sizeof( struct sockaddr_in );
TcpClientInfo_t *var = ( TcpClientInfo_t * )calloc( 1, sizeof( TcpClientInfo_t ) );
if( NULL == var )
{
ConsolePrintf( PRIO_ERROR, RED"Failed to allocate memory in TCP accept Thread, aborting.."RESETCOLOR"\n" );
break;
}
struct sockaddr_in clientAddr;
var->clientSock = accept( ipc->tcpSock, ( struct sockaddr * )&clientAddr, &size );
if( !ipc->allowThreadRun )
break;
if( -1 == var->clientSock )
{
ConsolePrintf( PRIO_ERROR, RED"Failed to accept connection"RESETCOLOR"\n" );
free( var );
continue;
}
var->ipc = ipc;
inet_ntop( AF_INET, ( struct sockaddr_in * )&clientAddr.sin_addr, var->clientIP, sizeof ( var->clientIP ) );
var->clientPort = ntohs( clientAddr.sin_port );
ConsolePrintf( PRIO_MEDIUM, GREEN"TCP Client connected: IP=%s, Port=%d"RESETCOLOR"\n",
var->clientIP, var->clientPort );
pthread_mutex_lock( &var->ipc->critical_mutex_tcp );
var->ipc->m_TcpClientConnections.PushBack( var );
pthread_mutex_unlock( &var->ipc->critical_mutex_tcp );
pthread_create( &var->workerThread, NULL, CMostIpc::TcpWorkerThread, ( void * )var );
}
ConsolePrintf( PRIO_LOW, "TcpAcceptThread ends\n" );
close( ipc->tcpSock );
ipc->acceptThreadIsRunning = false;
return NULL;
}
void CMostIpc::OnMsgReceived( CMsgAddr *Addr, CMostMsg *msg )
{
if( NULL == Addr || NULL == msg )
{
ConsolePrintf(
PRIO_ERROR, RED"CMostIpc::OnMsgReceived was called with invalid parameters"RESETCOLOR"\n" );
return;
}
ConsolePrintf( PRIO_LOW, "Received MOST-IPC message IP=%s, Protocol:%s, "\
"FBlock:0x%X, Func:0x%X, OP:0x%X, PL:%d Bytes\n", Addr->GetInetAddress(), ( Addr->GetProtocol() ==
IpcUdp_V2_0 ? "UDP" : "TCP" ), msg->GetFBlock(), msg->GetFunc(), msg->GetOpType(), msg->GetPayloadLen() );
for( uint32_t i = 0; i < m_MsgFilterQueue.Size(); i++ )
{
m_MsgFilterQueue[i]->Filter( Addr, msg );
}
}
bool CMostIpc::SendUdp( const char *ipAddress, uint32_t port, const uint8_t *buffer, uint32_t bufferLen )
{
struct sockaddr_in destination;
memset( &destination, 0, sizeof ( destination ) );
destination.sin_family = AF_INET;
destination.sin_addr.s_addr = inet_addr( ipAddress );
destination.sin_port = htons( port );
ssize_t bytesSent = sendto( udpSock, buffer, bufferLen,
0, ( struct sockaddr * )&destination, sizeof ( destination ) );
return ( ( uint32_t )bytesSent == bufferLen );
}
bool CMostIpc::SendTcp( const char *ipAddress, uint32_t port, const uint8_t *buffer, uint32_t bufferLen )
{
bool success = false;
TcpClientInfo_t *tcp = NULL;
//Find the correct client connection
pthread_mutex_lock( &critical_mutex_tcp );
for( uint32_t i = 0; i < m_TcpClientConnections.Size(); i++ )
{
TcpClientInfo_t *t = m_TcpClientConnections[i];
if( NULL != t && t->clientPort == port && 0 == strcmp( t->clientIP, ipAddress ) )
{
tcp = t;
break;
}
}
//There is no connection matching to the given address and port, try to connect if we are not master
if( !isServer && NULL == tcp )
{
TcpClientInfo_t *var = ( TcpClientInfo_t * )calloc( 1, sizeof( TcpClientInfo_t ) );
if( NULL != var )
{
var->ipc = this;
var->clientPort = port;
var->clientSock = tcpSock;
strncpy( var->clientIP, ipAddress, sizeof( var->clientIP ) );
struct hostent *hostp;
hostp = gethostbyname( var->clientIP );
if( NULL != hostp )
{
struct sockaddr_in addr;
memset( &addr, 0x00, sizeof( struct sockaddr_in ) );
addr.sin_family = AF_INET;
addr.sin_port = htons( var->clientPort );
memcpy( &addr.sin_addr, hostp->h_addr, sizeof( addr.sin_addr ) );
if( 0 == connect( tcpSock, ( struct sockaddr * )&addr, sizeof( addr ) ) )
{
tcp = var;
m_TcpClientConnections.PushBack( tcp );
pthread_create( &tcp->workerThread, NULL, CMostIpc::TcpWorkerThread, ( void * )tcp );
ConsolePrintf( PRIO_MEDIUM, GREEN"Connect to TCP host succeeded: IP=%s Port=%d"RESETCOLOR"\n", tcp->clientIP,
tcp->clientPort );
}
else
{
ConsolePrintf( PRIO_ERROR, RED"Could not connect to host: '%s'"RESETCOLOR"\n", var->clientIP );
free( var );
}
}
else
{
ConsolePrintf( PRIO_ERROR, RED"Could not find host: '%s'"RESETCOLOR"\n", var->clientIP );
free( var );
}
}
else
{
ConsolePrintf( PRIO_ERROR, RED"Failed to allocate memory in SendTcp method"RESETCOLOR"\n" );
}
}
if( NULL != tcp )
{
int result;
uint32_t written = 0;
success = true;
while( written < bufferLen )
{
result = write( tcp->clientSock, &buffer[written], bufferLen - written );
if( result < 0 )
{
ConsolePrintf( PRIO_ERROR, RED"SendTcp: Write error"RESETCOLOR"\n" );
success = false;
break;
}
else if( result == 0 )
{
//Socket is non blocking, sleep to avoid 100% CPU load
usleep( 1000 );
}
else
{
written += result;
}
}
pthread_mutex_unlock( &critical_mutex_tcp );
}
return success;
}
void *CMostIpc::UdpWorkerRXThread( void *pInst )
{
CMostIpc *ipc = ( CMostIpc * )pInst;
if( NULL == ipc || 0 > ipc->udpSock )
{
ConsolePrintf( PRIO_ERROR, RED"UdpWorkerRXThread was called with invalid parameters"RESETCOLOR"\n" );
return NULL;
}
ipc->udpRxThreadIsRunning = true;
while( ipc->allowThreadRun )
{
//Receive
if( ipc->allowThreadRun )
{
uint8_t tempBuffer[8*1024];
char remoteIP[INET6_ADDRSTRLEN];
uint32_t port;
struct sockaddr_in fromAddr;
socklen_t fromAddrLen = sizeof( fromAddr );
ssize_t receivedLength;
receivedLength = recvfrom
( ipc->udpSock, tempBuffer, sizeof( tempBuffer ),
0, ( struct sockaddr * )&fromAddr, &fromAddrLen );
if( receivedLength > 0 )
{
inet_ntop( AF_INET, ( struct sockaddr_in * )&fromAddr.sin_addr, remoteIP, sizeof( remoteIP ) );
port = ntohs( fromAddr.sin_port );
CMsgAddr *adr = new CMsgAddr( remoteIP, port );
CMostMsg *msg = new CMostMsg();
for( int32_t i = 0; i < receivedLength; i++ )
{
if( !msg->Parse( tempBuffer[i] ) )
{
ConsolePrintf( PRIO_ERROR, RED"CMostIpc UDP received unparsable message"RESETCOLOR"\n" );
break;
}
}
if( msg->IsValid() )
ipc->OnMsgReceived( adr, msg );
adr->RemoveReference();
msg->RemoveReference();
}
}
}
ipc->udpRxThreadIsRunning = false;
return NULL;
}
void *CMostIpc::TXThread( void *pInst )
{
CMostIpc *ipc = ( CMostIpc * )pInst;
if( NULL == ipc || 0 > ipc->udpSock )
{
ConsolePrintf( PRIO_ERROR, RED"TXThread was called with invalid parameters"RESETCOLOR"\n" );
return NULL;
}
ipc->txThreadIsRunning = true;
while( ipc->allowThreadRun )
{
sem_wait(&ipc->semTx);
if (!ipc->allowThreadRun)
break;
CMostMsgTx *MsgTx = ipc->m_MsgTxQueue.PopFront();
assert (NULL != MsgTx);
if( NULL != MsgTx )
{
uint8_t *tempBuffer = NULL;
uint32_t bytesUsed = MsgTx->ToByteArray( &tempBuffer );
if( NULL == tempBuffer || 0 == bytesUsed )
{
ConsolePrintf( PRIO_ERROR, RED"MOST-IP: Failed to serialize message"RESETCOLOR"\n" );
if (NULL != tempBuffer)
free(tempBuffer);
MsgTx->MsgSent( false );
continue;
}
CMsgAddr *addr = MsgTx->GetAddr();
if( 0 != bytesUsed && NULL != addr )
{
char *ipAddress = addr->GetInetAddress();
uint32_t port = addr->GetPort();
if( NULL != ipAddress )
{
bool success = false;
switch( addr->GetProtocol() )
{
case IpcUdp_V2_0:
success = ipc->SendUdp( ipAddress, port, tempBuffer, bytesUsed );
break;
case IpcTcp_V2_0:
success = ipc->SendTcp( ipAddress, port, tempBuffer, bytesUsed );
break;
default:
ConsolePrintf( PRIO_ERROR, RED"MOST-IP: Unknown Protocol"RESETCOLOR"\n" );
break;
}
MsgTx->MsgSent( success );
}
else
{
ConsolePrintf( PRIO_ERROR, RED"MOST-IP: Can not resolve IP address"RESETCOLOR"\n" );
MsgTx->MsgSent( false );
}
}
else
{
ConsolePrintf( PRIO_ERROR, RED"Error while sending in MOST IPC."RESETCOLOR"\n" );
MsgTx->MsgSent( false );
}
if (NULL != tempBuffer)
free( tempBuffer );
MsgTx->RemoveReference();
}
}
ipc->txThreadIsRunning = false;
return NULL;
}