diff options
Diffstat (limited to 'Src/IP/MostIpc.cpp')
-rw-r--r-- | Src/IP/MostIpc.cpp | 510 |
1 files changed, 510 insertions, 0 deletions
diff --git a/Src/IP/MostIpc.cpp b/Src/IP/MostIpc.cpp new file mode 100644 index 0000000..86e7341 --- /dev/null +++ b/Src/IP/MostIpc.cpp @@ -0,0 +1,510 @@ +/* + * Video On Demand Samples + * + * Copyright (C) 2015 Microchip Technology Germany II GmbH & Co. KG + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * You may also obtain this software under a propriety license from Microchip. + * Please contact Microchip for further information. + * + */ + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <arpa/inet.h> +#include <netinet/in.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <netdb.h> +#include <arpa/inet.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <errno.h> +#include <assert.h> +#include "Console.h" +#include "MostIpc.h" + + +#define TCP_BUFFER_SIZE (1024*1024) + +CMostIpc::CMostIpc( int port, bool server ) : isServer( server ), allowThreadRun( true ), + udpRxThreadIsRunning( false ), txThreadIsRunning (false), acceptThreadIsRunning( false ) +{ + if( isServer && 0 >= port ) + { + ConsolePrintf( PRIO_ERROR, RED"CMostIpc: Server port must be greater than 0!"RESETCOLOR"\n" ); + return; + } + pthread_mutex_init( &critical_mutex_tcp, NULL ); + sem_init( &semTx, 0, 0 ); //Mutex, initialized to 0 => sem_wait will block instantly + + struct sockaddr_in listenAddress; + /* Receive address configuration */ + memset( &listenAddress, 0, sizeof( listenAddress ) ); + listenAddress.sin_family = AF_INET; + listenAddress.sin_addr.s_addr = htonl( INADDR_ANY ); + listenAddress.sin_port = htons( port ); + + if( 0 > ( udpSock = socket( PF_INET, SOCK_DGRAM, IPPROTO_UDP ) ) ) + { + ConsolePrintf( PRIO_ERROR, RED"CMostIpc: Failed to create UDP socket"RESETCOLOR"\n" ); + return; + } + if( 0 > ( tcpSock = socket( PF_INET, SOCK_STREAM, 0 ) ) ) + { + ConsolePrintf( PRIO_ERROR, RED"CMostIpc: Failed to create TCP socket"RESETCOLOR"\n" ); + return; + } + + //Avoid hanging socket after killing application, only works with Linux kernel > 3.9 +#if defined (SO_REUSEADDR) && defined (SO_REUSEPORT) + int one = 1; + setsockopt(udpSock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &one, sizeof(one)); + setsockopt(tcpSock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &one, sizeof(one)); +#endif + + // this call is what allows broadcast UDP packets to be sent: + int enableSocketFlag = 1; + if( 0 > setsockopt( udpSock, SOL_SOCKET, SO_BROADCAST, &enableSocketFlag, sizeof( enableSocketFlag ) ) ) + { + ConsolePrintf( + PRIO_ERROR, RED"CMostIpc: setsockopt (SO_BROADCAST) failed, tried to set to: %d"RESETCOLOR"\n", enableSocketFlag ); + } + if( 0 < port ) + { + if( 0 > bind( udpSock, ( struct sockaddr * )&listenAddress, sizeof listenAddress ) ) + { + ConsolePrintf( PRIO_ERROR, RED"Failed to bind UDP socket"RESETCOLOR"\n" ); + } + if( 0 > bind( tcpSock, ( struct sockaddr * )&listenAddress, sizeof listenAddress ) ) + { + ConsolePrintf( PRIO_ERROR, RED"CMostIpc: Failed to bind TCP socket"RESETCOLOR"\n" ); + } + } + if( isServer ) + { + pthread_t acceptThread; + pthread_create( &acceptThread, NULL, CMostIpc::TcpAcceptThread, ( void * )this ); + } + pthread_t udpRxThread, txThread; + pthread_create( &udpRxThread, NULL, CMostIpc::UdpWorkerRXThread, ( void * )this ); + pthread_create( &txThread, NULL, CMostIpc::TXThread, ( void * )this ); +} + +CMostIpc::~CMostIpc() +{ + allowThreadRun = false; + close( udpSock ); + close( tcpSock ); + sem_post(&semTx); + int32_t timeout = 100; + while( ( udpRxThreadIsRunning || txThreadIsRunning || acceptThreadIsRunning ) && ( --timeout > 0 ) ) + usleep( 10000 ); + pthread_mutex_lock( &critical_mutex_tcp ); + for( uint32_t i = 0; i < m_TcpClientConnections.Size(); i++ ) + { + TcpClientInfo_t *var = m_TcpClientConnections[i]; + if( NULL != var ) + { + if( var->clientSock >= 0 ) + close( var->clientSock ); + var->clientSock = -1; + } + } + m_TcpClientConnections.RemoveAll(false); + pthread_mutex_unlock( &critical_mutex_tcp ); +} + +void CMostIpc::SendMsg( CMostMsgTx *Msg ) +{ + if( NULL != Msg ) + { + Msg->AddReference(); + m_MsgTxQueue.PushBack( Msg ); + sem_post(&semTx); + } +} + +void CMostIpc::RegisterMessageHandler( CMsgFilter *Filter ) +{ + m_MsgFilterQueue.PushBack( Filter ); +} + +void CMostIpc::UnregisterMessageHandler( CMsgFilter *Filter ) +{ + m_MsgFilterQueue.Remove(Filter); +} + +void *CMostIpc::TcpWorkerThread( void *pInst ) +{ + bool allowWorker = true; + TcpClientInfo_t *clientInfo = ( TcpClientInfo_t * )pInst; + if( NULL == clientInfo || NULL == clientInfo->ipc || 0 > clientInfo->clientSock ) + { + ConsolePrintf( PRIO_ERROR, RED"TcpWorkerThread: Invalid parameters"RESETCOLOR"\n" ); + return NULL; + } + uint8_t *buff = ( uint8_t * )malloc( TCP_BUFFER_SIZE ); + if( NULL == buff ) + { + ConsolePrintf( PRIO_ERROR, RED"TcpWorkerThread: Failed to allocate buffer"RESETCOLOR"\n" ); + return NULL; + } + + ConsolePrintf( PRIO_MEDIUM, "TcpWorkerThread: Start for IP:%s, Port:%d\n", clientInfo->clientIP, clientInfo->clientPort ); + + CMsgAddr *adr = new CMsgAddr( clientInfo->clientIP, clientInfo->clientPort, IpcTcp_V2_0 ); + CMostMsg *msg = NULL; + + while( clientInfo->ipc->allowThreadRun && allowWorker ) + { + int nRead; + if( 0 >= ( nRead = read( clientInfo->clientSock, buff, TCP_BUFFER_SIZE ) ) ) + { + if( clientInfo->ipc->allowThreadRun ) + { + if( 0 == nRead ) + ConsolePrintf( PRIO_MEDIUM, GREEN"TcpWorkerThread: TCP Client disconnected (IP:%s)"RESETCOLOR"\n", + clientInfo->clientIP ); + else + ConsolePrintf( PRIO_ERROR, YELLOW"TcpWorkerThread: Read error (return=%d), aborting .."RESETCOLOR"\n", nRead ); + } + allowWorker = false; + break; + } + for( int i = 0; i < nRead; i++ ) + { + if( NULL == msg ) + msg = new CMostMsg(); + if( !msg->Parse( buff[i] ) ) + { + ConsolePrintf( PRIO_ERROR, RED"TcpWorkerThread: Parse error, aborting.."RESETCOLOR"\n" ); + allowWorker = false; + break; + } + if( msg->IsValid() ) + { + clientInfo->ipc->OnMsgReceived( adr, msg ); + msg->RemoveReference(); + msg = NULL; + } + } + } + ConsolePrintf( PRIO_LOW, "TcpWorkerThread: End\n" ); + if( clientInfo->clientSock >= 0 ) + close( clientInfo->clientSock ); + + pthread_mutex_t *pMutex = &clientInfo->ipc->critical_mutex_tcp; + pthread_mutex_lock( pMutex ); + clientInfo->ipc->m_TcpClientConnections.Remove(clientInfo); + free( clientInfo ); + pthread_mutex_unlock( pMutex ); + + adr->RemoveReference(); + free( buff ); + return NULL; +} + +void *CMostIpc::TcpAcceptThread( void *pInst ) +{ + CMostIpc *ipc = ( CMostIpc * )pInst; + if( NULL == ipc || 0 > ipc->tcpSock ) + { + ConsolePrintf( PRIO_ERROR, RED"TcpAcceptThread was called with invalid parameters"RESETCOLOR"\n" ); + return NULL; + } + + ConsolePrintf( PRIO_LOW, "TcpAcceptThread starts\n" ); + ipc->acceptThreadIsRunning = true; + while( ipc->allowThreadRun ) + { + listen( ipc->tcpSock, 5 ); + socklen_t size = sizeof( struct sockaddr_in ); + TcpClientInfo_t *var = ( TcpClientInfo_t * )calloc( 1, sizeof( TcpClientInfo_t ) ); + if( NULL == var ) + { + ConsolePrintf( PRIO_ERROR, RED"Failed to allocate memory in TCP accept Thread, aborting.."RESETCOLOR"\n" ); + break; + } + struct sockaddr_in clientAddr; + var->clientSock = accept( ipc->tcpSock, ( struct sockaddr * )&clientAddr, &size ); + if( !ipc->allowThreadRun ) + break; + if( -1 == var->clientSock ) + { + ConsolePrintf( PRIO_ERROR, RED"Failed to accept connection"RESETCOLOR"\n" ); + free( var ); + continue; + } + var->ipc = ipc; + inet_ntop( AF_INET, ( struct sockaddr_in * )&clientAddr.sin_addr, var->clientIP, sizeof ( var->clientIP ) ); + var->clientPort = ntohs( clientAddr.sin_port ); + ConsolePrintf( PRIO_MEDIUM, GREEN"TCP Client connected: IP=%s, Port=%d"RESETCOLOR"\n", + var->clientIP, var->clientPort ); + pthread_mutex_lock( &var->ipc->critical_mutex_tcp ); + var->ipc->m_TcpClientConnections.PushBack( var ); + pthread_mutex_unlock( &var->ipc->critical_mutex_tcp ); + pthread_create( &var->workerThread, NULL, CMostIpc::TcpWorkerThread, ( void * )var ); + } + ConsolePrintf( PRIO_LOW, "TcpAcceptThread ends\n" ); + close( ipc->tcpSock ); + ipc->acceptThreadIsRunning = false; + return NULL; +} + +void CMostIpc::OnMsgReceived( CMsgAddr *Addr, CMostMsg *msg ) +{ + if( NULL == Addr || NULL == msg ) + { + ConsolePrintf( + PRIO_ERROR, RED"CMostIpc::OnMsgReceived was called with invalid parameters"RESETCOLOR"\n" ); + return; + } + ConsolePrintf( PRIO_LOW, "Received MOST-IPC message IP=%s, Protocol:%s, "\ + "FBlock:0x%X, Func:0x%X, OP:0x%X, PL:%d Bytes\n", Addr->GetInetAddress(), ( Addr->GetProtocol() == + IpcUdp_V2_0 ? "UDP" : "TCP" ), msg->GetFBlock(), msg->GetFunc(), msg->GetOpType(), msg->GetPayloadLen() ); + + for( uint32_t i = 0; i < m_MsgFilterQueue.Size(); i++ ) + { + m_MsgFilterQueue[i]->Filter( Addr, msg ); + } +} + +bool CMostIpc::SendUdp( const char *ipAddress, uint32_t port, const uint8_t *buffer, uint32_t bufferLen ) +{ + struct sockaddr_in destination; + memset( &destination, 0, sizeof ( destination ) ); + destination.sin_family = AF_INET; + destination.sin_addr.s_addr = inet_addr( ipAddress ); + destination.sin_port = htons( port ); + ssize_t bytesSent = sendto( udpSock, buffer, bufferLen, + 0, ( struct sockaddr * )&destination, sizeof ( destination ) ); + return ( ( uint32_t )bytesSent == bufferLen ); +} + +bool CMostIpc::SendTcp( const char *ipAddress, uint32_t port, const uint8_t *buffer, uint32_t bufferLen ) +{ + bool success = false; + TcpClientInfo_t *tcp = NULL; + + //Find the correct client connection + pthread_mutex_lock( &critical_mutex_tcp ); + for( uint32_t i = 0; i < m_TcpClientConnections.Size(); i++ ) + { + TcpClientInfo_t *t = m_TcpClientConnections[i]; + if( NULL != t && t->clientPort == port && 0 == strcmp( t->clientIP, ipAddress ) ) + { + tcp = t; + break; + } + } + //There is no connection matching to the given address and port, try to connect if we are not master + if( !isServer && NULL == tcp ) + { + TcpClientInfo_t *var = ( TcpClientInfo_t * )calloc( 1, sizeof( TcpClientInfo_t ) ); + if( NULL != var ) + { + var->ipc = this; + var->clientPort = port; + var->clientSock = tcpSock; + strncpy( var->clientIP, ipAddress, sizeof( var->clientIP ) ); + + struct hostent *hostp; + hostp = gethostbyname( var->clientIP ); + if( NULL != hostp ) + { + struct sockaddr_in addr; + memset( &addr, 0x00, sizeof( struct sockaddr_in ) ); + addr.sin_family = AF_INET; + addr.sin_port = htons( var->clientPort ); + memcpy( &addr.sin_addr, hostp->h_addr, sizeof( addr.sin_addr ) ); + if( 0 == connect( tcpSock, ( struct sockaddr * )&addr, sizeof( addr ) ) ) + { + tcp = var; + m_TcpClientConnections.PushBack( tcp ); + pthread_create( &tcp->workerThread, NULL, CMostIpc::TcpWorkerThread, ( void * )tcp ); + ConsolePrintf( PRIO_MEDIUM, GREEN"Connect to TCP host succeeded: IP=%s Port=%d"RESETCOLOR"\n", tcp->clientIP, + tcp->clientPort ); + + } + else + { + ConsolePrintf( PRIO_ERROR, RED"Could not connect to host: '%s'"RESETCOLOR"\n", var->clientIP ); + free( var ); + } + } + else + { + ConsolePrintf( PRIO_ERROR, RED"Could not find host: '%s'"RESETCOLOR"\n", var->clientIP ); + free( var ); + } + } + else + { + ConsolePrintf( PRIO_ERROR, RED"Failed to allocate memory in SendTcp method"RESETCOLOR"\n" ); + } + } + + if( NULL != tcp ) + { + int result; + uint32_t written = 0; + success = true; + while( written < bufferLen ) + { + result = write( tcp->clientSock, &buffer[written], bufferLen - written ); + if( result < 0 ) + { + ConsolePrintf( PRIO_ERROR, RED"SendTcp: Write error"RESETCOLOR"\n" ); + success = false; + break; + } + else if( result == 0 ) + { + //Socket is non blocking, sleep to avoid 100% CPU load + usleep( 1000 ); + } + else + { + written += result; + } + } + pthread_mutex_unlock( &critical_mutex_tcp ); + } + return success; +} + +void *CMostIpc::UdpWorkerRXThread( void *pInst ) +{ + CMostIpc *ipc = ( CMostIpc * )pInst; + if( NULL == ipc || 0 > ipc->udpSock ) + { + ConsolePrintf( PRIO_ERROR, RED"UdpWorkerRXThread was called with invalid parameters"RESETCOLOR"\n" ); + return NULL; + } + ipc->udpRxThreadIsRunning = true; + while( ipc->allowThreadRun ) + { + //Receive + if( ipc->allowThreadRun ) + { + uint8_t tempBuffer[8*1024]; + char remoteIP[INET6_ADDRSTRLEN]; + uint32_t port; + struct sockaddr_in fromAddr; + socklen_t fromAddrLen = sizeof( fromAddr ); + ssize_t receivedLength; + receivedLength = recvfrom + ( ipc->udpSock, tempBuffer, sizeof( tempBuffer ), + 0, ( struct sockaddr * )&fromAddr, &fromAddrLen ); + if( receivedLength > 0 ) + { + inet_ntop( AF_INET, ( struct sockaddr_in * )&fromAddr.sin_addr, remoteIP, sizeof( remoteIP ) ); + port = ntohs( fromAddr.sin_port ); + CMsgAddr *adr = new CMsgAddr( remoteIP, port ); + CMostMsg *msg = new CMostMsg(); + for( int32_t i = 0; i < receivedLength; i++ ) + { + if( !msg->Parse( tempBuffer[i] ) ) + { + ConsolePrintf( PRIO_ERROR, RED"CMostIpc UDP received unparsable message"RESETCOLOR"\n" ); + break; + } + } + if( msg->IsValid() ) + ipc->OnMsgReceived( adr, msg ); + adr->RemoveReference(); + msg->RemoveReference(); + } + } + } + ipc->udpRxThreadIsRunning = false; + return NULL; +} + +void *CMostIpc::TXThread( void *pInst ) +{ + CMostIpc *ipc = ( CMostIpc * )pInst; + if( NULL == ipc || 0 > ipc->udpSock ) + { + ConsolePrintf( PRIO_ERROR, RED"TXThread was called with invalid parameters"RESETCOLOR"\n" ); + return NULL; + } + ipc->txThreadIsRunning = true; + + while( ipc->allowThreadRun ) + { + sem_wait(&ipc->semTx); + if (!ipc->allowThreadRun) + break; + CMostMsgTx *MsgTx = ipc->m_MsgTxQueue.PopFront(); + assert (NULL != MsgTx); + if( NULL != MsgTx ) + { + uint8_t *tempBuffer = NULL; + uint32_t bytesUsed = MsgTx->ToByteArray( &tempBuffer ); + if( NULL == tempBuffer || 0 == bytesUsed ) + { + ConsolePrintf( PRIO_ERROR, RED"MOST-IP: Failed to serialize message"RESETCOLOR"\n" ); + if (NULL != tempBuffer) + free(tempBuffer); + MsgTx->MsgSent( false ); + continue; + } + CMsgAddr *addr = MsgTx->GetAddr(); + if( 0 != bytesUsed && NULL != addr ) + { + char *ipAddress = addr->GetInetAddress(); + uint32_t port = addr->GetPort(); + if( NULL != ipAddress ) + { + bool success = false; + switch( addr->GetProtocol() ) + { + case IpcUdp_V2_0: + success = ipc->SendUdp( ipAddress, port, tempBuffer, bytesUsed ); + break; + case IpcTcp_V2_0: + success = ipc->SendTcp( ipAddress, port, tempBuffer, bytesUsed ); + break; + default: + ConsolePrintf( PRIO_ERROR, RED"MOST-IP: Unknown Protocol"RESETCOLOR"\n" ); + break; + } + MsgTx->MsgSent( success ); + } + else + { + ConsolePrintf( PRIO_ERROR, RED"MOST-IP: Can not resolve IP address"RESETCOLOR"\n" ); + MsgTx->MsgSent( false ); + } + } + else + { + ConsolePrintf( PRIO_ERROR, RED"Error while sending in MOST IPC."RESETCOLOR"\n" ); + MsgTx->MsgSent( false ); + } + if (NULL != tempBuffer) + free( tempBuffer ); + MsgTx->RemoveReference(); + } + } + ipc->txThreadIsRunning = false; + return NULL; +} + |