summaryrefslogtreecommitdiffstats
path: root/Src/IP/MostIpc.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'Src/IP/MostIpc.cpp')
-rw-r--r--Src/IP/MostIpc.cpp510
1 files changed, 510 insertions, 0 deletions
diff --git a/Src/IP/MostIpc.cpp b/Src/IP/MostIpc.cpp
new file mode 100644
index 0000000..86e7341
--- /dev/null
+++ b/Src/IP/MostIpc.cpp
@@ -0,0 +1,510 @@
+/*
+ * Video On Demand Samples
+ *
+ * Copyright (C) 2015 Microchip Technology Germany II GmbH & Co. KG
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * You may also obtain this software under a propriety license from Microchip.
+ * Please contact Microchip for further information.
+ *
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include <arpa/inet.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <assert.h>
+#include "Console.h"
+#include "MostIpc.h"
+
+
+#define TCP_BUFFER_SIZE (1024*1024)
+
+CMostIpc::CMostIpc( int port, bool server ) : isServer( server ), allowThreadRun( true ),
+ udpRxThreadIsRunning( false ), txThreadIsRunning (false), acceptThreadIsRunning( false )
+{
+ if( isServer && 0 >= port )
+ {
+ ConsolePrintf( PRIO_ERROR, RED"CMostIpc: Server port must be greater than 0!"RESETCOLOR"\n" );
+ return;
+ }
+ pthread_mutex_init( &critical_mutex_tcp, NULL );
+ sem_init( &semTx, 0, 0 ); //Mutex, initialized to 0 => sem_wait will block instantly
+
+ struct sockaddr_in listenAddress;
+ /* Receive address configuration */
+ memset( &listenAddress, 0, sizeof( listenAddress ) );
+ listenAddress.sin_family = AF_INET;
+ listenAddress.sin_addr.s_addr = htonl( INADDR_ANY );
+ listenAddress.sin_port = htons( port );
+
+ if( 0 > ( udpSock = socket( PF_INET, SOCK_DGRAM, IPPROTO_UDP ) ) )
+ {
+ ConsolePrintf( PRIO_ERROR, RED"CMostIpc: Failed to create UDP socket"RESETCOLOR"\n" );
+ return;
+ }
+ if( 0 > ( tcpSock = socket( PF_INET, SOCK_STREAM, 0 ) ) )
+ {
+ ConsolePrintf( PRIO_ERROR, RED"CMostIpc: Failed to create TCP socket"RESETCOLOR"\n" );
+ return;
+ }
+
+ //Avoid hanging socket after killing application, only works with Linux kernel > 3.9
+#if defined (SO_REUSEADDR) && defined (SO_REUSEPORT)
+ int one = 1;
+ setsockopt(udpSock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &one, sizeof(one));
+ setsockopt(tcpSock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &one, sizeof(one));
+#endif
+
+ // this call is what allows broadcast UDP packets to be sent:
+ int enableSocketFlag = 1;
+ if( 0 > setsockopt( udpSock, SOL_SOCKET, SO_BROADCAST, &enableSocketFlag, sizeof( enableSocketFlag ) ) )
+ {
+ ConsolePrintf(
+ PRIO_ERROR, RED"CMostIpc: setsockopt (SO_BROADCAST) failed, tried to set to: %d"RESETCOLOR"\n", enableSocketFlag );
+ }
+ if( 0 < port )
+ {
+ if( 0 > bind( udpSock, ( struct sockaddr * )&listenAddress, sizeof listenAddress ) )
+ {
+ ConsolePrintf( PRIO_ERROR, RED"Failed to bind UDP socket"RESETCOLOR"\n" );
+ }
+ if( 0 > bind( tcpSock, ( struct sockaddr * )&listenAddress, sizeof listenAddress ) )
+ {
+ ConsolePrintf( PRIO_ERROR, RED"CMostIpc: Failed to bind TCP socket"RESETCOLOR"\n" );
+ }
+ }
+ if( isServer )
+ {
+ pthread_t acceptThread;
+ pthread_create( &acceptThread, NULL, CMostIpc::TcpAcceptThread, ( void * )this );
+ }
+ pthread_t udpRxThread, txThread;
+ pthread_create( &udpRxThread, NULL, CMostIpc::UdpWorkerRXThread, ( void * )this );
+ pthread_create( &txThread, NULL, CMostIpc::TXThread, ( void * )this );
+}
+
+CMostIpc::~CMostIpc()
+{
+ allowThreadRun = false;
+ close( udpSock );
+ close( tcpSock );
+ sem_post(&semTx);
+ int32_t timeout = 100;
+ while( ( udpRxThreadIsRunning || txThreadIsRunning || acceptThreadIsRunning ) && ( --timeout > 0 ) )
+ usleep( 10000 );
+ pthread_mutex_lock( &critical_mutex_tcp );
+ for( uint32_t i = 0; i < m_TcpClientConnections.Size(); i++ )
+ {
+ TcpClientInfo_t *var = m_TcpClientConnections[i];
+ if( NULL != var )
+ {
+ if( var->clientSock >= 0 )
+ close( var->clientSock );
+ var->clientSock = -1;
+ }
+ }
+ m_TcpClientConnections.RemoveAll(false);
+ pthread_mutex_unlock( &critical_mutex_tcp );
+}
+
+void CMostIpc::SendMsg( CMostMsgTx *Msg )
+{
+ if( NULL != Msg )
+ {
+ Msg->AddReference();
+ m_MsgTxQueue.PushBack( Msg );
+ sem_post(&semTx);
+ }
+}
+
+void CMostIpc::RegisterMessageHandler( CMsgFilter *Filter )
+{
+ m_MsgFilterQueue.PushBack( Filter );
+}
+
+void CMostIpc::UnregisterMessageHandler( CMsgFilter *Filter )
+{
+ m_MsgFilterQueue.Remove(Filter);
+}
+
+void *CMostIpc::TcpWorkerThread( void *pInst )
+{
+ bool allowWorker = true;
+ TcpClientInfo_t *clientInfo = ( TcpClientInfo_t * )pInst;
+ if( NULL == clientInfo || NULL == clientInfo->ipc || 0 > clientInfo->clientSock )
+ {
+ ConsolePrintf( PRIO_ERROR, RED"TcpWorkerThread: Invalid parameters"RESETCOLOR"\n" );
+ return NULL;
+ }
+ uint8_t *buff = ( uint8_t * )malloc( TCP_BUFFER_SIZE );
+ if( NULL == buff )
+ {
+ ConsolePrintf( PRIO_ERROR, RED"TcpWorkerThread: Failed to allocate buffer"RESETCOLOR"\n" );
+ return NULL;
+ }
+
+ ConsolePrintf( PRIO_MEDIUM, "TcpWorkerThread: Start for IP:%s, Port:%d\n", clientInfo->clientIP, clientInfo->clientPort );
+
+ CMsgAddr *adr = new CMsgAddr( clientInfo->clientIP, clientInfo->clientPort, IpcTcp_V2_0 );
+ CMostMsg *msg = NULL;
+
+ while( clientInfo->ipc->allowThreadRun && allowWorker )
+ {
+ int nRead;
+ if( 0 >= ( nRead = read( clientInfo->clientSock, buff, TCP_BUFFER_SIZE ) ) )
+ {
+ if( clientInfo->ipc->allowThreadRun )
+ {
+ if( 0 == nRead )
+ ConsolePrintf( PRIO_MEDIUM, GREEN"TcpWorkerThread: TCP Client disconnected (IP:%s)"RESETCOLOR"\n",
+ clientInfo->clientIP );
+ else
+ ConsolePrintf( PRIO_ERROR, YELLOW"TcpWorkerThread: Read error (return=%d), aborting .."RESETCOLOR"\n", nRead );
+ }
+ allowWorker = false;
+ break;
+ }
+ for( int i = 0; i < nRead; i++ )
+ {
+ if( NULL == msg )
+ msg = new CMostMsg();
+ if( !msg->Parse( buff[i] ) )
+ {
+ ConsolePrintf( PRIO_ERROR, RED"TcpWorkerThread: Parse error, aborting.."RESETCOLOR"\n" );
+ allowWorker = false;
+ break;
+ }
+ if( msg->IsValid() )
+ {
+ clientInfo->ipc->OnMsgReceived( adr, msg );
+ msg->RemoveReference();
+ msg = NULL;
+ }
+ }
+ }
+ ConsolePrintf( PRIO_LOW, "TcpWorkerThread: End\n" );
+ if( clientInfo->clientSock >= 0 )
+ close( clientInfo->clientSock );
+
+ pthread_mutex_t *pMutex = &clientInfo->ipc->critical_mutex_tcp;
+ pthread_mutex_lock( pMutex );
+ clientInfo->ipc->m_TcpClientConnections.Remove(clientInfo);
+ free( clientInfo );
+ pthread_mutex_unlock( pMutex );
+
+ adr->RemoveReference();
+ free( buff );
+ return NULL;
+}
+
+void *CMostIpc::TcpAcceptThread( void *pInst )
+{
+ CMostIpc *ipc = ( CMostIpc * )pInst;
+ if( NULL == ipc || 0 > ipc->tcpSock )
+ {
+ ConsolePrintf( PRIO_ERROR, RED"TcpAcceptThread was called with invalid parameters"RESETCOLOR"\n" );
+ return NULL;
+ }
+
+ ConsolePrintf( PRIO_LOW, "TcpAcceptThread starts\n" );
+ ipc->acceptThreadIsRunning = true;
+ while( ipc->allowThreadRun )
+ {
+ listen( ipc->tcpSock, 5 );
+ socklen_t size = sizeof( struct sockaddr_in );
+ TcpClientInfo_t *var = ( TcpClientInfo_t * )calloc( 1, sizeof( TcpClientInfo_t ) );
+ if( NULL == var )
+ {
+ ConsolePrintf( PRIO_ERROR, RED"Failed to allocate memory in TCP accept Thread, aborting.."RESETCOLOR"\n" );
+ break;
+ }
+ struct sockaddr_in clientAddr;
+ var->clientSock = accept( ipc->tcpSock, ( struct sockaddr * )&clientAddr, &size );
+ if( !ipc->allowThreadRun )
+ break;
+ if( -1 == var->clientSock )
+ {
+ ConsolePrintf( PRIO_ERROR, RED"Failed to accept connection"RESETCOLOR"\n" );
+ free( var );
+ continue;
+ }
+ var->ipc = ipc;
+ inet_ntop( AF_INET, ( struct sockaddr_in * )&clientAddr.sin_addr, var->clientIP, sizeof ( var->clientIP ) );
+ var->clientPort = ntohs( clientAddr.sin_port );
+ ConsolePrintf( PRIO_MEDIUM, GREEN"TCP Client connected: IP=%s, Port=%d"RESETCOLOR"\n",
+ var->clientIP, var->clientPort );
+ pthread_mutex_lock( &var->ipc->critical_mutex_tcp );
+ var->ipc->m_TcpClientConnections.PushBack( var );
+ pthread_mutex_unlock( &var->ipc->critical_mutex_tcp );
+ pthread_create( &var->workerThread, NULL, CMostIpc::TcpWorkerThread, ( void * )var );
+ }
+ ConsolePrintf( PRIO_LOW, "TcpAcceptThread ends\n" );
+ close( ipc->tcpSock );
+ ipc->acceptThreadIsRunning = false;
+ return NULL;
+}
+
+void CMostIpc::OnMsgReceived( CMsgAddr *Addr, CMostMsg *msg )
+{
+ if( NULL == Addr || NULL == msg )
+ {
+ ConsolePrintf(
+ PRIO_ERROR, RED"CMostIpc::OnMsgReceived was called with invalid parameters"RESETCOLOR"\n" );
+ return;
+ }
+ ConsolePrintf( PRIO_LOW, "Received MOST-IPC message IP=%s, Protocol:%s, "\
+ "FBlock:0x%X, Func:0x%X, OP:0x%X, PL:%d Bytes\n", Addr->GetInetAddress(), ( Addr->GetProtocol() ==
+ IpcUdp_V2_0 ? "UDP" : "TCP" ), msg->GetFBlock(), msg->GetFunc(), msg->GetOpType(), msg->GetPayloadLen() );
+
+ for( uint32_t i = 0; i < m_MsgFilterQueue.Size(); i++ )
+ {
+ m_MsgFilterQueue[i]->Filter( Addr, msg );
+ }
+}
+
+bool CMostIpc::SendUdp( const char *ipAddress, uint32_t port, const uint8_t *buffer, uint32_t bufferLen )
+{
+ struct sockaddr_in destination;
+ memset( &destination, 0, sizeof ( destination ) );
+ destination.sin_family = AF_INET;
+ destination.sin_addr.s_addr = inet_addr( ipAddress );
+ destination.sin_port = htons( port );
+ ssize_t bytesSent = sendto( udpSock, buffer, bufferLen,
+ 0, ( struct sockaddr * )&destination, sizeof ( destination ) );
+ return ( ( uint32_t )bytesSent == bufferLen );
+}
+
+bool CMostIpc::SendTcp( const char *ipAddress, uint32_t port, const uint8_t *buffer, uint32_t bufferLen )
+{
+ bool success = false;
+ TcpClientInfo_t *tcp = NULL;
+
+ //Find the correct client connection
+ pthread_mutex_lock( &critical_mutex_tcp );
+ for( uint32_t i = 0; i < m_TcpClientConnections.Size(); i++ )
+ {
+ TcpClientInfo_t *t = m_TcpClientConnections[i];
+ if( NULL != t && t->clientPort == port && 0 == strcmp( t->clientIP, ipAddress ) )
+ {
+ tcp = t;
+ break;
+ }
+ }
+ //There is no connection matching to the given address and port, try to connect if we are not master
+ if( !isServer && NULL == tcp )
+ {
+ TcpClientInfo_t *var = ( TcpClientInfo_t * )calloc( 1, sizeof( TcpClientInfo_t ) );
+ if( NULL != var )
+ {
+ var->ipc = this;
+ var->clientPort = port;
+ var->clientSock = tcpSock;
+ strncpy( var->clientIP, ipAddress, sizeof( var->clientIP ) );
+
+ struct hostent *hostp;
+ hostp = gethostbyname( var->clientIP );
+ if( NULL != hostp )
+ {
+ struct sockaddr_in addr;
+ memset( &addr, 0x00, sizeof( struct sockaddr_in ) );
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons( var->clientPort );
+ memcpy( &addr.sin_addr, hostp->h_addr, sizeof( addr.sin_addr ) );
+ if( 0 == connect( tcpSock, ( struct sockaddr * )&addr, sizeof( addr ) ) )
+ {
+ tcp = var;
+ m_TcpClientConnections.PushBack( tcp );
+ pthread_create( &tcp->workerThread, NULL, CMostIpc::TcpWorkerThread, ( void * )tcp );
+ ConsolePrintf( PRIO_MEDIUM, GREEN"Connect to TCP host succeeded: IP=%s Port=%d"RESETCOLOR"\n", tcp->clientIP,
+ tcp->clientPort );
+
+ }
+ else
+ {
+ ConsolePrintf( PRIO_ERROR, RED"Could not connect to host: '%s'"RESETCOLOR"\n", var->clientIP );
+ free( var );
+ }
+ }
+ else
+ {
+ ConsolePrintf( PRIO_ERROR, RED"Could not find host: '%s'"RESETCOLOR"\n", var->clientIP );
+ free( var );
+ }
+ }
+ else
+ {
+ ConsolePrintf( PRIO_ERROR, RED"Failed to allocate memory in SendTcp method"RESETCOLOR"\n" );
+ }
+ }
+
+ if( NULL != tcp )
+ {
+ int result;
+ uint32_t written = 0;
+ success = true;
+ while( written < bufferLen )
+ {
+ result = write( tcp->clientSock, &buffer[written], bufferLen - written );
+ if( result < 0 )
+ {
+ ConsolePrintf( PRIO_ERROR, RED"SendTcp: Write error"RESETCOLOR"\n" );
+ success = false;
+ break;
+ }
+ else if( result == 0 )
+ {
+ //Socket is non blocking, sleep to avoid 100% CPU load
+ usleep( 1000 );
+ }
+ else
+ {
+ written += result;
+ }
+ }
+ pthread_mutex_unlock( &critical_mutex_tcp );
+ }
+ return success;
+}
+
+void *CMostIpc::UdpWorkerRXThread( void *pInst )
+{
+ CMostIpc *ipc = ( CMostIpc * )pInst;
+ if( NULL == ipc || 0 > ipc->udpSock )
+ {
+ ConsolePrintf( PRIO_ERROR, RED"UdpWorkerRXThread was called with invalid parameters"RESETCOLOR"\n" );
+ return NULL;
+ }
+ ipc->udpRxThreadIsRunning = true;
+ while( ipc->allowThreadRun )
+ {
+ //Receive
+ if( ipc->allowThreadRun )
+ {
+ uint8_t tempBuffer[8*1024];
+ char remoteIP[INET6_ADDRSTRLEN];
+ uint32_t port;
+ struct sockaddr_in fromAddr;
+ socklen_t fromAddrLen = sizeof( fromAddr );
+ ssize_t receivedLength;
+ receivedLength = recvfrom
+ ( ipc->udpSock, tempBuffer, sizeof( tempBuffer ),
+ 0, ( struct sockaddr * )&fromAddr, &fromAddrLen );
+ if( receivedLength > 0 )
+ {
+ inet_ntop( AF_INET, ( struct sockaddr_in * )&fromAddr.sin_addr, remoteIP, sizeof( remoteIP ) );
+ port = ntohs( fromAddr.sin_port );
+ CMsgAddr *adr = new CMsgAddr( remoteIP, port );
+ CMostMsg *msg = new CMostMsg();
+ for( int32_t i = 0; i < receivedLength; i++ )
+ {
+ if( !msg->Parse( tempBuffer[i] ) )
+ {
+ ConsolePrintf( PRIO_ERROR, RED"CMostIpc UDP received unparsable message"RESETCOLOR"\n" );
+ break;
+ }
+ }
+ if( msg->IsValid() )
+ ipc->OnMsgReceived( adr, msg );
+ adr->RemoveReference();
+ msg->RemoveReference();
+ }
+ }
+ }
+ ipc->udpRxThreadIsRunning = false;
+ return NULL;
+}
+
+void *CMostIpc::TXThread( void *pInst )
+{
+ CMostIpc *ipc = ( CMostIpc * )pInst;
+ if( NULL == ipc || 0 > ipc->udpSock )
+ {
+ ConsolePrintf( PRIO_ERROR, RED"TXThread was called with invalid parameters"RESETCOLOR"\n" );
+ return NULL;
+ }
+ ipc->txThreadIsRunning = true;
+
+ while( ipc->allowThreadRun )
+ {
+ sem_wait(&ipc->semTx);
+ if (!ipc->allowThreadRun)
+ break;
+ CMostMsgTx *MsgTx = ipc->m_MsgTxQueue.PopFront();
+ assert (NULL != MsgTx);
+ if( NULL != MsgTx )
+ {
+ uint8_t *tempBuffer = NULL;
+ uint32_t bytesUsed = MsgTx->ToByteArray( &tempBuffer );
+ if( NULL == tempBuffer || 0 == bytesUsed )
+ {
+ ConsolePrintf( PRIO_ERROR, RED"MOST-IP: Failed to serialize message"RESETCOLOR"\n" );
+ if (NULL != tempBuffer)
+ free(tempBuffer);
+ MsgTx->MsgSent( false );
+ continue;
+ }
+ CMsgAddr *addr = MsgTx->GetAddr();
+ if( 0 != bytesUsed && NULL != addr )
+ {
+ char *ipAddress = addr->GetInetAddress();
+ uint32_t port = addr->GetPort();
+ if( NULL != ipAddress )
+ {
+ bool success = false;
+ switch( addr->GetProtocol() )
+ {
+ case IpcUdp_V2_0:
+ success = ipc->SendUdp( ipAddress, port, tempBuffer, bytesUsed );
+ break;
+ case IpcTcp_V2_0:
+ success = ipc->SendTcp( ipAddress, port, tempBuffer, bytesUsed );
+ break;
+ default:
+ ConsolePrintf( PRIO_ERROR, RED"MOST-IP: Unknown Protocol"RESETCOLOR"\n" );
+ break;
+ }
+ MsgTx->MsgSent( success );
+ }
+ else
+ {
+ ConsolePrintf( PRIO_ERROR, RED"MOST-IP: Can not resolve IP address"RESETCOLOR"\n" );
+ MsgTx->MsgSent( false );
+ }
+ }
+ else
+ {
+ ConsolePrintf( PRIO_ERROR, RED"Error while sending in MOST IPC."RESETCOLOR"\n" );
+ MsgTx->MsgSent( false );
+ }
+ if (NULL != tempBuffer)
+ free( tempBuffer );
+ MsgTx->RemoveReference();
+ }
+ }
+ ipc->txThreadIsRunning = false;
+ return NULL;
+}
+