Skip to content

Commit

Permalink
For #906, #902, replace the endless thread with coroutine
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed May 29, 2017
1 parent fc380fe commit 44f542f
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 151 deletions.
8 changes: 4 additions & 4 deletions trunk/src/app/srs_app_http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@ SrsBufferCache::SrsBufferCache(SrsSource* s, SrsRequest* r)
req = r->copy();
source = s;
queue = new SrsMessageQueue(true);
pthread = new SrsEndlessThread("http-stream", this);
trd = new SrsCoroutine("http-stream", this);

// TODO: FIXME: support reload.
fast_cache = _srs_config->get_vhost_http_remux_fast_cache(req->vhost);
}

SrsBufferCache::~SrsBufferCache()
{
srs_freep(pthread);
srs_freep(trd);

srs_freep(queue);
srs_freep(req);
Expand All @@ -87,7 +87,7 @@ int SrsBufferCache::update(SrsSource* s, SrsRequest* r)

int SrsBufferCache::start()
{
return pthread->start();
return trd->start();
}

int SrsBufferCache::dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter)
Expand Down Expand Up @@ -138,7 +138,7 @@ int SrsBufferCache::cycle()
// TODO: FIXME: support reload.
queue->set_queue_size(fast_cache);

while (true) {
while (!trd->pull()) {
pprint->elapse();

// get messages from consumer.
Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_http_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ class SrsTsTransmuxer;
* for example, the audio stream cache to make android(weixin) happy.
* we start a thread to shrink the queue.
*/
class SrsBufferCache : public ISrsEndlessThreadHandler
class SrsBufferCache : public ISrsCoroutineHandler
{
private:
double fast_cache;
private:
SrsMessageQueue* queue;
SrsSource* source;
SrsRequest* req;
SrsEndlessThread* pthread;
SrsCoroutine* trd;
public:
SrsBufferCache(SrsSource* s, SrsRequest* r);
virtual ~SrsBufferCache();
Expand Down
26 changes: 14 additions & 12 deletions trunk/src/app/srs_app_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,13 +355,13 @@ SrsUdpCasterListener::~SrsUdpCasterListener()

SrsSignalManager* SrsSignalManager::instance = NULL;

SrsSignalManager::SrsSignalManager(SrsServer* server)
SrsSignalManager::SrsSignalManager(SrsServer* s)
{
SrsSignalManager::instance = this;

_server = server;
server = s;
sig_pipe[0] = sig_pipe[1] = -1;
pthread = new SrsEndlessThread("signal", this);
trd = new SrsCoroutine("signal", this);
signal_read_stfd = NULL;
}

Expand All @@ -376,7 +376,7 @@ SrsSignalManager::~SrsSignalManager()
::close(sig_pipe[1]);
}

srs_freep(pthread);
srs_freep(trd);
}

int SrsSignalManager::initialize()
Expand Down Expand Up @@ -432,20 +432,22 @@ int SrsSignalManager::start()
srs_trace("signal installed, reload=%d, reopen=%d, grace_quit=%d",
SRS_SIGNAL_RELOAD, SRS_SIGNAL_REOPEN_LOG, SRS_SIGNAL_GRACEFULLY_QUIT);

return pthread->start();
return trd->start();
}

int SrsSignalManager::cycle()
{
int ret = ERROR_SUCCESS;

int signo;

/* Read the next signal from the pipe */
st_read(signal_read_stfd, &signo, sizeof(int), ST_UTIME_NO_TIMEOUT);

/* Process signal synchronously */
_server->on_signal(signo);
while (!trd->pull()) {
int signo;

/* Read the next signal from the pipe */
st_read(signal_read_stfd, &signo, sizeof(int), ST_UTIME_NO_TIMEOUT);

/* Process signal synchronously */
server->on_signal(signo);
}

return ret;
}
Expand Down
8 changes: 4 additions & 4 deletions trunk/src/app/srs_app_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,18 +178,18 @@ class SrsUdpCasterListener : public SrsUdpStreamListener
* convert signal to io,
* @see: st-1.9/docs/notes.html
*/
class SrsSignalManager : public ISrsEndlessThreadHandler
class SrsSignalManager : public ISrsCoroutineHandler
{
private:
/* Per-process pipe which is used as a signal queue. */
/* Up to PIPE_BUF/sizeof(int) signals can be queued up. */
int sig_pipe[2];
st_netfd_t signal_read_stfd;
private:
SrsServer* _server;
SrsEndlessThread* pthread;
SrsServer* server;
SrsCoroutine* trd;
public:
SrsSignalManager(SrsServer* server);
SrsSignalManager(SrsServer* s);
virtual ~SrsSignalManager();
public:
virtual int initialize();
Expand Down
8 changes: 4 additions & 4 deletions trunk/src/app/srs_app_st.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ void SrsCoroutine::stop()

void* res = NULL;
int ret = st_thread_join(trd, &res);
srs_trace("Thread.stop: Terminated, ret=%d, err=%d", ret, err);
srs_info("Thread.stop: Terminated, ret=%d, err=%d", ret, err);
srs_assert(!ret);

// Always override the error by the worker.
Expand All @@ -107,7 +107,7 @@ void SrsCoroutine::interrupt()
}
interrupted = true;

