/* * Video On Demand Samples * * Copyright (C) 2015 Microchip Technology Germany II GmbH & Co. KG * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . * * You may also obtain this software under a propriety license from Microchip. * Please contact Microchip for further information. * */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "Console.h" #include "MostIpc.h" #define TCP_BUFFER_SIZE (1024*1024) CMostIpc::CMostIpc( int port, bool server ) : isServer( server ), allowThreadRun( true ), udpRxThreadIsRunning( false ), txThreadIsRunning (false), acceptThreadIsRunning( false ) { if( isServer && 0 >= port ) { ConsolePrintf( PRIO_ERROR, RED"CMostIpc: Server port must be greater than 0!"RESETCOLOR"\n" ); return; } pthread_mutex_init( &critical_mutex_tcp, NULL ); sem_init( &semTx, 0, 0 ); //Mutex, initialized to 0 => sem_wait will block instantly struct sockaddr_in listenAddress; /* Receive address configuration */ memset( &listenAddress, 0, sizeof( listenAddress ) ); listenAddress.sin_family = AF_INET; listenAddress.sin_addr.s_addr = htonl( INADDR_ANY ); listenAddress.sin_port = htons( port ); if( 0 > ( udpSock = socket( PF_INET, SOCK_DGRAM, IPPROTO_UDP ) ) ) { ConsolePrintf( PRIO_ERROR, RED"CMostIpc: Failed to create UDP socket"RESETCOLOR"\n" ); return; } if( 0 > ( tcpSock = socket( PF_INET, SOCK_STREAM, 0 ) ) ) { ConsolePrintf( PRIO_ERROR, RED"CMostIpc: Failed to create TCP socket"RESETCOLOR"\n" ); return; } //Avoid hanging socket after killing application, only works with Linux kernel > 3.9 #if defined (SO_REUSEADDR) && defined (SO_REUSEPORT) int one = 1; setsockopt(udpSock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &one, sizeof(one)); setsockopt(tcpSock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &one, sizeof(one)); #endif // this call is what allows broadcast UDP packets to be sent: int enableSocketFlag = 1; if( 0 > setsockopt( udpSock, SOL_SOCKET, SO_BROADCAST, &enableSocketFlag, sizeof( enableSocketFlag ) ) ) { ConsolePrintf( PRIO_ERROR, RED"CMostIpc: setsockopt (SO_BROADCAST) failed, tried to set to: %d"RESETCOLOR"\n", enableSocketFlag ); } if( 0 < port ) { if( 0 > bind( udpSock, ( struct sockaddr * )&listenAddress, sizeof listenAddress ) ) { ConsolePrintf( PRIO_ERROR, RED"Failed to bind UDP socket"RESETCOLOR"\n" ); } if( 0 > bind( tcpSock, ( struct sockaddr * )&listenAddress, sizeof listenAddress ) ) { ConsolePrintf( PRIO_ERROR, RED"CMostIpc: Failed to bind TCP socket"RESETCOLOR"\n" ); } } if( isServer ) { pthread_t acceptThread; pthread_create( &acceptThread, NULL, CMostIpc::TcpAcceptThread, ( void * )this ); } pthread_t udpRxThread, txThread; pthread_create( &udpRxThread, NULL, CMostIpc::UdpWorkerRXThread, ( void * )this ); pthread_create( &txThread, NULL, CMostIpc::TXThread, ( void * )this ); } CMostIpc::~CMostIpc() { allowThreadRun = false; close( udpSock ); close( tcpSock ); sem_post(&semTx); int32_t timeout = 100; while( ( udpRxThreadIsRunning || txThreadIsRunning || acceptThreadIsRunning ) && ( --timeout > 0 ) ) usleep( 10000 ); pthread_mutex_lock( &critical_mutex_tcp ); for( uint32_t i = 0; i < m_TcpClientConnections.Size(); i++ ) { TcpClientInfo_t *var = m_TcpClientConnections[i]; if( NULL != var ) { if( var->clientSock >= 0 ) close( var->clientSock ); var->clientSock = -1; } } m_TcpClientConnections.RemoveAll(false); pthread_mutex_unlock( &critical_mutex_tcp ); } void CMostIpc::SendMsg( CMostMsgTx *Msg ) { if( NULL != Msg ) { Msg->AddReference(); m_MsgTxQueue.PushBack( Msg ); sem_post(&semTx); } } void CMostIpc::RegisterMessageHandler( CMsgFilter *Filter ) { m_MsgFilterQueue.PushBack( Filter ); } void CMostIpc::UnregisterMessageHandler( CMsgFilter *Filter ) { m_MsgFilterQueue.Remove(Filter); } void *CMostIpc::TcpWorkerThread( void *pInst ) { bool allowWorker = true; TcpClientInfo_t *clientInfo = ( TcpClientInfo_t * )pInst; if( NULL == clientInfo || NULL == clientInfo->ipc || 0 > clientInfo->clientSock ) { ConsolePrintf( PRIO_ERROR, RED"TcpWorkerThread: Invalid parameters"RESETCOLOR"\n" ); return NULL; } uint8_t *buff = ( uint8_t * )malloc( TCP_BUFFER_SIZE ); if( NULL == buff ) { ConsolePrintf( PRIO_ERROR, RED"TcpWorkerThread: Failed to allocate buffer"RESETCOLOR"\n" ); return NULL; } ConsolePrintf( PRIO_MEDIUM, "TcpWorkerThread: Start for IP:%s, Port:%d\n", clientInfo->clientIP, clientInfo->clientPort ); CMsgAddr *adr = new CMsgAddr( clientInfo->clientIP, clientInfo->clientPort, IpcTcp_V2_0 ); CMostMsg *msg = NULL; while( clientInfo->ipc->allowThreadRun && allowWorker ) { int nRead; if( 0 >= ( nRead = read( clientInfo->clientSock, buff, TCP_BUFFER_SIZE ) ) ) { if( clientInfo->ipc->allowThreadRun ) { if( 0 == nRead ) ConsolePrintf( PRIO_MEDIUM, GREEN"TcpWorkerThread: TCP Client disconnected (IP:%s)"RESETCOLOR"\n", clientInfo->clientIP ); else ConsolePrintf( PRIO_ERROR, YELLOW"TcpWorkerThread: Read error (return=%d), aborting .."RESETCOLOR"\n", nRead ); } allowWorker = false; break; } for( int i = 0; i < nRead; i++ ) { if( NULL == msg ) msg = new CMostMsg(); if( !msg->Parse( buff[i] ) ) { ConsolePrintf( PRIO_ERROR, RED"TcpWorkerThread: Parse error, aborting.."RESETCOLOR"\n" ); allowWorker = false; break; } if( msg->IsValid() ) { clientInfo->ipc->OnMsgReceived( adr, msg ); msg->RemoveReference(); msg = NULL; } } } ConsolePrintf( PRIO_LOW, "TcpWorkerThread: End\n" ); if( clientInfo->clientSock >= 0 ) close( clientInfo->clientSock ); pthread_mutex_t *pMutex = &clientInfo->ipc->critical_mutex_tcp; pthread_mutex_lock( pMutex ); clientInfo->ipc->m_TcpClientConnections.Remove(clientInfo); free( clientInfo ); pthread_mutex_unlock( pMutex ); adr->RemoveReference(); free( buff ); return NULL; } void *CMostIpc::TcpAcceptThread( void *pInst ) { CMostIpc *ipc = ( CMostIpc * )pInst; if( NULL == ipc || 0 > ipc->tcpSock ) { ConsolePrintf( PRIO_ERROR, RED"TcpAcceptThread was called with invalid parameters"RESETCOLOR"\n" ); return NULL; } ConsolePrintf( PRIO_LOW, "TcpAcceptThread starts\n" ); ipc->acceptThreadIsRunning = true; while( ipc->allowThreadRun ) { listen( ipc->tcpSock, 5 ); socklen_t size = sizeof( struct sockaddr_in ); TcpClientInfo_t *var = ( TcpClientInfo_t * )calloc( 1, sizeof( TcpClientInfo_t ) ); if( NULL == var ) { ConsolePrintf( PRIO_ERROR, RED"Failed to allocate memory in TCP accept Thread, aborting.."RESETCOLOR"\n" ); break; } struct sockaddr_in clientAddr; var->clientSock = accept( ipc->tcpSock, ( struct sockaddr * )&clientAddr, &size ); if( !ipc->allowThreadRun ) break; if( -1 == var->clientSock ) { ConsolePrintf( PRIO_ERROR, RED"Failed to accept connection"RESETCOLOR"\n" ); free( var ); continue; } var->ipc = ipc; inet_ntop( AF_INET, ( struct sockaddr_in * )&clientAddr.sin_addr, var->clientIP, sizeof ( var->clientIP ) ); var->clientPort = ntohs( clientAddr.sin_port ); ConsolePrintf( PRIO_MEDIUM, GREEN"TCP Client connected: IP=%s, Port=%d"RESETCOLOR"\n", var->clientIP, var->clientPort ); pthread_mutex_lock( &var->ipc->critical_mutex_tcp ); var->ipc->m_TcpClientConnections.PushBack( var ); pthread_mutex_unlock( &var->ipc->critical_mutex_tcp ); pthread_create( &var->workerThread, NULL, CMostIpc::TcpWorkerThread, ( void * )var ); } ConsolePrintf( PRIO_LOW, "TcpAcceptThread ends\n" ); close( ipc->tcpSock ); ipc->acceptThreadIsRunning = false; return NULL; } void CMostIpc::OnMsgReceived( CMsgAddr *Addr, CMostMsg *msg ) { if( NULL == Addr || NULL == msg ) { ConsolePrintf( PRIO_ERROR, RED"CMostIpc::OnMsgReceived was called with invalid parameters"RESETCOLOR"\n" ); return; } ConsolePrintf( PRIO_LOW, "Received MOST-IPC message IP=%s, Protocol:%s, "\ "FBlock:0x%X, Func:0x%X, OP:0x%X, PL:%d Bytes\n", Addr->GetInetAddress(), ( Addr->GetProtocol() == IpcUdp_V2_0 ? "UDP" : "TCP" ), msg->GetFBlock(), msg->GetFunc(), msg->GetOpType(), msg->GetPayloadLen() ); for( uint32_t i = 0; i < m_MsgFilterQueue.Size(); i++ ) { m_MsgFilterQueue[i]->Filter( Addr, msg ); } } bool CMostIpc::SendUdp( const char *ipAddress, uint32_t port, const uint8_t *buffer, uint32_t bufferLen ) { struct sockaddr_in destination; memset( &destination, 0, sizeof ( destination ) ); destination.sin_family = AF_INET; destination.sin_addr.s_addr = inet_addr( ipAddress ); destination.sin_port = htons( port ); ssize_t bytesSent = sendto( udpSock, buffer, bufferLen, 0, ( struct sockaddr * )&destination, sizeof ( destination ) ); return ( ( uint32_t )bytesSent == bufferLen ); } bool CMostIpc::SendTcp( const char *ipAddress, uint32_t port, const uint8_t *buffer, uint32_t bufferLen ) { bool success = false; TcpClientInfo_t *tcp = NULL; //Find the correct client connection pthread_mutex_lock( &critical_mutex_tcp ); for( uint32_t i = 0; i < m_TcpClientConnections.Size(); i++ ) { TcpClientInfo_t *t = m_TcpClientConnections[i]; if( NULL != t && t->clientPort == port && 0 == strcmp( t->clientIP, ipAddress ) ) { tcp = t; break; } } //There is no connection matching to the given address and port, try to connect if we are not master if( !isServer && NULL == tcp ) { TcpClientInfo_t *var = ( TcpClientInfo_t * )calloc( 1, sizeof( TcpClientInfo_t ) ); if( NULL != var ) { var->ipc = this; var->clientPort = port; var->clientSock = tcpSock; strncpy( var->clientIP, ipAddress, sizeof( var->clientIP ) ); struct hostent *hostp; hostp = gethostbyname( var->clientIP ); if( NULL != hostp ) { struct sockaddr_in addr; memset( &addr, 0x00, sizeof( struct sockaddr_in ) ); addr.sin_family = AF_INET; addr.sin_port = htons( var->clientPort ); memcpy( &addr.sin_addr, hostp->h_addr, sizeof( addr.sin_addr ) ); if( 0 == connect( tcpSock, ( struct sockaddr * )&addr, sizeof( addr ) ) ) { tcp = var; m_TcpClientConnections.PushBack( tcp ); pthread_create( &tcp->workerThread, NULL, CMostIpc::TcpWorkerThread, ( void * )tcp ); ConsolePrintf( PRIO_MEDIUM, GREEN"Connect to TCP host succeeded: IP=%s Port=%d"RESETCOLOR"\n", tcp->clientIP, tcp->clientPort ); } else { ConsolePrintf( PRIO_ERROR, RED"Could not connect to host: '%s'"RESETCOLOR"\n", var->clientIP ); free( var ); } } else { ConsolePrintf( PRIO_ERROR, RED"Could not find host: '%s'"RESETCOLOR"\n", var->clientIP ); free( var ); } } else { ConsolePrintf( PRIO_ERROR, RED"Failed to allocate memory in SendTcp method"RESETCOLOR"\n" ); } } if( NULL != tcp ) { int result; uint32_t written = 0; success = true; while( written < bufferLen ) { result = write( tcp->clientSock, &buffer[written], bufferLen - written ); if( result < 0 ) { ConsolePrintf( PRIO_ERROR, RED"SendTcp: Write error"RESETCOLOR"\n" ); success = false; break; } else if( result == 0 ) { //Socket is non blocking, sleep to avoid 100% CPU load usleep( 1000 ); } else { written += result; } } pthread_mutex_unlock( &critical_mutex_tcp ); } return success; } void *CMostIpc::UdpWorkerRXThread( void *pInst ) { CMostIpc *ipc = ( CMostIpc * )pInst; if( NULL == ipc || 0 > ipc->udpSock ) { ConsolePrintf( PRIO_ERROR, RED"UdpWorkerRXThread was called with invalid parameters"RESETCOLOR"\n" ); return NULL; } ipc->udpRxThreadIsRunning = true; while( ipc->allowThreadRun ) { //Receive if( ipc->allowThreadRun ) { uint8_t tempBuffer[8*1024]; char remoteIP[INET6_ADDRSTRLEN]; uint32_t port; struct sockaddr_in fromAddr; socklen_t fromAddrLen = sizeof( fromAddr ); ssize_t receivedLength; receivedLength = recvfrom ( ipc->udpSock, tempBuffer, sizeof( tempBuffer ), 0, ( struct sockaddr * )&fromAddr, &fromAddrLen ); if( receivedLength > 0 ) { inet_ntop( AF_INET, ( struct sockaddr_in * )&fromAddr.sin_addr, remoteIP, sizeof( remoteIP ) ); port = ntohs( fromAddr.sin_port ); CMsgAddr *adr = new CMsgAddr( remoteIP, port ); CMostMsg *msg = new CMostMsg(); for( int32_t i = 0; i < receivedLength; i++ ) { if( !msg->Parse( tempBuffer[i] ) ) { ConsolePrintf( PRIO_ERROR, RED"CMostIpc UDP received unparsable message"RESETCOLOR"\n" ); break; } } if( msg->IsValid() ) ipc->OnMsgReceived( adr, msg ); adr->RemoveReference(); msg->RemoveReference(); } } } ipc->udpRxThreadIsRunning = false; return NULL; } void *CMostIpc::TXThread( void *pInst ) { CMostIpc *ipc = ( CMostIpc * )pInst; if( NULL == ipc || 0 > ipc->udpSock ) { ConsolePrintf( PRIO_ERROR, RED"TXThread was called with invalid parameters"RESETCOLOR"\n" ); return NULL; } ipc->txThreadIsRunning = true; while( ipc->allowThreadRun ) { sem_wait(&ipc->semTx); if (!ipc->allowThreadRun) break; CMostMsgTx *MsgTx = ipc->m_MsgTxQueue.PopFront(); assert (NULL != MsgTx); if( NULL != MsgTx ) { uint8_t *tempBuffer = NULL; uint32_t bytesUsed = MsgTx->ToByteArray( &tempBuffer ); if( NULL == tempBuffer || 0 == bytesUsed ) { ConsolePrintf( PRIO_ERROR, RED"MOST-IP: Failed to serialize message"RESETCOLOR"\n" ); if (NULL != tempBuffer) free(tempBuffer); MsgTx->MsgSent( false ); continue; } CMsgAddr *addr = MsgTx->GetAddr(); if( 0 != bytesUsed && NULL != addr ) { char *ipAddress = addr->GetInetAddress(); uint32_t port = addr->GetPort(); if( NULL != ipAddress ) { bool success = false; switch( addr->GetProtocol() ) { case IpcUdp_V2_0: success = ipc->SendUdp( ipAddress, port, tempBuffer, bytesUsed ); break; case IpcTcp_V2_0: success = ipc->SendTcp( ipAddress, port, tempBuffer, bytesUsed ); break; default: ConsolePrintf( PRIO_ERROR, RED"MOST-IP: Unknown Protocol"RESETCOLOR"\n" ); break; } MsgTx->MsgSent( success ); } else { ConsolePrintf( PRIO_ERROR, RED"MOST-IP: Can not resolve IP address"RESETCOLOR"\n" ); MsgTx->MsgSent( false ); } } else { ConsolePrintf( PRIO_ERROR, RED"Error while sending in MOST IPC."RESETCOLOR"\n" ); MsgTx->MsgSent( false ); } if (NULL != tempBuffer) free( tempBuffer ); MsgTx->RemoveReference(); } } ipc->txThreadIsRunning = false; return NULL; }