/*
* Video On Demand Samples
*
* Copyright (C) 2015 Microchip Technology Germany II GmbH & Co. KG
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*
* You may also obtain this software under a propriety license from Microchip.
* Please contact Microchip for further information.
*
*/
/*----------------------------------------------------------*/
/*! \file
* \brief This file contains the CSource class.
*/
/*----------------------------------------------------------*/
#ifndef _SOURCE_H_
#define _SOURCE_H_
#include
#include
#include
#include
#include
#include "TsPacket.h"
#include "Stream.h"
#include "SafeVector.h"
#include "AutoLock.h"
/*----------------------------------------------------------*/
/*! \brief Base class sourcing a transport stream
*/
/*----------------------------------------------------------*/
class CSource : public CTsPacket, public CRingBuffer
{
protected:
static uint32_t m_nInstCnt;
uint32_t m_nInst;
bool m_bAutoDestroy;
bool m_bEmptyRing;
uint32_t m_nAPid;
uint32_t m_nVPid;
uint32_t m_nPPid;
uint32_t m_nLastPcr;
uint16_t m_fReqPos;
bool m_bPause;
bool m_bRepetition;
uint64_t m_nBytesRead;
uint64_t m_nErrBuf;
uint64_t m_nErrPcr;
uint64_t m_nErrTs;
uint32_t m_tTsStart;
uint32_t m_tTsLast;
uint32_t m_tTimerStart;
uint64_t m_tLastPmt;
CSafeVectorm_Stream;
pthread_mutex_t m_StreamMutex;
public:
static const uint16_t INVALID_POS = 0xFFFF;
static const uint32_t READ_LEN = (47*512); // this number MUST be dividable by 188 and 512
static const uint32_t NUM_READ_AHEAD = 8; // number of blocks reading ahead
/*----------------------------------------------------------*/
/*! \brief Constructs Source instance
* \param bAutoDestroy - true, destroys it self, when the
* last listener gone
*/
/*----------------------------------------------------------*/
CSource(bool bAutoDestroy = true)
: CRingBuffer(TS_PACKET_LEN, READ_LEN, NUM_READ_AHEAD)
, m_nInst(m_nInstCnt++)
, m_bAutoDestroy(bAutoDestroy)
, m_bEmptyRing(false)
, m_nAPid(PID_INVALID)
, m_nVPid(PID_INVALID)
, m_nPPid(PID_INVALID)
, m_nLastPcr(PCR_INVALID)
, m_fReqPos(INVALID_POS)
, m_bPause(false)
, m_bRepetition(false)
, m_nBytesRead(0)
, m_nErrBuf(0)
, m_nErrPcr(0)
, m_nErrTs(0)
, m_tTsStart(0)
, m_tTsLast(0)
, m_tTimerStart(0)
, m_tLastPmt(0)
{
pthread_mutex_init( &m_StreamMutex, NULL );
}
virtual ~CSource()
{
pthread_mutex_destroy( &m_StreamMutex );
}
/*----------------------------------------------------------*/
/*! \brief Add a listener to a source
*
* \param pRingBuffer - RingBuffer to source to
*/
/*----------------------------------------------------------*/
void AddStream(CStream *pStream)
{
volatile CAutoLock autoLock( &m_StreamMutex );
if ( pStream ) m_Stream.PushBack(pStream);
}
/*----------------------------------------------------------*/
/*! \brief Remove a stream receiving source data
*/
/*----------------------------------------------------------*/
void RemoveStream(CStream *pStream)
{
pthread_mutex_lock( &m_StreamMutex );
m_Stream.Remove(pStream);
if (m_bAutoDestroy && (0 == m_Stream.Size()))
{
pthread_mutex_unlock( &m_StreamMutex );
delete this;
return;
}
pthread_mutex_unlock( &m_StreamMutex );
}
/*----------------------------------------------------------*/
/*! \brief called periodically to fill ring buffers of all
* m_Stream
*/
/*----------------------------------------------------------*/
virtual bool FillBuffer() = 0;
/*----------------------------------------------------------*/
/*! \brief sends a TS packet from the ring buffer to all
* streams
*/
/*----------------------------------------------------------*/
bool SendTsPacket(CStream *pStream)
{
volatile CAutoLock autoLock( &m_StreamMutex );
if ( (0 == m_Stream.Size()) || // no sources available?
(pStream != m_Stream[0]) ) // don't call a source multiple times if it is sourcing multiple streams
{
return false;
}
if ( m_bEmptyRing ) // after a seek, the stored data can be flushed
{
while ( GetCanRead() ) // as long as there is data
ReadDone(); // skip TS packet
m_bEmptyRing = false; // ring is now empty, don't empry again
return false;
}
if ( !GetCanRead() )
{
m_nErrBuf++; // read thread too slow
return false;
}
uint8_t *pTs = GetReadPos(); // get pointer to current TS packet
if ( !GetHasSyncByte(pTs) ) // check packet
{
m_nErrTs++;
ReadDone();
return false;
}
struct timeval tv;
gettimeofday(&tv, 0);
uint64_t tTimerCur = tv.tv_sec * 45000 + (tv.tv_usec * 45) / 1000;
if ( (tTimerCur - m_tLastPmt) >= (30 * 45) ) // time to send PMT packet (approx 30ms)?
{
for (uint32_t i = 0; i < m_Stream.Size(); i++)
m_Stream[i]->SendPmt();
m_tLastPmt = tTimerCur;
return true;
}
// ------------------------------------------------ //
// check PCR //
// ------------------------------------------------ //
if ((pTs[3] & 0x20) && // check existence of an adaption field
(pTs[4]) && // adaption field has length > 0
(pTs[5] & 0x10) ) // adaption field contains PCR?
{
unsigned long tTsCur = // PCR runs from 0 to 45000 in a second
(pTs[6] << 24) +
(pTs[7] << 16) +
(pTs[8] << 8) +
(pTs[9]);
if ((m_tTsStart > tTsCur) || // not the first call, or jumped to start of file
(m_tTsLast > tTsCur ) ||
(tTsCur - m_tTsLast > 45 * 1000)) { // discontinuity?
if (0 != m_tTimerStart) // stream not just started?
m_nErrPcr++; // only for statistics
m_tTimerStart = tTimerCur; // reset time measurement
m_tTsStart = tTsCur;
}
m_tTsLast = tTsCur; // remember last PCR
int dT = (int)(tTsCur - m_tTsStart) - (int)(tTimerCur - m_tTimerStart);
if (0 < dT) {
if ((45 * 1000 / 2) > dT) { // if the recreation of the TS timing is less 500ms
return false; // wait before outputting this packet
}
m_tTimerStart = tTimerCur; // reset time measurement
m_tTsStart = tTsCur;
m_nErrPcr++; // only for statistics
} else if ((-45 * 1000 / 2) > dT) { // if the recreation of the TS timing is off for 500ms
m_tTimerStart = tTimerCur; // reset time measurement
m_tTsStart = tTsCur;
m_tTsStart = tTsCur;
m_nErrPcr++; // only for statistics
}
}
uint32_t nCurPid = GetPid(pTs);
if ( nCurPid == m_nAPid )
{
for (uint32_t i = 0; i < m_Stream.Size(); i++)
m_Stream[i]->SendAudio(pTs);
ReadDone(); // skip and continue with next TS packet
return true;
}
else if ( nCurPid == m_nVPid )
{
for (uint32_t i = 0; i < m_Stream.Size(); i++)
m_Stream[i]->SendVideo(pTs);
ReadDone(); // skip and continue with next TS packet
return true;
}
ReadDone(); // skip and continue with next TS packet
return false;
}
/*----------------------------------------------------------*/
/*! \brief returns source's audio PID
*/
/*----------------------------------------------------------*/
uint32_t GetAPid() { return m_nAPid; }
/*----------------------------------------------------------*/
/*! \brief returns source's audio PID
*/
/*----------------------------------------------------------*/
uint32_t GetVPid() { return m_nVPid; }
/*----------------------------------------------------------*/
/*! \brief number of buffer underflows since last call
*/
/*----------------------------------------------------------*/
uint64_t GetErrBuf()
{
uint64_t nTmp = m_nErrBuf;
m_nErrBuf = 0;
return nTmp;
};
/*----------------------------------------------------------*/
/*! \brief number of PCR errors since last call
*/
/*----------------------------------------------------------*/
uint64_t GetErrPcr()
{
uint64_t nTmp = m_nErrPcr;
m_nErrPcr = 0;
return nTmp;
};
/*----------------------------------------------------------*/
/*! \brief get number of TS syntax violations since last call
*/
/*----------------------------------------------------------*/
uint64_t GetErrTs()
{
uint64_t nTmp = m_nErrTs;
m_nErrTs = 0;
return nTmp;
};
/*----------------------------------------------------------*/
/*! \brief returns source's last PCR. By dividing this value
* with CTsPacket::PCR_TICKS_PER_SEC the length of
* time can be detected. If no length is available,
* CTsPacket::PCR_INVALID is returned
*
*/
/*----------------------------------------------------------*/
uint32_t GetLastPcr() { return m_nLastPcr; }
/*----------------------------------------------------------*/
/*! \brief returns number of bytes filled into listener's
* ring buffers
*/
/*----------------------------------------------------------*/
uint64_t GetBytesRead()
{
uint64_t oldVal = m_nBytesRead;
m_nBytesRead = 0;
return oldVal;
}
/*----------------------------------------------------------*/
/*! \brief returns if stream is paused
*/
/*----------------------------------------------------------*/
bool GetPause() { return m_bPause; }
/*----------------------------------------------------------*/
/*! \brief unique ID for each stream
*/
/*----------------------------------------------------------*/
uint32_t GetInstId() { return m_nInst; };
/*----------------------------------------------------------*/
/*! \brief sets a new position in the stream
*/
/*----------------------------------------------------------*/
void SetPos(uint16_t fReqPos) {
m_fReqPos = fReqPos;
m_bEmptyRing = true;
}
/*----------------------------------------------------------*/
/*! \brief set repeat mode
* \param bRepetition - if true stream will repeated for ever
*/
/*----------------------------------------------------------*/
void SetRepetition(bool bRepetition) { m_bRepetition = bRepetition; }
/*----------------------------------------------------------*/
/*! \brief sets a pause mode on/off
*/
/*----------------------------------------------------------*/
void SetPause(bool bPause) { m_bPause = bPause; }
};
#endif //_SOURCE_H_