srs_trace("Thread.interrupt: Interrupt thread, err=%d", err);
srs_info("Thread.interrupt: Interrupt thread, err=%d", err);
err = (err == ERROR_SUCCESS? ERROR_THREAD_INTERRUPED:err);
st_thread_interrupt(trd);
}
Expand All @@ -127,10 +127,10 @@ int SrsCoroutine::cycle()
if (!context && _srs_context) {
context = _srs_context->generate_id();
}
srs_trace("Thread.cycle: Start with cid=%d, err=%d", context, err);
srs_info("Thread.cycle: Start with cid=%d, err=%d", context, err);

int ret = handler->cycle();
srs_trace("Thread.cycle: Finished with ret=%d, err=%d", ret, err);
srs_info("Thread.cycle: Finished with ret=%d, err=%d", ret, err);
return ret;
}

Expand Down
68 changes: 0 additions & 68 deletions trunk/src/app/srs_app_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,74 +26,6 @@
#include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp>

ISrsEndlessThreadHandler::ISrsEndlessThreadHandler()
{
}

ISrsEndlessThreadHandler::~ISrsEndlessThreadHandler()
{
}

void ISrsEndlessThreadHandler::on_thread_start()
{
}

int ISrsEndlessThreadHandler::on_before_cycle()
{
return ERROR_SUCCESS;
}

int ISrsEndlessThreadHandler::on_end_cycle()
{
return ERROR_SUCCESS;
}

void ISrsEndlessThreadHandler::on_thread_stop()
{
}

SrsEndlessThread::SrsEndlessThread(const char* n, ISrsEndlessThreadHandler* h)
{
handler = h;
pthread = new internal::SrsThread(n, this, 0, false);
}

SrsEndlessThread::~SrsEndlessThread()
{
pthread->stop();
srs_freep(pthread);
}

int SrsEndlessThread::start()
{
return pthread->start();
}

int SrsEndlessThread::cycle()
{
return handler->cycle();
}

void SrsEndlessThread::on_thread_start()
{
handler->on_thread_start();
}

int SrsEndlessThread::on_before_cycle()
{
return handler->on_before_cycle();
}

int SrsEndlessThread::on_end_cycle()
{
return handler->on_end_cycle();
}

void SrsEndlessThread::on_thread_stop()
{
handler->on_thread_stop();
}

ISrsOneCycleThreadHandler::ISrsOneCycleThreadHandler()
{
}
Expand Down
57 changes: 0 additions & 57 deletions trunk/src/app/srs_app_thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,63 +28,6 @@

#include <srs_app_st.hpp>

/**
* the endless thread is a loop thread never quit.
* user can create thread always running util server terminate.
* the step to create a thread never stop:
* 1. create SrsEndlessThread field.
* for example:
* class SrsBufferCache : public ISrsEndlessThreadHandler {
* public: SrsBufferCache() { pthread = new SrsEndlessThread("http-stream", this); }
* public: virtual int cycle() {
* // do some work never end.
* }
* }
* @remark user must use block method in cycle method, for example, sleep or socket io.
*/
class ISrsEndlessThreadHandler
{
public:
ISrsEndlessThreadHandler();
virtual ~ISrsEndlessThreadHandler();
public:
/**
* the cycle method for the common thread.
* @remark user must use block method in cycle method, for example, sleep or socket io.
*/
virtual int cycle() = 0;
public:
/**
* other callback for handler.
* @remark all callback is optional, handler can ignore it.
*/
virtual void on_thread_start();
virtual int on_before_cycle();
virtual int on_end_cycle();
virtual void on_thread_stop();
};
class SrsEndlessThread : public internal::ISrsThreadHandler
{
private:
internal::SrsThread* pthread;
ISrsEndlessThreadHandler* handler;
public:
SrsEndlessThread(const char* n, ISrsEndlessThreadHandler* h);
virtual ~SrsEndlessThread();
public:
/**
* for the endless thread, never quit.
*/
virtual int start();
// interface internal::ISrsThreadHandler
public:
virtual int cycle();
virtual void on_thread_start();
virtual int on_before_cycle();
virtual int on_end_cycle();
virtual void on_thread_stop();
};

/**
* the one cycle thread is a thread do the cycle only one time,
* that is, the thread will quit when return from the cycle.
Expand Down

0 comments on commit 44f542f

Please sign in to comment.