diff --git a/src/engine/sidechain/enginenetworkstream.cpp b/src/engine/sidechain/enginenetworkstream.cpp index dfaf9f6bd04..e062b6cfdd4 100644 --- a/src/engine/sidechain/enginenetworkstream.cpp +++ b/src/engine/sidechain/enginenetworkstream.cpp @@ -1,6 +1,5 @@ #ifdef __WINDOWS__ #include -#include #else #include #include @@ -11,7 +10,15 @@ #include "sampleutil.h" -unsigned int kBufferFrames = 32768; // 743 ms @ 44100 Hz +const int kBufferFrames = 32768; // 743 ms @ 44100 Hz +const int kNetworkLatencyFrames = 8192; // 743 ms @ 44100 Hz +// Related chunk sizes: +// Mp3 frames = 1152 samples +// Ogg frames = 64 to 8192 samples. +// In Mixxx 1.11 we transmit every decoder-frames at once, +// Which results in case of ogg in a dynamic latency from 0.14 ms to to 180 ms +// Now we have switched to a fixed latency of 8192 frames (stereo samples) = +// which is 185 @ 44100 ms and twice the maximum of the max mixxx audio buffer EngineNetworkStream::EngineNetworkStream(int numOutputChannels, int numInputChannels) @@ -20,7 +27,7 @@ EngineNetworkStream::EngineNetworkStream(int numOutputChannels, m_numOutputChannels(numOutputChannels), m_numInputChannels(numInputChannels), m_sampleRate(0), - m_streamStartTimeMs(-1), + m_streamStartTimeUs(-1), m_streamFramesWritten(0), m_streamFramesRead(0) { if (numOutputChannels) { @@ -32,7 +39,7 @@ EngineNetworkStream::EngineNetworkStream(int numOutputChannels, } EngineNetworkStream::~EngineNetworkStream() { - if (m_streamStartTimeMs >= 0) { + if (m_streamStartTimeUs >= 0) { stopStream(); } delete m_pOutputFifo; @@ -41,12 +48,12 @@ EngineNetworkStream::~EngineNetworkStream() { void EngineNetworkStream::startStream(double sampleRate) { m_sampleRate = sampleRate; - m_streamStartTimeMs = getNetworkTimeMs(); + m_streamStartTimeUs = getNetworkTimeUs(); m_streamFramesWritten = 0; } void EngineNetworkStream::stopStream() { - m_streamStartTimeMs = -1; + m_streamStartTimeUs = -1; } int EngineNetworkStream::getWriteExpected() { @@ -73,6 +80,7 @@ void EngineNetworkStream::write(const CSAMPLE* buffer, int frames) { (void)m_pOutputFifo->write(buffer, copyCount); } m_streamFramesWritten += frames; + scheduleWorker(); } void EngineNetworkStream::writeSilence(int frames) { @@ -101,6 +109,14 @@ void EngineNetworkStream::writeSilence(int frames) { m_pOutputFifo->releaseWriteRegions(clearCount); } m_streamFramesWritten += frames; + scheduleWorker(); +} + +void EngineNetworkStream::scheduleWorker() { + if (m_pOutputFifo->readAvailable() + >= m_numOutputChannels * kNetworkLatencyFrames) { + m_pWorker->outputAvailabe(m_pOutputFifo); + } } void EngineNetworkStream::read(CSAMPLE* buffer, int frames) { @@ -120,23 +136,32 @@ void EngineNetworkStream::read(CSAMPLE* buffer, int frames) { } } -qint64 EngineNetworkStream::getStreamTimeMs() { - return getNetworkTimeMs() - m_streamStartTimeMs; +qint64 EngineNetworkStream::getStreamTimeFrames() { + return static_cast(getStreamTimeUs()) * m_sampleRate / 1000000.0; } -qint64 EngineNetworkStream::getStreamTimeFrames() { - return static_cast(getStreamTimeMs()) * m_sampleRate / 1000; +qint64 EngineNetworkStream::getStreamTimeUs() { + return getNetworkTimeUs() - m_streamStartTimeUs; } // static -qint64 EngineNetworkStream::getNetworkTimeMs() { +qint64 EngineNetworkStream::getNetworkTimeUs() { // This matches the GPL2 implementation found in // https://github.com/codders/libshout/blob/a17fb84671d3732317b0353d7281cc47e2df6cf6/src/timing/timing.c + // Instead of ms resuolution we use a us resolution to allow low latency settings + // will overflow > 200,000 years #ifdef __WINDOWS__ - return timeGetTime(); + FILETIME ft; + int64_t t; + GetSystemTimeAsFileTime(&ft); + return ((qint64)ft.dwHighDateTime << 32 | ft.dwLowDateTime) / 10; #else struct timeval mtv; gettimeofday(&mtv, NULL); - return (qint64)(mtv.tv_sec) * 1000 + (qint64)(mtv.tv_usec) / 1000; + return (qint64)(mtv.tv_sec) * 1000000 + mtv.tv_usec; #endif } + +void EngineNetworkStream::addWorker(QSharedPointer pWorker) { + m_pWorker = pWorker; +} diff --git a/src/engine/sidechain/enginenetworkstream.h b/src/engine/sidechain/enginenetworkstream.h index 534fe51fc86..66915734825 100644 --- a/src/engine/sidechain/enginenetworkstream.h +++ b/src/engine/sidechain/enginenetworkstream.h @@ -3,8 +3,7 @@ #include "util/types.h" #include "util/fifo.h" - -class SideChainWorker; +#include "engine/sidechain/sidechainworker.h" class EngineNetworkStream { public: @@ -22,7 +21,7 @@ class EngineNetworkStream { void read(CSAMPLE* buffer, int frames); void writeSilence(int frames); - qint64 getStreamTimeMs(); + qint64 getStreamTimeUs(); qint64 getStreamTimeFrames(); int getNumOutputChannels() { @@ -33,19 +32,19 @@ class EngineNetworkStream { return m_numInputChannels; } - static qint64 getNetworkTimeMs(); + static qint64 getNetworkTimeUs(); - void addWorker(QSharedPointer pWorker) { - m_pWorker = pWorker; - } + void addWorker(QSharedPointer pWorker); private: + void scheduleWorker(); + FIFO* m_pOutputFifo; FIFO* m_pInputFifo; int m_numOutputChannels; int m_numInputChannels; double m_sampleRate; - qint64 m_streamStartTimeMs; + qint64 m_streamStartTimeUs; qint64 m_streamFramesWritten; qint64 m_streamFramesRead; QSharedPointer m_pWorker; diff --git a/src/engine/sidechain/engineshoutcast.cpp b/src/engine/sidechain/engineshoutcast.cpp index 32bd585b9d9..81664297c00 100644 --- a/src/engine/sidechain/engineshoutcast.cpp +++ b/src/engine/sidechain/engineshoutcast.cpp @@ -62,7 +62,8 @@ EngineShoutcast::EngineShoutcast(ConfigObject* _config) m_protocol_is_icecast1(false), m_protocol_is_icecast2(false), m_protocol_is_shoutcast(false), - m_ogg_dynamic_update(false) { + m_ogg_dynamic_update(false), + m_bThreadQuit(false) { #ifndef __WINDOWS__ // Ignore SIGPIPE signals that we get when the remote streaming server @@ -92,11 +93,6 @@ EngineShoutcast::EngineShoutcast(ConfigObject* _config) errorDialog(tr("Error setting non-blocking mode:"), shout_get_error(m_pShout)); return; } - - m_bThreadQuit = false; - this->moveToThread(&m_SThread); - connect(&m_SThread, SIGNAL(started()), this, SLOT(shoutcastThread())); - connect(this, SIGNAL(finished()), &m_SThread, SLOT(deleteLater())); } EngineShoutcast::~EngineShoutcast() { @@ -107,18 +103,6 @@ EngineShoutcast::~EngineShoutcast() { delete m_encoder; } - // Make sure everything is send from cache - // before we stop - while (1) { - m_SMutex.lock(); - if (m_pShoutcastCache.size() == 0) { - m_SMutex.unlock(); - break; - } - m_SMutex.unlock(); - sleep(1); - } - delete m_pUpdateShoutcastFromPrefs; delete m_pShoutcastNeedUpdateFromPrefs; delete m_pShoutcastStatus; @@ -133,31 +117,12 @@ EngineShoutcast::~EngineShoutcast() { } shout_shutdown(); - m_SThread.quit(); + wait(); // until the thread ends. } bool EngineShoutcast::serverDisconnect() { m_bThreadQuit = true; - - if (m_encoder) { - m_encoder->flush(); - delete m_encoder; - m_encoder = NULL; - } - - // Make sure everything is send from cache - // before we stop - while (1) { - m_SMutex.lock(); - if (m_pShoutcastCache.size() == 0) { - m_SMutex.unlock(); - break; - } - m_SMutex.unlock(); - sleep(1); - } - - m_SThread.quit(); + wait(); m_pShoutcastStatus->set(SHOUTCAST_DISCONNECTED); @@ -480,7 +445,7 @@ bool EngineShoutcast::serverConnect() { m_pShoutcastStatus->set(SHOUTCAST_CONNECTED); m_bThreadQuit = false; - m_SThread.start(); + start(); return true; } @@ -496,45 +461,6 @@ bool EngineShoutcast::serverConnect() { void EngineShoutcast::write(unsigned char *header, unsigned char *body, int headerLen, int bodyLen) { - struct shoutcastCacheObject *l_SCacheObj = (struct shoutcastCacheObject *) malloc(sizeof(struct shoutcastCacheObject)); - - if (l_SCacheObj == NULL) - { - qDebug() << "EngineShoutcast::write: Can't allocate memory!"; - return; - } - - memset(l_SCacheObj, 0x00, sizeof(struct shoutcastCacheObject)); - l_SCacheObj->header = (unsigned char *)malloc(headerLen); - - if (l_SCacheObj->header == NULL) { - qDebug() << "EngineShoutcast::write: Can't allocate memory!"; - free(l_SCacheObj); - return; - } - - l_SCacheObj->body = (unsigned char *)malloc(bodyLen); - - if (l_SCacheObj->body == NULL) { - qDebug() << "EngineShoutcast::write: Can't allocate memory!"; - free(l_SCacheObj->header); - free(l_SCacheObj); - return; - } - - memcpy(l_SCacheObj->header, header, headerLen); - memcpy(l_SCacheObj->body, body, bodyLen); - - l_SCacheObj->headerLen = headerLen; - l_SCacheObj->bodyLen = bodyLen; - - m_SMutex.lock(); - m_pShoutcastCache.append(l_SCacheObj); - m_SMutex.unlock(); -} - -void EngineShoutcast::serverWrite(unsigned char *header, unsigned char *body, - int headerLen, int bodyLen) { int ret; if (!m_pShout) @@ -543,6 +469,7 @@ void EngineShoutcast::serverWrite(unsigned char *header, unsigned char *body, if (m_iShoutStatus == SHOUTERR_CONNECTED) { // Send header if there is one if (headerLen > 0) { + // We are already synced by EngineNetworkstream ret = shout_send(m_pShout, header, headerLen); if (ret != SHOUTERR_SUCCESS) { qDebug() << "DEBUG: Send error: " << shout_get_error(m_pShout); @@ -565,7 +492,8 @@ void EngineShoutcast::serverWrite(unsigned char *header, unsigned char *body, qDebug() << "DEBUG: Send error: " << shout_get_error(m_pShout); if (m_iShoutFailures > 3) { if(!serverConnect()) - errorDialog(tr("Lost connection to streaming server"), tr("Please check your connection to the Internet and verify that your username and password are correct.")); + errorDialog(tr("Lost connection to streaming server"), + tr("Please check your connection to the Internet and verify that your username and password are correct.")); } else{ m_iShoutFailures++; @@ -585,8 +513,11 @@ void EngineShoutcast::serverWrite(unsigned char *header, unsigned char *body, } void EngineShoutcast::process(const CSAMPLE* pBuffer, const int iBufferSize) { - //Check to see if Shoutcast is enabled, and pass the samples off to be broadcast if necessary. - bool prefEnabled = (m_pConfig->getValueString(ConfigKey(SHOUTCAST_PREF_KEY,"enabled")).toInt() == 1); + qDebug() << "EngineShoutcast::process"; + // Check to see if Shoutcast is enabled, and pass the samples off to be + // broadcast if necessary. + bool prefEnabled = (m_pConfig->getValueString( + ConfigKey(SHOUTCAST_PREF_KEY, "enabled")).toInt() == 1); if (!prefEnabled) { if (isConnected()) { @@ -627,6 +558,7 @@ void EngineShoutcast::process(const CSAMPLE* pBuffer, const int iBufferSize) { // If we are connected, encode the samples. if (iBufferSize > 0 && m_encoder) { m_encoder->encodeBuffer(pBuffer, iBufferSize); + // the encoded frames are received by the write() callback. } // Check if track metadata has changed and if so, update. @@ -787,61 +719,40 @@ void EngineShoutcast::infoDialog(QString text, QString detailedInfo) { ErrorDialogHandler::instance()->requestErrorDialog(props); } -void EngineShoutcast::shoutcastThread() { - struct shoutcastCacheObject *l_SCacheObj = NULL; - unsigned int m_lCacheSize = 0; - - // We IDLE until the cache has filled a bit - while (1) - { - sleep(5); - m_SMutex.lock(); - m_lCacheSize = m_pShoutcastCache.size(); - if (m_lCacheSize > 10) { - m_SMutex.unlock(); - break; - } - m_SMutex.unlock(); - } - - while (1) - { - shout_sync(m_pShout); - m_SMutex.lock(); - // Ok this shouldn't happen cache is empty and we have nothing to send. - // wait for next to come. - if (m_bThreadQuit == false && m_pShoutcastCache.size() == 0) { - qDebug() << "EngineShoutcast::shoutcastThread: Encoding not fast enough. Cache is empty!"; - qDebug() << "EngineShoutcast::shoutcastThread: This means your stream will break"; - m_SMutex.unlock(); - sleep(5); - continue; - } else if (m_pShoutcastCache.size() > 0) { - // Take first cache object and remove it - l_SCacheObj = m_pShoutcastCache.first(); - m_pShoutcastCache.remove(0); - m_lCacheSize = m_pShoutcastCache.size(); - } else { - // We don't have anything to send anymore - l_SCacheObj = NULL; - m_lCacheSize = 0; - } - m_SMutex.unlock(); - - if (l_SCacheObj != NULL) { - // I don't know if emiting Signal is more QT style but doing it like this now. - serverWrite(l_SCacheObj->header, - l_SCacheObj->body, - l_SCacheObj->headerLen, - l_SCacheObj->bodyLen); - - free(l_SCacheObj->header); - free(l_SCacheObj->body); - free(l_SCacheObj); - } +// Is called from the Mixxx engine thread +void EngineShoutcast::outputAvailabe(FIFO* pOutputFifo) { + m_pOutputFifo = pOutputFifo; + m_readSema.release(); +} - if (m_bThreadQuit == true && m_lCacheSize == 0) { - break; +void EngineShoutcast::run() { + unsigned static id = 0; + QThread::currentThread()->setObjectName(QString("EngineShoutcast %1").arg(++id)); + qDebug() << "starting thread"; + for(;;) { + m_readSema.acquire(); + if (m_bThreadQuit) { + if (m_encoder) { + m_encoder->flush(); + delete m_encoder; + m_encoder = NULL; + } + return; + } + int readAvailable = m_pOutputFifo->readAvailable(); + if (readAvailable) { + CSAMPLE* dataPtr1; + ring_buffer_size_t size1; + CSAMPLE* dataPtr2; + ring_buffer_size_t size2; + // We use size1 and size2, so we can ignore the return value + (void)m_pOutputFifo->aquireReadRegions(readAvailable, &dataPtr1, &size1, + &dataPtr2, &size2); + process(dataPtr1, size1); + if (size2 > 0) { + process(dataPtr1, size2); + } + m_pOutputFifo->releaseReadRegions(readAvailable); } } } diff --git a/src/engine/sidechain/engineshoutcast.h b/src/engine/sidechain/engineshoutcast.h index 8afad53e937..fb51000be4d 100644 --- a/src/engine/sidechain/engineshoutcast.h +++ b/src/engine/sidechain/engineshoutcast.h @@ -24,6 +24,7 @@ #include #include #include +#include #include "configobject.h" #include "controlobject.h" @@ -33,6 +34,7 @@ #include "engine/sidechain/sidechainworker.h" #include "errordialoghandler.h" #include "trackinfoobject.h" +#include "util/fifo.h" #define SHOUTCAST_DISCONNECTED 0 #define SHOUTCAST_CONNECTING 1 @@ -40,15 +42,6 @@ class Encoder; -// This one is used as cache object -struct shoutcastCacheObject { - quint64 time; - unsigned char *header; - unsigned char *body; - int headerLen; - int bodyLen; -}; - // Forward declare libshout structures to prevent leaking shout.h definitions // beyond where they are needed. struct shout; @@ -56,7 +49,7 @@ typedef struct shout shout_t; struct _util_dict; typedef struct _util_dict shout_metadata_t; -class EngineShoutcast : public QObject, public EncoderCallback, public SideChainWorker { +class EngineShoutcast : public QThread, public EncoderCallback, public SideChainWorker { Q_OBJECT public: EngineShoutcast(ConfigObject* _config); @@ -78,13 +71,15 @@ class EngineShoutcast : public QObject, public EncoderCallback, public SideChain bool serverConnect(); bool serverDisconnect(); bool isConnected(); + + virtual void outputAvailabe(FIFO* pOutputFifo); + + virtual void run(); + public slots: /** Update the libshout struct with info from Mixxx's shoutcast preferences.*/ void updateFromPreferences(); - // static void wrapper2writePage(); - //private slots: - // void writePage(unsigned char *header, unsigned char *body, - // int headerLen, int bodyLen, int count); + private: int getActiveTracks(); // Check if the metadata has changed since the previous check. We also @@ -135,11 +130,8 @@ class EngineShoutcast : public QObject, public EncoderCallback, public SideChain bool m_ogg_dynamic_update; QVector m_pShoutcastCache; bool m_bThreadQuit; - QThread m_SThread; - QMutex m_SMutex; - - private slots: - void shoutcastThread(); + QSemaphore m_readSema; + FIFO* m_pOutputFifo; }; #endif diff --git a/src/engine/sidechain/sidechainworker.h b/src/engine/sidechain/sidechainworker.h index ad44194fd96..54cbc6605f9 100644 --- a/src/engine/sidechain/sidechainworker.h +++ b/src/engine/sidechain/sidechainworker.h @@ -2,6 +2,7 @@ #define SIDECHAINWORKER_H #include "util/types.h" +#include "util/fifo.h" class SideChainWorker { public: @@ -9,6 +10,9 @@ class SideChainWorker { virtual ~SideChainWorker() { } virtual void process(const CSAMPLE* pBuffer, const int iBufferSize) = 0; virtual void shutdown() = 0; + virtual void outputAvailabe(FIFO* pOutputFifo) { + Q_UNUSED(pOutputFifo); + }; }; #endif /* SIDECHAINWORKER_H */