From dde05c63155b705e6337e9a633886432cec32f2c Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 5 Dec 2014 14:38:43 +0800 Subject: [PATCH] for bug #251, refine the send use cond wait. --- trunk/src/app/srs_app_rtmp_conn.cpp | 13 +++++---- trunk/src/app/srs_app_source.cpp | 38 +++++++++++++++++++++++++ trunk/src/app/srs_app_source.hpp | 20 +++++++++++++ trunk/src/core/srs_core.hpp | 2 +- trunk/src/core/srs_core_performance.hpp | 5 ++++ 5 files changed, 71 insertions(+), 7 deletions(-) diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 347f8d7a26..0de7e9ba6f 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -598,6 +598,10 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd) // collect elapse for pithy print. pithy_print.elapse(); + // wait for message to incoming. + // @see https://github.com/winlinvip/simple-rtmp-server/issues/251 + consumer->wait(SRS_PERF_MW_MIN_MSGS, mw_sleep); + // get messages from consumer. // each msg in msgs.msgs must be free, for the SrsMessageArray never free them. int count = 0; @@ -606,12 +610,9 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd) return ret; } - // no message to send, sleep a while. - if (count <= 0) { - srs_verbose("sleep for no messages to send"); - st_usleep(mw_sleep * 1000); - } - srs_info("got %d msgs, mw=%d", count, mw_sleep); + // we use wait to get messages, so the count must be positive. + srs_assert(count > 0); + srs_info("got %d msgs, min=%d, mw=%d", count, SRS_PERF_MW_MIN_MSGS, mw_sleep); // reportable if (pithy_print.can_print()) { diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index baa3e50192..595dfbe2e7 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -166,6 +166,16 @@ SrsMessageQueue::~SrsMessageQueue() clear(); } +int SrsMessageQueue::count() +{ + return (int)msgs.size(); +} + +int SrsMessageQueue::duration() +{ + return (int)(av_end_time - av_start_time); +} + void SrsMessageQueue::set_queue_size(double queue_size) { queue_size_ms = (int)(queue_size * 1000); @@ -290,6 +300,11 @@ SrsConsumer::SrsConsumer(SrsSource* _source) jitter = new SrsRtmpJitter(); queue = new SrsMessageQueue(); should_update_source_id = false; + + mw_wait = st_cond_new(); + mw_min_msgs = 0; + mw_duration = 0; + mw_waiting = false; } SrsConsumer::~SrsConsumer() @@ -297,6 +312,7 @@ SrsConsumer::~SrsConsumer() source->on_consumer_destroy(this); srs_freep(jitter); srs_freep(queue); + st_cond_destroy(mw_wait); } void SrsConsumer::set_queue_size(double queue_size) @@ -329,6 +345,12 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, S return ret; } + // fire the mw when msgs is enough. + if (mw_waiting && queue->count() > mw_min_msgs && queue->duration() > mw_duration) { + st_cond_signal(mw_wait); + mw_waiting = false; + } + return ret; } @@ -349,6 +371,22 @@ int SrsConsumer::dump_packets(int max_count, SrsMessage** pmsgs, int& count) return queue->dump_packets(max_count, pmsgs, count); } +void SrsConsumer::wait(int nb_msgs, int duration) +{ + mw_min_msgs = nb_msgs; + mw_duration = duration; + + // already ok, donot wait. + if (queue->count() > mw_min_msgs && queue->duration() > mw_duration) { + return; + } + + // the enqueue will notify this cond. + mw_waiting = true; + + st_cond_wait(mw_wait); +} + int SrsConsumer::on_play_client_pause(bool is_pause) { int ret = ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 2ac9b7f9e6..63795b66c0 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -115,6 +115,14 @@ class SrsMessageQueue SrsMessageQueue(); virtual ~SrsMessageQueue(); public: + /** + * get the count of queue. + */ + virtual int count(); + /** + * get duration of queue. + */ + virtual int duration(); /** * set the queue size * @param queue_size the queue size in seconds. @@ -154,6 +162,12 @@ class SrsConsumer bool paused; // when source id changed, notice all consumers bool should_update_source_id; + // the cond wait for mw. + // @see https://github.com/winlinvip/simple-rtmp-server/issues/251 + st_cond_t mw_wait; + bool mw_waiting; + int mw_min_msgs; + int mw_duration; public: SrsConsumer(SrsSource* _source); virtual ~SrsConsumer(); @@ -189,6 +203,12 @@ class SrsConsumer */ virtual int dump_packets(int max_count, SrsMessage** pmsgs, int& count); /** + * wait for messages incomming, atleast nb_msgs and in duration. + * @param nb_msgs the messages count to wait. + * @param duration the messgae duration to wait. + */ + virtual void wait(int nb_msgs, int duration); + /** * when client send the pause message. */ virtual int on_play_client_pause(bool is_pause); diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 13a24da78b..7ac574fc2c 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR 2 #define VERSION_MINOR 0 -#define VERSION_REVISION 55 +#define VERSION_REVISION 56 // server info. #define RTMP_SIG_SRS_KEY "SRS" #define RTMP_SIG_SRS_ROLE "origin/edge server" diff --git a/trunk/src/core/srs_core_performance.hpp b/trunk/src/core/srs_core_performance.hpp index e54f4ef0f4..97c5191ba6 100644 --- a/trunk/src/core/srs_core_performance.hpp +++ b/trunk/src/core/srs_core_performance.hpp @@ -93,6 +93,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * @remark, recomment to 156. */ #define SRS_PERF_MW_MSGS 156 +/** +* how many msgs atleast to send. +* @remark, recomment to 8. +*/ +#define SRS_PERF_MW_MIN_MSGS 8 /** * how many chunk stream to cache, [0, N].