summaryrefslogtreecommitdiffstats
path: root/Src/Multiplexer/Source.h
diff options
context:
space:
mode:
Diffstat (limited to 'Src/Multiplexer/Source.h')
-rw-r--r--Src/Multiplexer/Source.h408
1 files changed, 408 insertions, 0 deletions
diff --git a/Src/Multiplexer/Source.h b/Src/Multiplexer/Source.h
new file mode 100644
index 0000000..9747e39
--- /dev/null
+++ b/Src/Multiplexer/Source.h
@@ -0,0 +1,408 @@
+/*
+ * Video On Demand Samples
+ *
+ * Copyright (C) 2015 Microchip Technology Germany II GmbH & Co. KG
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * You may also obtain this software under a propriety license from Microchip.
+ * Please contact Microchip for further information.
+ *
+ */
+
+/*----------------------------------------------------------*/
+/*! \file
+ * \brief This file contains the CSource class.
+ */
+/*----------------------------------------------------------*/
+#ifndef _SOURCE_H_
+#define _SOURCE_H_
+
+#include <stdint.h>
+#include <stddef.h>
+#include <sys/time.h>
+#include <time.h>
+#include <pthread.h>
+#include "TsPacket.h"
+#include "Stream.h"
+#include "SafeVector.h"
+#include "AutoLock.h"
+
+
+
+/*----------------------------------------------------------*/
+/*! \brief Base class sourcing a transport stream
+ */
+/*----------------------------------------------------------*/
+class CSource : public CTsPacket, public CRingBuffer
+{
+protected:
+ static uint32_t m_nInstCnt;
+ uint32_t m_nInst;
+ bool m_bAutoDestroy;
+ bool m_bEmptyRing;
+ uint32_t m_nAPid;
+ uint32_t m_nVPid;
+ uint32_t m_nPPid;
+ uint32_t m_nLastPcr;
+ uint16_t m_fReqPos;
+ bool m_bPause;
+ bool m_bRepetition;
+ uint64_t m_nBytesRead;
+ uint64_t m_nErrBuf;
+ uint64_t m_nErrPcr;
+ uint64_t m_nErrTs;
+ uint32_t m_tTsStart;
+ uint32_t m_tTsLast;
+ uint32_t m_tTimerStart;
+ uint64_t m_tLastPmt;
+
+
+ CSafeVector<CStream *>m_Stream;
+ pthread_mutex_t m_StreamMutex;
+
+public:
+ static const uint16_t INVALID_POS = 0xFFFF;
+ static const uint32_t READ_LEN = (47*512); // this number MUST be dividable by 188 and 512
+ static const uint32_t NUM_READ_AHEAD = 8; // number of blocks reading ahead
+
+
+
+ /*----------------------------------------------------------*/
+ /*! \brief Constructs Source instance
+ * \param bAutoDestroy - true, destroys it self, when the
+ * last listener gone
+ */
+ /*----------------------------------------------------------*/
+ CSource(bool bAutoDestroy = true)
+ : CRingBuffer(TS_PACKET_LEN, READ_LEN, NUM_READ_AHEAD)
+ , m_nInst(m_nInstCnt++)
+ , m_bAutoDestroy(bAutoDestroy)
+ , m_bEmptyRing(false)
+ , m_nAPid(PID_INVALID)
+ , m_nVPid(PID_INVALID)
+ , m_nPPid(PID_INVALID)
+ , m_nLastPcr(PCR_INVALID)
+ , m_fReqPos(INVALID_POS)
+ , m_bPause(false)
+ , m_bRepetition(false)
+ , m_nBytesRead(0)
+ , m_nErrBuf(0)
+ , m_nErrPcr(0)
+ , m_nErrTs(0)
+ , m_tTsStart(0)
+ , m_tTsLast(0)
+ , m_tTimerStart(0)
+ , m_tLastPmt(0)
+ {
+ pthread_mutex_init( &m_StreamMutex, NULL );
+ }
+
+
+ virtual ~CSource()
+ {
+ pthread_mutex_destroy( &m_StreamMutex );
+ }
+
+
+ /*----------------------------------------------------------*/
+ /*! \brief Add a listener to a source
+ *
+ * \param pRingBuffer - RingBuffer to source to
+ */
+ /*----------------------------------------------------------*/
+ void AddStream(CStream *pStream)
+ {
+ volatile CAutoLock autoLock( &m_StreamMutex );
+ if ( pStream ) m_Stream.PushBack(pStream);
+ }
+
+
+
+
+ /*----------------------------------------------------------*/
+ /*! \brief Remove a stream receiving source data
+ */
+ /*----------------------------------------------------------*/
+ void RemoveStream(CStream *pStream)
+ {
+ pthread_mutex_lock( &m_StreamMutex );
+ m_Stream.Remove(pStream);
+ if (m_bAutoDestroy && (0 == m_Stream.Size()))
+ {
+ pthread_mutex_unlock( &m_StreamMutex );
+ delete this;
+ return;
+ }
+ pthread_mutex_unlock( &m_StreamMutex );
+ }
+
+
+
+ /*----------------------------------------------------------*/
+ /*! \brief called periodically to fill ring buffers of all
+ * m_Stream
+ */
+ /*----------------------------------------------------------*/
+ virtual bool FillBuffer() = 0;
+
+
+
+ /*----------------------------------------------------------*/
+ /*! \brief sends a TS packet from the ring buffer to all
+ * streams
+ */
+ /*----------------------------------------------------------*/
+ bool SendTsPacket(CStream *pStream)
+ {
+ volatile CAutoLock autoLock( &m_StreamMutex );
+ if ( (0 == m_Stream.Size()) || // no sources available?
+ (pStream != m_Stream[0]) ) // don't call a source multiple times if it is sourcing multiple streams
+ {
+ return false;
+ }
+
+ if ( m_bEmptyRing ) // after a seek, the stored data can be flushed
+ {
+ while ( GetCanRead() ) // as long as there is data
+ ReadDone(); // skip TS packet
+
+ m_bEmptyRing = false; // ring is now empty, don't empry again
+ return false;
+ }
+
+ if ( !GetCanRead() )
+ {
+ m_nErrBuf++; // read thread too slow
+ return false;
+ }
+
+ uint8_t *pTs = GetReadPos(); // get pointer to current TS packet
+
+ if ( !GetHasSyncByte(pTs) ) // check packet
+ {
+ m_nErrTs++;
+ ReadDone();
+ return false;
+ }
+
+ struct timeval tv;
+ gettimeofday(&tv, 0);
+ uint64_t tTimerCur = tv.tv_sec * 45000 + (tv.tv_usec * 45) / 1000;
+
+ if ( (tTimerCur - m_tLastPmt) >= (30 * 45) ) // time to send PMT packet (approx 30ms)?
+ {
+ for (uint32_t i = 0; i < m_Stream.Size(); i++)
+ m_Stream[i]->SendPmt();
+
+ m_tLastPmt = tTimerCur;
+ return true;
+ }
+
+ // ------------------------------------------------ //
+ // check PCR //
+ // ------------------------------------------------ //
+ if ((pTs[3] & 0x20) && // check existence of an adaption field
+ (pTs[4]) && // adaption field has length > 0
+ (pTs[5] & 0x10) ) // adaption field contains PCR?
+ {
+ unsigned long tTsCur = // PCR runs from 0 to 45000 in a second
+ (pTs[6] << 24) +
+ (pTs[7] << 16) +
+ (pTs[8] << 8) +
+ (pTs[9]);
+
+ if ((m_tTsStart > tTsCur) || // not the first call, or jumped to start of file
+ (m_tTsLast > tTsCur ) ||
+ (tTsCur - m_tTsLast > 45 * 1000)) { // discontinuity?
+ if (0 != m_tTimerStart) // stream not just started?
+ m_nErrPcr++; // only for statistics
+
+ m_tTimerStart = tTimerCur; // reset time measurement
+ m_tTsStart = tTsCur;
+ }
+ m_tTsLast = tTsCur; // remember last PCR
+
+ int dT = (int)(tTsCur - m_tTsStart) - (int)(tTimerCur - m_tTimerStart);
+
+ if (0 < dT) {
+ if ((45 * 1000 / 2) > dT) { // if the recreation of the TS timing is less 500ms
+ return false; // wait before outputting this packet
+ }
+
+ m_tTimerStart = tTimerCur; // reset time measurement
+ m_tTsStart = tTsCur;
+ m_nErrPcr++; // only for statistics
+ } else if ((-45 * 1000 / 2) > dT) { // if the recreation of the TS timing is off for 500ms
+ m_tTimerStart = tTimerCur; // reset time measurement
+ m_tTsStart = tTsCur;
+ m_tTsStart = tTsCur;
+ m_nErrPcr++; // only for statistics
+ }
+ }
+
+ uint32_t nCurPid = GetPid(pTs);
+
+ if ( nCurPid == m_nAPid )
+ {
+ for (uint32_t i = 0; i < m_Stream.Size(); i++)
+ m_Stream[i]->SendAudio(pTs);
+
+ ReadDone(); // skip and continue with next TS packet
+ return true;
+ }
+ else if ( nCurPid == m_nVPid )
+ {
+ for (uint32_t i = 0; i < m_Stream.Size(); i++)
+ m_Stream[i]->SendVideo(pTs);
+
+ ReadDone(); // skip and continue with next TS packet
+ return true;
+ }
+
+ ReadDone(); // skip and continue with next TS packet
+ return false;
+ }
+
+
+
+ /*----------------------------------------------------------*/
+ /*! \brief returns source's audio PID
+ */
+ /*----------------------------------------------------------*/
+ uint32_t GetAPid() { return m_nAPid; }
+
+
+
+ /*----------------------------------------------------------*/
+ /*! \brief returns source's audio PID
+ */
+ /*----------------------------------------------------------*/
+ uint32_t GetVPid() { return m_nVPid; }
+
+
+
+ /*----------------------------------------------------------*/
+ /*! \brief number of buffer underflows since last call
+ */
+ /*----------------------------------------------------------*/
+ uint64_t GetErrBuf()
+ {
+ uint64_t nTmp = m_nErrBuf;
+ m_nErrBuf = 0;
+ return nTmp;
+ };
+
+
+
+ /*----------------------------------------------------------*/
+ /*! \brief number of PCR errors since last call
+ */
+ /*----------------------------------------------------------*/
+ uint64_t GetErrPcr()
+ {
+ uint64_t nTmp = m_nErrPcr;
+ m_nErrPcr = 0;
+ return nTmp;
+ };
+
+
+
+ /*----------------------------------------------------------*/
+ /*! \brief get number of TS syntax violations since last call
+ */
+ /*----------------------------------------------------------*/
+ uint64_t GetErrTs()
+ {
+ uint64_t nTmp = m_nErrTs;
+ m_nErrTs = 0;
+ return nTmp;
+ };
+
+
+
+ /*----------------------------------------------------------*/
+ /*! \brief returns source's last PCR. By dividing this value
+ * with CTsPacket::PCR_TICKS_PER_SEC the length of
+ * time can be detected. If no length is available,
+ * CTsPacket::PCR_INVALID is returned
+ *
+ */
+ /*----------------------------------------------------------*/
+ uint32_t GetLastPcr() { return m_nLastPcr; }
+
+
+
+ /*----------------------------------------------------------*/
+ /*! \brief returns number of bytes filled into listener's
+ * ring buffers
+ */
+ /*----------------------------------------------------------*/
+ uint64_t GetBytesRead()
+ {
+ uint64_t oldVal = m_nBytesRead;
+ m_nBytesRead = 0;
+ return oldVal;
+ }
+
+
+
+ /*----------------------------------------------------------*/
+ /*! \brief returns if stream is paused
+ */
+ /*----------------------------------------------------------*/
+ bool GetPause() { return m_bPause; }
+
+
+
+
+ /*----------------------------------------------------------*/
+ /*! \brief unique ID for each stream
+ */
+ /*----------------------------------------------------------*/
+ uint32_t GetInstId() { return m_nInst; };
+
+
+
+
+ /*----------------------------------------------------------*/
+ /*! \brief sets a new position in the stream
+ */
+ /*----------------------------------------------------------*/
+ void SetPos(uint16_t fReqPos) {
+ m_fReqPos = fReqPos;
+ m_bEmptyRing = true;
+ }
+
+
+
+ /*----------------------------------------------------------*/
+ /*! \brief set repeat mode
+ * \param bRepetition - if true stream will repeated for ever
+ */
+ /*----------------------------------------------------------*/
+ void SetRepetition(bool bRepetition) { m_bRepetition = bRepetition; }
+
+
+
+ /*----------------------------------------------------------*/
+ /*! \brief sets a pause mode on/off
+ */
+ /*----------------------------------------------------------*/
+ void SetPause(bool bPause) { m_bPause = bPause; }
+};
+
+
+
+#endif //_SOURCE_H_