/*
 * Video On Demand Samples
 *
 * Copyright (C) 2015 Microchip Technology Germany II GmbH & Co. KG
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 * You may also obtain this software under a proprietary license from Microchip.
 * Please contact Microchip for further information.
 *
 */

/*----------------------------------------------------------*/
/*! \file
 *  \brief  This file contains the CSource class.
 */
/*----------------------------------------------------------*/
#ifndef _SOURCE_H_
#define _SOURCE_H_

#include <stdint.h>
#include <stddef.h>
#include <sys/time.h>
#include <time.h>
#include <pthread.h>
#include "TsPacket.h"
#include "Stream.h"
#include "SafeVector.h"
#include "AutoLock.h"



/*----------------------------------------------------------*/
/*! \brief Base class sourcing a transport stream
 */
/*----------------------------------------------------------*/
class CSource : public CTsPacket, public CRingBuffer
{
protected:
    static uint32_t m_nInstCnt;         // running counter used to hand out instance ids
    uint32_t        m_nInst;            // id of this instance (value of m_nInstCnt at construction)
    bool            m_bAutoDestroy;     // true: RemoveStream() deletes this object when the last stream is gone
    bool            m_bEmptyRing;       // true: SendTsPacket() flushes the ring buffer on its next call
    uint32_t        m_nAPid;            // audio PID, PID_INVALID until set
    uint32_t        m_nVPid;            // video PID, PID_INVALID until set
    uint32_t        m_nPPid;            // PID_INVALID until set; not used inside this class
    uint32_t        m_nLastPcr;         // value reported by GetLastPcr(), PCR_INVALID until set
    uint16_t        m_fReqPos;          // position requested via SetPos(), INVALID_POS if none
    bool            m_bPause;           // pause flag, see SetPause()/GetPause()
    bool            m_bRepetition;      // repetition flag, see SetRepetition()
    uint64_t        m_nBytesRead;       // byte counter, cleared by GetBytesRead()
    uint64_t        m_nErrBuf;          // buffer underflows, cleared by GetErrBuf()
    uint64_t        m_nErrPcr;          // PCR/timing resets, cleared by GetErrPcr()
    uint64_t        m_nErrTs;           // packets without sync byte, cleared by GetErrTs()
    uint32_t        m_tTsStart;         // PCR value (45 kHz ticks) at the current pacing reference point
    uint32_t        m_tTsLast;          // PCR value (45 kHz ticks) of the previous PCR packet
    uint32_t        m_tTimerStart;      // wall clock (45 kHz ticks, modulo 2^32) at the pacing reference point
    uint64_t        m_tLastPmt;         // wall clock (45 kHz ticks) when the PMT was sent last


    CSafeVector<CStream *>m_Stream;     // streams fed by this source
    pthread_mutex_t m_StreamMutex;      // guards m_Stream in AddStream/RemoveStream/SendTsPacket

public:
    static const uint16_t   INVALID_POS     = 0xFFFF;
    static const uint32_t   READ_LEN        = (47*512);         // this number MUST be divisible by 188 and 512
    static const uint32_t   NUM_READ_AHEAD  = 8;                // number of blocks reading ahead



    /*----------------------------------------------------------*/
    /*! \brief Constructs Source instance
     *  \param bAutoDestroy - true, the source deletes itself when
     *         the last stream has been removed
     */
    /*----------------------------------------------------------*/
    CSource(bool bAutoDestroy = true)
        : CRingBuffer(TS_PACKET_LEN, READ_LEN, NUM_READ_AHEAD)
        , m_nInst(m_nInstCnt++)
        , m_bAutoDestroy(bAutoDestroy)
        , m_bEmptyRing(false)
        , m_nAPid(PID_INVALID)
        , m_nVPid(PID_INVALID)
        , m_nPPid(PID_INVALID)
        , m_nLastPcr(PCR_INVALID)
        , m_fReqPos(INVALID_POS)
        , m_bPause(false)
        , m_bRepetition(false)
        , m_nBytesRead(0)
        , m_nErrBuf(0)
        , m_nErrPcr(0)
        , m_nErrTs(0)
        , m_tTsStart(0)
        , m_tTsLast(0)
        , m_tTimerStart(0)
        , m_tLastPmt(0)
    {
        pthread_mutex_init( &m_StreamMutex, NULL );
    }


    /*----------------------------------------------------------*/
    /*! \brief Destroys the stream list mutex
     */
    /*----------------------------------------------------------*/
    virtual ~CSource()
    {
        pthread_mutex_destroy( &m_StreamMutex );
    }


    /*----------------------------------------------------------*/
    /*! \brief Add a stream that shall receive data of this source
     *
     *  \param pStream - stream to add; NULL is ignored
     */
    /*----------------------------------------------------------*/
    void AddStream(CStream *pStream)
    {
        volatile CAutoLock autoLock( &m_StreamMutex );
        if ( pStream ) m_Stream.PushBack(pStream);
    }




