diff options
Diffstat (limited to 'Src/Multiplexer/Source.h')
-rw-r--r-- | Src/Multiplexer/Source.h | 408 |
1 files changed, 408 insertions, 0 deletions
diff --git a/Src/Multiplexer/Source.h b/Src/Multiplexer/Source.h new file mode 100644 index 0000000..9747e39 --- /dev/null +++ b/Src/Multiplexer/Source.h @@ -0,0 +1,408 @@ +/* + * Video On Demand Samples + * + * Copyright (C) 2015 Microchip Technology Germany II GmbH & Co. KG + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * You may also obtain this software under a propriety license from Microchip. + * Please contact Microchip for further information. + * + */ + +/*----------------------------------------------------------*/ +/*! \file + * \brief This file contains the CSource class. + */ +/*----------------------------------------------------------*/ +#ifndef _SOURCE_H_ +#define _SOURCE_H_ + +#include <stdint.h> +#include <stddef.h> +#include <sys/time.h> +#include <time.h> +#include <pthread.h> +#include "TsPacket.h" +#include "Stream.h" +#include "SafeVector.h" +#include "AutoLock.h" + + + +/*----------------------------------------------------------*/ +/*! \brief Base class sourcing a transport stream + */ +/*----------------------------------------------------------*/ +class CSource : public CTsPacket, public CRingBuffer +{ +protected: + static uint32_t m_nInstCnt; + uint32_t m_nInst; + bool m_bAutoDestroy; + bool m_bEmptyRing; + uint32_t m_nAPid; + uint32_t m_nVPid; + uint32_t m_nPPid; + uint32_t m_nLastPcr; + uint16_t m_fReqPos; + bool m_bPause; + bool m_bRepetition; + uint64_t m_nBytesRead; + uint64_t m_nErrBuf; + uint64_t m_nErrPcr; + uint64_t m_nErrTs; + uint32_t m_tTsStart; + uint32_t m_tTsLast; + uint32_t m_tTimerStart; + uint64_t m_tLastPmt; + + + CSafeVector<CStream *>m_Stream; + pthread_mutex_t m_StreamMutex; + +public: + static const uint16_t INVALID_POS = 0xFFFF; + static const uint32_t READ_LEN = (47*512); // this number MUST be dividable by 188 and 512 + static const uint32_t NUM_READ_AHEAD = 8; // number of blocks reading ahead + + + + /*----------------------------------------------------------*/ + /*! \brief Constructs Source instance + * \param bAutoDestroy - true, destroys it self, when the + * last listener gone + */ + /*----------------------------------------------------------*/ + CSource(bool bAutoDestroy = true) + : CRingBuffer(TS_PACKET_LEN, READ_LEN, NUM_READ_AHEAD) + , m_nInst(m_nInstCnt++) + , m_bAutoDestroy(bAutoDestroy) + , m_bEmptyRing(false) + , m_nAPid(PID_INVALID) + , m_nVPid(PID_INVALID) + , m_nPPid(PID_INVALID) + , m_nLastPcr(PCR_INVALID) + , m_fReqPos(INVALID_POS) + , m_bPause(false) + , m_bRepetition(false) + , m_nBytesRead(0) + , m_nErrBuf(0) + , m_nErrPcr(0) + , m_nErrTs(0) + , m_tTsStart(0) + , m_tTsLast(0) + , m_tTimerStart(0) + , m_tLastPmt(0) + { + pthread_mutex_init( &m_StreamMutex, NULL ); + } + + + virtual ~CSource() + { + pthread_mutex_destroy( &m_StreamMutex ); + } + + + /*----------------------------------------------------------*/ + /*! \brief Add a listener to a source + * + * \param pRingBuffer - RingBuffer to source to + */ + /*----------------------------------------------------------*/ + void AddStream(CStream *pStream) + { + volatile CAutoLock autoLock( &m_StreamMutex ); + if ( pStream ) m_Stream.PushBack(pStream); + } + + + + + /*----------------------------------------------------------*/ + /*! \brief Remove a stream receiving source data + */ + /*----------------------------------------------------------*/ + void RemoveStream(CStream *pStream) + { + pthread_mutex_lock( &m_StreamMutex ); + m_Stream.Remove(pStream); + if (m_bAutoDestroy && (0 == m_Stream.Size())) + { + pthread_mutex_unlock( &m_StreamMutex ); + delete this; + return; + } + pthread_mutex_unlock( &m_StreamMutex ); + } + + + + /*----------------------------------------------------------*/ + /*! \brief called periodically to fill ring buffers of all + * m_Stream + */ + /*----------------------------------------------------------*/ + virtual bool FillBuffer() = 0; + + + + /*----------------------------------------------------------*/ + /*! \brief sends a TS packet from the ring buffer to all + * streams + */ + /*----------------------------------------------------------*/ + bool SendTsPacket(CStream *pStream) + { + volatile CAutoLock autoLock( &m_StreamMutex ); + if ( (0 == m_Stream.Size()) || // no sources available? + (pStream != m_Stream[0]) ) // don't call a source multiple times if it is sourcing multiple streams + { + return false; + } + + if ( m_bEmptyRing ) // after a seek, the stored data can be flushed + { + while ( GetCanRead() ) // as long as there is data + ReadDone(); // skip TS packet + + m_bEmptyRing = false; // ring is now empty, don't empry again + return false; + } + + if ( !GetCanRead() ) + { + m_nErrBuf++; // read thread too slow + return false; + } + + uint8_t *pTs = GetReadPos(); // get pointer to current TS packet + + if ( !GetHasSyncByte(pTs) ) // check packet + { + m_nErrTs++; + ReadDone(); + return false; + } + + struct timeval tv; + gettimeofday(&tv, 0); + uint64_t tTimerCur = tv.tv_sec * 45000 + (tv.tv_usec * 45) / 1000; + + if ( (tTimerCur - m_tLastPmt) >= (30 * 45) ) // time to send PMT packet (approx 30ms)? + { + for (uint32_t i = 0; i < m_Stream.Size(); i++) + m_Stream[i]->SendPmt(); + + m_tLastPmt = tTimerCur; + return true; + } + + // ------------------------------------------------ // + // check PCR // + // ------------------------------------------------ // + if ((pTs[3] & 0x20) && // check existence of an adaption field + (pTs[4]) && // adaption field has length > 0 + (pTs[5] & 0x10) ) // adaption field contains PCR? + { + unsigned long tTsCur = // PCR runs from 0 to 45000 in a second + (pTs[6] << 24) + + (pTs[7] << 16) + + (pTs[8] << 8) + + (pTs[9]); + + if ((m_tTsStart > tTsCur) || // not the first call, or jumped to start of file + (m_tTsLast > tTsCur ) || + (tTsCur - m_tTsLast > 45 * 1000)) { // discontinuity? + if (0 != m_tTimerStart) // stream not just started? + m_nErrPcr++; // only for statistics + + m_tTimerStart = tTimerCur; // reset time measurement + m_tTsStart = tTsCur; + } + m_tTsLast = tTsCur; // remember last PCR + + int dT = (int)(tTsCur - m_tTsStart) - (int)(tTimerCur - m_tTimerStart); + + if (0 < dT) { + if ((45 * 1000 / 2) > dT) { // if the recreation of the TS timing is less 500ms + return false; // wait before outputting this packet + } + + m_tTimerStart = tTimerCur; // reset time measurement + m_tTsStart = tTsCur; + m_nErrPcr++; // only for statistics + } else if ((-45 * 1000 / 2) > dT) { // if the recreation of the TS timing is off for 500ms + m_tTimerStart = tTimerCur; // reset time measurement + m_tTsStart = tTsCur; + m_tTsStart = tTsCur; + m_nErrPcr++; // only for statistics + } + } + + uint32_t nCurPid = GetPid(pTs); + + if ( nCurPid == m_nAPid ) + { + for (uint32_t i = 0; i < m_Stream.Size(); i++) + m_Stream[i]->SendAudio(pTs); + + ReadDone(); // skip and continue with next TS packet + return true; + } + else if ( nCurPid == m_nVPid ) + { + for (uint32_t i = 0; i < m_Stream.Size(); i++) + m_Stream[i]->SendVideo(pTs); + + ReadDone(); // skip and continue with next TS packet + return true; + } + + ReadDone(); // skip and continue with next TS packet + return false; + } + + + + /*----------------------------------------------------------*/ + /*! \brief returns source's audio PID + */ + /*----------------------------------------------------------*/ + uint32_t GetAPid() { return m_nAPid; } + + + + /*----------------------------------------------------------*/ + /*! \brief returns source's audio PID + */ + /*----------------------------------------------------------*/ + uint32_t GetVPid() { return m_nVPid; } + + + + /*----------------------------------------------------------*/ + /*! \brief number of buffer underflows since last call + */ + /*----------------------------------------------------------*/ + uint64_t GetErrBuf() + { + uint64_t nTmp = m_nErrBuf; + m_nErrBuf = 0; + return nTmp; + }; + + + + /*----------------------------------------------------------*/ + /*! \brief number of PCR errors since last call + */ + /*----------------------------------------------------------*/ + uint64_t GetErrPcr() + { + uint64_t nTmp = m_nErrPcr; + m_nErrPcr = 0; + return nTmp; + }; + + + + /*----------------------------------------------------------*/ + /*! \brief get number of TS syntax violations since last call + */ + /*----------------------------------------------------------*/ + uint64_t GetErrTs() + { + uint64_t nTmp = m_nErrTs; + m_nErrTs = 0; + return nTmp; + }; + + + + /*----------------------------------------------------------*/ + /*! \brief returns source's last PCR. By dividing this value + * with CTsPacket::PCR_TICKS_PER_SEC the length of + * time can be detected. If no length is available, + * CTsPacket::PCR_INVALID is returned + * + */ + /*----------------------------------------------------------*/ + uint32_t GetLastPcr() { return m_nLastPcr; } + + + + /*----------------------------------------------------------*/ + /*! \brief returns number of bytes filled into listener's + * ring buffers + */ + /*----------------------------------------------------------*/ + uint64_t GetBytesRead() + { + uint64_t oldVal = m_nBytesRead; + m_nBytesRead = 0; + return oldVal; + } + + + + /*----------------------------------------------------------*/ + /*! \brief returns if stream is paused + */ + /*----------------------------------------------------------*/ + bool GetPause() { return m_bPause; } + + + + + /*----------------------------------------------------------*/ + /*! \brief unique ID for each stream + */ + /*----------------------------------------------------------*/ + uint32_t GetInstId() { return m_nInst; }; + + + + + /*----------------------------------------------------------*/ + /*! \brief sets a new position in the stream + */ + /*----------------------------------------------------------*/ + void SetPos(uint16_t fReqPos) { + m_fReqPos = fReqPos; + m_bEmptyRing = true; + } + + + + /*----------------------------------------------------------*/ + /*! \brief set repeat mode + * \param bRepetition - if true stream will repeated for ever + */ + /*----------------------------------------------------------*/ + void SetRepetition(bool bRepetition) { m_bRepetition = bRepetition; } + + + + /*----------------------------------------------------------*/ + /*! \brief sets a pause mode on/off + */ + /*----------------------------------------------------------*/ + void SetPause(bool bPause) { m_bPause = bPause; } +}; + + + +#endif //_SOURCE_H_ |