/* * Video On Demand Samples * * Copyright (C) 2015 Microchip Technology Germany II GmbH & Co. KG * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . * * You may also obtain this software under a propriety license from Microchip. * Please contact Microchip for further information. * */ /*----------------------------------------------------------*/ /*! \file * \brief This file contains the CSource class. */ /*----------------------------------------------------------*/ #ifndef _SOURCE_H_ #define _SOURCE_H_ #include #include #include #include #include #include "TsPacket.h" #include "Stream.h" #include "SafeVector.h" #include "AutoLock.h" /*----------------------------------------------------------*/ /*! \brief Base class sourcing a transport stream */ /*----------------------------------------------------------*/ class CSource : public CTsPacket, public CRingBuffer { protected: static uint32_t m_nInstCnt; uint32_t m_nInst; bool m_bAutoDestroy; bool m_bEmptyRing; uint32_t m_nAPid; uint32_t m_nVPid; uint32_t m_nPPid; uint32_t m_nLastPcr; uint16_t m_fReqPos; bool m_bPause; bool m_bRepetition; uint64_t m_nBytesRead; uint64_t m_nErrBuf; uint64_t m_nErrPcr; uint64_t m_nErrTs; uint32_t m_tTsStart; uint32_t m_tTsLast; uint32_t m_tTimerStart; uint64_t m_tLastPmt; CSafeVectorm_Stream; pthread_mutex_t m_StreamMutex; public: static const uint16_t INVALID_POS = 0xFFFF; static const uint32_t READ_LEN = (47*512); // this number MUST be dividable by 188 and 512 static const uint32_t NUM_READ_AHEAD = 8; // number of blocks reading ahead /*----------------------------------------------------------*/ /*! \brief Constructs Source instance * \param bAutoDestroy - true, destroys it self, when the * last listener gone */ /*----------------------------------------------------------*/ CSource(bool bAutoDestroy = true) : CRingBuffer(TS_PACKET_LEN, READ_LEN, NUM_READ_AHEAD) , m_nInst(m_nInstCnt++) , m_bAutoDestroy(bAutoDestroy) , m_bEmptyRing(false) , m_nAPid(PID_INVALID) , m_nVPid(PID_INVALID) , m_nPPid(PID_INVALID) , m_nLastPcr(PCR_INVALID) , m_fReqPos(INVALID_POS) , m_bPause(false) , m_bRepetition(false) , m_nBytesRead(0) , m_nErrBuf(0) , m_nErrPcr(0) , m_nErrTs(0) , m_tTsStart(0) , m_tTsLast(0) , m_tTimerStart(0) , m_tLastPmt(0) { pthread_mutex_init( &m_StreamMutex, NULL ); } virtual ~CSource() { pthread_mutex_destroy( &m_StreamMutex ); } /*----------------------------------------------------------*/ /*! \brief Add a listener to a source * * \param pRingBuffer - RingBuffer to source to */ /*----------------------------------------------------------*/ void AddStream(CStream *pStream) { volatile CAutoLock autoLock( &m_StreamMutex ); if ( pStream ) m_Stream.PushBack(pStream); } /*----------------------------------------------------------*/ /*! \brief Remove a stream receiving source data */ /*----------------------------------------------------------*/ void RemoveStream(CStream *pStream) { pthread_mutex_lock( &m_StreamMutex ); m_Stream.Remove(pStream); if (m_bAutoDestroy && (0 == m_Stream.Size())) { pthread_mutex_unlock( &m_StreamMutex ); delete this; return; } pthread_mutex_unlock( &m_StreamMutex ); } /*----------------------------------------------------------*/ /*! \brief called periodically to fill ring buffers of all * m_Stream */ /*----------------------------------------------------------*/ virtual bool FillBuffer() = 0; /*----------------------------------------------------------*/ /*! \brief sends a TS packet from the ring buffer to all * streams */ /*----------------------------------------------------------*/ bool SendTsPacket(CStream *pStream) { volatile CAutoLock autoLock( &m_StreamMutex ); if ( (0 == m_Stream.Size()) || // no sources available? (pStream != m_Stream[0]) ) // don't call a source multiple times if it is sourcing multiple streams { return false; } if ( m_bEmptyRing ) // after a seek, the stored data can be flushed { while ( GetCanRead() ) // as long as there is data ReadDone(); // skip TS packet m_bEmptyRing = false; // ring is now empty, don't empry again return false; } if ( !GetCanRead() ) { m_nErrBuf++; // read thread too slow return false; } uint8_t *pTs = GetReadPos(); // get pointer to current TS packet if ( !GetHasSyncByte(pTs) ) // check packet { m_nErrTs++; ReadDone(); return false; } struct timeval tv; gettimeofday(&tv, 0); uint64_t tTimerCur = tv.tv_sec * 45000 + (tv.tv_usec * 45) / 1000; if ( (tTimerCur - m_tLastPmt) >= (30 * 45) ) // time to send PMT packet (approx 30ms)? { for (uint32_t i = 0; i < m_Stream.Size(); i++) m_Stream[i]->SendPmt(); m_tLastPmt = tTimerCur; return true; } // ------------------------------------------------ // // check PCR // // ------------------------------------------------ // if ((pTs[3] & 0x20) && // check existence of an adaption field (pTs[4]) && // adaption field has length > 0 (pTs[5] & 0x10) ) // adaption field contains PCR? { unsigned long tTsCur = // PCR runs from 0 to 45000 in a second (pTs[6] << 24) + (pTs[7] << 16) + (pTs[8] << 8) + (pTs[9]); if ((m_tTsStart > tTsCur) || // not the first call, or jumped to start of file (m_tTsLast > tTsCur ) || (tTsCur - m_tTsLast > 45 * 1000)) { // discontinuity? if (0 != m_tTimerStart) // stream not just started? m_nErrPcr++; // only for statistics m_tTimerStart = tTimerCur; // reset time measurement m_tTsStart = tTsCur; } m_tTsLast = tTsCur; // remember last PCR int dT = (int)(tTsCur - m_tTsStart) - (int)(tTimerCur - m_tTimerStart); if (0 < dT) { if ((45 * 1000 / 2) > dT) { // if the recreation of the TS timing is less 500ms return false; // wait before outputting this packet } m_tTimerStart = tTimerCur; // reset time measurement m_tTsStart = tTsCur; m_nErrPcr++; // only for statistics } else if ((-45 * 1000 / 2) > dT) { // if the recreation of the TS timing is off for 500ms m_tTimerStart = tTimerCur; // reset time measurement m_tTsStart = tTsCur; m_tTsStart = tTsCur; m_nErrPcr++; // only for statistics } } uint32_t nCurPid = GetPid(pTs); if ( nCurPid == m_nAPid ) { for (uint32_t i = 0; i < m_Stream.Size(); i++) m_Stream[i]->SendAudio(pTs); ReadDone(); // skip and continue with next TS packet return true; } else if ( nCurPid == m_nVPid ) { for (uint32_t i = 0; i < m_Stream.Size(); i++) m_Stream[i]->SendVideo(pTs); ReadDone(); // skip and continue with next TS packet return true; } ReadDone(); // skip and continue with next TS packet return false; } /*----------------------------------------------------------*/ /*! \brief returns source's audio PID */ /*----------------------------------------------------------*/ uint32_t GetAPid() { return m_nAPid; } /*----------------------------------------------------------*/ /*! \brief returns source's audio PID */ /*----------------------------------------------------------*/ uint32_t GetVPid() { return m_nVPid; } /*----------------------------------------------------------*/ /*! \brief number of buffer underflows since last call */ /*----------------------------------------------------------*/ uint64_t GetErrBuf() { uint64_t nTmp = m_nErrBuf; m_nErrBuf = 0; return nTmp; }; /*----------------------------------------------------------*/ /*! \brief number of PCR errors since last call */ /*----------------------------------------------------------*/ uint64_t GetErrPcr() { uint64_t nTmp = m_nErrPcr; m_nErrPcr = 0; return nTmp; }; /*----------------------------------------------------------*/ /*! \brief get number of TS syntax violations since last call */ /*----------------------------------------------------------*/ uint64_t GetErrTs() { uint64_t nTmp = m_nErrTs; m_nErrTs = 0; return nTmp; }; /*----------------------------------------------------------*/ /*! \brief returns source's last PCR. By dividing this value * with CTsPacket::PCR_TICKS_PER_SEC the length of * time can be detected. If no length is available, * CTsPacket::PCR_INVALID is returned * */ /*----------------------------------------------------------*/ uint32_t GetLastPcr() { return m_nLastPcr; } /*----------------------------------------------------------*/ /*! \brief returns number of bytes filled into listener's * ring buffers */ /*----------------------------------------------------------*/ uint64_t GetBytesRead() { uint64_t oldVal = m_nBytesRead; m_nBytesRead = 0; return oldVal; } /*----------------------------------------------------------*/ /*! \brief returns if stream is paused */ /*----------------------------------------------------------*/ bool GetPause() { return m_bPause; } /*----------------------------------------------------------*/ /*! \brief unique ID for each stream */ /*----------------------------------------------------------*/ uint32_t GetInstId() { return m_nInst; }; /*----------------------------------------------------------*/ /*! \brief sets a new position in the stream */ /*----------------------------------------------------------*/ void SetPos(uint16_t fReqPos) { m_fReqPos = fReqPos; m_bEmptyRing = true; } /*----------------------------------------------------------*/ /*! \brief set repeat mode * \param bRepetition - if true stream will repeated for ever */ /*----------------------------------------------------------*/ void SetRepetition(bool bRepetition) { m_bRepetition = bRepetition; } /*----------------------------------------------------------*/ /*! \brief sets a pause mode on/off */ /*----------------------------------------------------------*/ void SetPause(bool bPause) { m_bPause = bPause; } }; #endif //_SOURCE_H_