    /*----------------------------------------------------------*/
    /*! \brief Remove a stream receiving source data
     *
     *  If the source was created with bAutoDestroy == true and
     *  no stream is left afterwards, the source deletes itself.
     *  The caller must not touch the object after this call in
     *  that case.
     *
     *  \param pStream - stream to remove
     */
    /*----------------------------------------------------------*/
    void RemoveStream(CStream *pStream)
    {
        pthread_mutex_lock( &m_StreamMutex );
        m_Stream.Remove(pStream);
        if (m_bAutoDestroy && (0 == m_Stream.Size()))
        {
            // The destructor destroys m_StreamMutex, so the mutex has to be
            // released before "delete this" (which is why no scoped lock is used).
            pthread_mutex_unlock( &m_StreamMutex );
            delete this;
            return;
        }
        pthread_mutex_unlock( &m_StreamMutex );
    }



    /*----------------------------------------------------------*/
    /*! \brief called periodically to fill ring buffers of all
     *         m_Stream
     */
    /*----------------------------------------------------------*/
    virtual bool FillBuffer() = 0;



    /*----------------------------------------------------------*/
    /*! \brief sends a TS packet from the ring buffer to all
     *         streams
     *
     *  Only the first registered stream drives the source; calls
     *  made with any other stream return immediately. The output
     *  is paced by comparing the PCR of the stream against the
     *  wall clock, both expressed in 45 kHz ticks.
     *
     *  \param pStream - stream that triggers the call
     *  \return true, if a PMT was sent or an audio/video packet
     *          was handed to the streams. false, if nothing was
     *          sent (wrong caller, ring flushed, buffer empty,
     *          invalid packet, packet too early, other PID)
     */
    /*----------------------------------------------------------*/
    bool SendTsPacket(CStream *pStream)
    {
        const uint32_t TICKS_PER_SEC   = 45 * 1000;         // time base of PCR and timer values
        const uint32_t PMT_INTERVAL    = 30 * 45;           // approx. 30ms
        const int32_t  MAX_DRIFT       = 45 * 1000 / 2;     // 500ms

        volatile CAutoLock autoLock( &m_StreamMutex );
        if ( (0 == m_Stream.Size()) ||                      // no streams registered?
             (pStream != m_Stream[0]) )                     // don't call a source multiple times if it is sourcing multiple streams
        {
            return false;
        }

        if ( m_bEmptyRing )                                 // after a seek, the stored data can be flushed
        {
            while ( GetCanRead() )                          // as long as there is data
                ReadDone();                                 // skip TS packet

            m_bEmptyRing = false;                           // ring is now empty, don't empty again
            return false;
        }

        if ( !GetCanRead() )
        {
            m_nErrBuf++;                                    // read thread too slow
            return false;
        }

        uint8_t *pTs = GetReadPos();                        // get pointer to current TS packet

        if ( !GetHasSyncByte(pTs) )                         // check packet
        {
            m_nErrTs++;
            ReadDone();
            return false;
        }

        struct timeval tv;
        gettimeofday(&tv, 0);
        // Widen before multiplying: with a 32 bit time_t the product
        // tv_sec * 45000 would overflow.
        const uint64_t tTimerCur =
                static_cast<uint64_t>(tv.tv_sec) * TICKS_PER_SEC +
                (static_cast<uint64_t>(tv.tv_usec) * 45) / 1000;
        // The pacing members are 32 bit wide; all pacing arithmetic below is
        // done modulo 2^32 on this truncated value.
        const uint32_t tTimerCur32 = static_cast<uint32_t>(tTimerCur);

        if ( (tTimerCur - m_tLastPmt) >= PMT_INTERVAL )     // time to send PMT packet (approx 30ms)?
        {
            for (uint32_t i = 0; i < m_Stream.Size(); i++)
                m_Stream[i]->SendPmt();

            m_tLastPmt = tTimerCur;
            return true;                                    // current TS packet stays in the ring for the next call
        }

        // ------------------------------------------------ //
        // check PCR                                        //
        // ------------------------------------------------ //
        if ((pTs[3] & 0x20) &&                              // check existence of an adaption field
            (pTs[4])        &&                              // adaption field has length > 0
            (pTs[5] & 0x10) )                               // adaption field contains PCR?
        {
            // PCR runs from 0 to 45000 in a second.
            // The bytes are widened to uint32_t before shifting: shifting the
            // promoted (signed) int would overflow for values >= 0x80 and the
            // negative result would be sign extended on 64 bit targets.
            const uint32_t tTsCur =
                    (static_cast<uint32_t>(pTs[6]) << 24) |
                    (static_cast<uint32_t>(pTs[7]) << 16) |
                    (static_cast<uint32_t>(pTs[8]) << 8) |
                     static_cast<uint32_t>(pTs[9]);

            if ((m_tTsStart > tTsCur) ||                    // PCR went backwards (e.g. jumped to start of file)
                (m_tTsLast > tTsCur ) ||
                (tTsCur - m_tTsLast > TICKS_PER_SEC)) {     // discontinuity (gap of more than one second)?
                if (0 != m_tTimerStart)                     // stream not just started?
                    m_nErrPcr++;                            // only for statistics

                m_tTimerStart = tTimerCur32;                // reset time measurement
                m_tTsStart = tTsCur;
            }
            m_tTsLast = tTsCur;                             // remember last PCR

            // Difference between the time elapsed in the stream and the time
            // elapsed on the wall clock since the reference point. Computed
            // with unsigned (modulo 2^32) arithmetic to avoid signed overflow.
            const int32_t dT = static_cast<int32_t>(
                    (tTsCur - m_tTsStart) - (tTimerCur32 - m_tTimerStart));

            if (0 < dT) {
                if (MAX_DRIFT > dT) {                       // stream is ahead of the wall clock by less than 500ms
                    return false;                           // wait before outputting this packet
                }

                m_tTimerStart = tTimerCur32;                // reset time measurement
                m_tTsStart = tTsCur;
                m_nErrPcr++;                                // only for statistics
            } else if (-MAX_DRIFT > dT) {                   // stream is behind the wall clock by more than 500ms
                m_tTimerStart = tTimerCur32;                // reset time measurement
                m_tTsStart = tTsCur;
                m_nErrPcr++;                                // only for statistics
            }
        }

        const uint32_t nCurPid = GetPid(pTs);
        bool bSent = false;

        if ( nCurPid == m_nAPid )
        {
            for (uint32_t i = 0; i < m_Stream.Size(); i++)
                m_Stream[i]->SendAudio(pTs);

            bSent = true;
        }
        else if ( nCurPid == m_nVPid )
        {
            for (uint32_t i = 0; i < m_Stream.Size(); i++)
                m_Stream[i]->SendVideo(pTs);

            bSent = true;
        }

        ReadDone();                                         // skip and continue with next TS packet
        return bSent;
    }



    /*----------------------------------------------------------*/
    /*! \brief returns source's audio PID
     */
    /*----------------------------------------------------------*/
    uint32_t GetAPid()  { return m_nAPid; }



    /*----------------------------------------------------------*/
    /*! \brief returns source's video PID
     */
    /*----------------------------------------------------------*/
    uint32_t GetVPid()  { return m_nVPid; }



    /*----------------------------------------------------------*/
    /*! \brief number of buffer underflows since last call
     *         (the counter is cleared by this call)
     */
    /*----------------------------------------------------------*/
    uint64_t GetErrBuf()
    {
        const uint64_t nTmp = m_nErrBuf;
        m_nErrBuf = 0;
        return nTmp;
    }



    /*----------------------------------------------------------*/
    /*! \brief number of PCR errors since last call
     *         (the counter is cleared by this call)
     */
    /*----------------------------------------------------------*/
    uint64_t GetErrPcr()
    {
        const uint64_t nTmp = m_nErrPcr;
        m_nErrPcr = 0;
        return nTmp;
    }



    /*----------------------------------------------------------*/
    /*! \brief get number of TS syntax violations since last call
     *         (the counter is cleared by this call)
     */
    /*----------------------------------------------------------*/
    uint64_t GetErrTs()
    {
        const uint64_t nTmp = m_nErrTs;
        m_nErrTs = 0;
        return nTmp;
    }



    /*----------------------------------------------------------*/
    /*! \brief returns source's last PCR. By dividing this value
     *         by CTsPacket::PCR_TICKS_PER_SEC the length of
     *         time can be determined. If no length is available,
     *         CTsPacket::PCR_INVALID is returned
     */
    /*----------------------------------------------------------*/
    uint32_t GetLastPcr() { return m_nLastPcr; }



    /*----------------------------------------------------------*/
    /*! \brief returns number of bytes filled into listener's
     *         ring buffers since last call (the counter is
     *         cleared by this call)
     */
    /*----------------------------------------------------------*/
    uint64_t GetBytesRead()
    {
        const uint64_t oldVal = m_nBytesRead;
        m_nBytesRead = 0;
        return oldVal;
    }



    /*----------------------------------------------------------*/
    /*! \brief returns if stream is paused
     */
    /*----------------------------------------------------------*/
    bool GetPause() { return m_bPause; }




    /*----------------------------------------------------------*/
    /*! \brief unique ID for each source instance
     */
    /*----------------------------------------------------------*/
    uint32_t GetInstId() { return m_nInst; }




    /*----------------------------------------------------------*/
    /*! \brief sets a new position in the stream
     *
     *  Stores the requested position and marks the ring buffer
     *  to be flushed by the next SendTsPacket() call.
     *
     *  \param fReqPos - requested position
     *         (NOTE(review): unit/scale is defined by the
     *         derived class reading m_fReqPos - confirm there)
     */
    /*----------------------------------------------------------*/
    void SetPos(uint16_t fReqPos) {
        m_fReqPos = fReqPos;
        m_bEmptyRing = true;
    }



    /*----------------------------------------------------------*/
    /*! \brief set repeat mode
     *  \param bRepetition - if true stream will be repeated forever
     */
    /*----------------------------------------------------------*/
    void SetRepetition(bool bRepetition) { m_bRepetition = bRepetition; }



    /*----------------------------------------------------------*/
    /*! \brief sets a pause mode on/off
     *  \param bPause - true to pause, false to resume
     */
    /*----------------------------------------------------------*/
    void SetPause(bool bPause) { m_bPause = bPause; }
};



#endif //_SOURCE_H_