Skip to content

Commit

Permalink
For #906, #902, use coroutine for one cycle thread
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed May 29, 2017
1 parent b21f92f commit 2ed2513
Show file tree
Hide file tree
Showing 11 changed files with 44 additions and 212 deletions.
34 changes: 8 additions & 26 deletions trunk/src/app/srs_app_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,16 @@ using namespace std;

SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c, string cip)
{
id = 0;
manager = cm;
stfd = c;
ip = cip;
disposed = false;
expired = false;
create_time = srs_get_system_time_ms();

skt = new SrsStSocket();
kbps = new SrsKbps();
kbps->set_io(skt, skt);

// the client thread should reap itself,
// so we never use joinable.
// TODO: FIXME: maybe other thread need to stop it.
// @see: https://github.com/ossrs/srs/issues/78
pthread = new SrsOneCycleThread("conn", this);
trd = new SrsCoroutine("conn", this);
}

SrsConnection::~SrsConnection()
Expand All @@ -57,7 +50,9 @@ SrsConnection::~SrsConnection()

srs_freep(kbps);
srs_freep(skt);
srs_freep(pthread);
srs_freep(trd);

srs_close_stfd(stfd);
}

void SrsConnection::resample()
Expand All @@ -82,17 +77,7 @@ void SrsConnection::cleanup()

void SrsConnection::dispose()
{
if (disposed) {
return;
}

disposed = true;

/**
* when delete the connection, stop the connection,
* close the underlayer socket, delete the thread.
*/
srs_close_stfd(stfd);
trd->interrupt();
}

int SrsConnection::start()
Expand All @@ -103,16 +88,13 @@ int SrsConnection::start()
return ret;
}

return pthread->start();
return trd->start();
}

int SrsConnection::cycle()
{
int ret = ERROR_SUCCESS;

_srs_context->generate_id();
id = _srs_context->get_id();

int oret = ret = do_cycle();

// if socket io error, set to closed.
Expand All @@ -138,12 +120,12 @@ int SrsConnection::cycle()

int SrsConnection::srs_id()
{
return id;
return trd->cid();
}

void SrsConnection::expire()
{
expired = true;
trd->interrupt();
}


21 changes: 3 additions & 18 deletions trunk/src/app/srs_app_conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,15 @@
* all connections accept from listener must extends from this base class,
* server will add the connection to manager, and delete it when remove.
*/
class SrsConnection : virtual public ISrsConnection, virtual public ISrsOneCycleThreadHandler
class SrsConnection : virtual public ISrsConnection, virtual public ISrsCoroutineHandler
, virtual public IKbpsDelta, virtual public ISrsReloadHandler
{
private:
protected:
/**
* each connection start a green thread,
* when thread stop, the connection will be delete by server.
*/
SrsOneCycleThread* pthread;
/**
* the id of connection.
*/
int id;
protected:
SrsCoroutine* trd;
/**
* the manager object to manage the connection.
*/
Expand All @@ -65,16 +60,6 @@ class SrsConnection : virtual public ISrsConnection, virtual public ISrsOneCycle
* the ip of client.
*/
std::string ip;
/**
* whether the connection is disposed,
* when disposed, connection should stop cycle and cleanup itself.
*/
bool disposed;
/**
* whether connection is expired, application definition.
* when expired, the connection must never be served and quit ASAP.
*/
bool expired;
/**
* the underlayer socket.
*/
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_http_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1353,7 +1353,7 @@ int SrsHttpApi::do_cycle()
}

// process http messages.
while(!disposed) {
while(!trd->pull()) {
ISrsHttpMessage* req = NULL;

// get a http message
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_http_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ int SrsHttpConn::do_cycle()
}

// process http messages.
while (!disposed) {
while (!trd->pull()) {
ISrsHttpMessage* req = NULL;

// get a http message
Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_recv_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ SrsHttpRecvThread::SrsHttpRecvThread(SrsResponseOnlyHttpConn* c)
{
conn = c;
error = ERROR_SUCCESS;
trd = new SrsOneCycleThread("http-receive", this);
trd = new SrsCoroutine("http-receive", this, _srs_context->get_id());
}

SrsHttpRecvThread::~SrsHttpRecvThread()
Expand All @@ -558,7 +558,7 @@ int SrsHttpRecvThread::cycle()
{
int ret = ERROR_SUCCESS;

while (true) {
while (!trd->pull()) {
ISrsHttpMessage* req = NULL;
SrsAutoFree(ISrsHttpMessage, req);

Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_recv_thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,11 @@ class SrsPublishRecvThread : virtual public ISrsMessagePumper, virtual public IS
* when client closed the request, to avoid FD leak.
* @see https://github.com/ossrs/srs/issues/636#issuecomment-298208427
*/
class SrsHttpRecvThread : public ISrsOneCycleThreadHandler
class SrsHttpRecvThread : public ISrsCoroutineHandler
{
private:
SrsResponseOnlyHttpConn* conn;
SrsOneCycleThread* trd;
SrsCoroutine* trd;
int error;
public:
SrsHttpRecvThread(SrsResponseOnlyHttpConn* c);
Expand Down
46 changes: 23 additions & 23 deletions trunk/src/app/srs_app_rtmp_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ int SrsRtmpConn::service_cycle()
}
srs_verbose("on_bw_done success");

while (!disposed) {
while (!trd->pull()) {
ret = stream_service_cycle();

// stream service must terminated with error, never success.
Expand Down Expand Up @@ -692,7 +692,7 @@ int SrsRtmpConn::playing(SrsSource* source)
return ret;
}

int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRecvThread* trd)
int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRecvThread* rtrd)
{
int ret = ERROR_SUCCESS;

Expand Down Expand Up @@ -730,12 +730,12 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe
srs_trace("start play smi=%.2f, mw_sleep=%d, mw_enabled=%d, realtime=%d, tcp_nodelay=%d",
send_min_interval, mw_sleep, mw_enabled, realtime, tcp_nodelay);

while (!disposed) {
while (true) {
// collect elapse for pithy print.
pprint->elapse();

// when source is set to expired, disconnect it.
if (expired) {
if (trd->pull()) {
ret = ERROR_USER_DISCONNECT;
srs_error("connection expired. ret=%d", ret);
return ret;
Expand All @@ -744,8 +744,8 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe
// to use isolate thread to recv, can improve about 33% performance.
// @see: https://github.com/ossrs/srs/issues/196
// @see: https://github.com/ossrs/srs/issues/217
while (!trd->empty()) {
SrsCommonMessage* msg = trd->pump();
while (!rtrd->empty()) {
SrsCommonMessage* msg = rtrd->pump();
srs_verbose("pump client message to process.");

if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) {
Expand All @@ -757,7 +757,7 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe
}

// quit when recv thread error.
if ((ret = trd->error_code()) != ERROR_SUCCESS) {
if ((ret = rtrd->error_code()) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) {
srs_error("recv thread failed. ret=%d", ret);
}
Expand Down Expand Up @@ -893,13 +893,13 @@ int SrsRtmpConn::publishing(SrsSource* source)
if ((ret = acquire_publish(source)) == ERROR_SUCCESS) {
// use isolate thread to recv,
// @see: https://github.com/ossrs/srs/issues/237
SrsPublishRecvThread trd(rtmp, req, st_netfd_fileno(stfd), 0, this, source);
SrsPublishRecvThread rtrd(rtmp, req, st_netfd_fileno(stfd), 0, this, source);

srs_info("start to publish stream %s success", req->stream.c_str());
ret = do_publishing(source, &trd);
ret = do_publishing(source, &rtrd);

// stop isolate recv thread
trd.stop();
rtrd.stop();
}

// whatever the acquire publish, always release publish.
Expand All @@ -916,7 +916,7 @@ int SrsRtmpConn::publishing(SrsSource* source)
return ret;
}

int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* rtrd)
{
int ret = ERROR_SUCCESS;

Expand All @@ -925,15 +925,15 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
SrsAutoFree(SrsPithyPrint, pprint);

// start isolate recv thread.
if ((ret = trd->start()) != ERROR_SUCCESS) {
if ((ret = rtrd->start()) != ERROR_SUCCESS) {
srs_error("start isolate recv thread failed. ret=%d", ret);
return ret;
}

// change the isolate recv thread context id,
// merge its log to current thread.
int receive_thread_cid = trd->get_cid();
trd->set_cid(_srs_context->get_id());
int receive_thread_cid = rtrd->get_cid();
rtrd->set_cid(_srs_context->get_id());

// initialize the publish timeout.
publish_1stpkt_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost);
Expand All @@ -951,11 +951,11 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)

int64_t nb_msgs = 0;
uint64_t nb_frames = 0;
while (!disposed) {
while (true) {
pprint->elapse();

// when source is set to expired, disconnect it.
if (expired) {
if (trd->pull()) {
ret = ERROR_USER_DISCONNECT;
srs_error("connection expired. ret=%d", ret);
return ret;
Expand All @@ -965,35 +965,35 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
if (nb_msgs == 0) {
// when not got msgs, wait for a larger timeout.
// @see https://github.com/ossrs/srs/issues/441
trd->wait(publish_1stpkt_timeout);
rtrd->wait(publish_1stpkt_timeout);
} else {
trd->wait(publish_normal_timeout);
rtrd->wait(publish_normal_timeout);
}

// check the thread error code.
if ((ret = trd->error_code()) != ERROR_SUCCESS) {
if ((ret = rtrd->error_code()) != ERROR_SUCCESS) {
if (!srs_is_system_control_error(ret) && !srs_is_client_gracefully_close(ret)) {
srs_error("recv thread failed. ret=%d", ret);
}
return ret;
}

// when not got any messages, timeout.
if (trd->nb_msgs() <= nb_msgs) {
if (rtrd->nb_msgs() <= nb_msgs) {
ret = ERROR_SOCKET_TIMEOUT;
srs_warn("publish timeout %dms, nb_msgs=%" PRId64 ", ret=%d",
nb_msgs? publish_normal_timeout : publish_1stpkt_timeout, nb_msgs, ret);
break;
}
nb_msgs = trd->nb_msgs();
nb_msgs = rtrd->nb_msgs();

// Update the stat for video fps.
// @remark https://github.com/ossrs/srs/issues/851
SrsStatistic* stat = SrsStatistic::instance();
if ((ret = stat->on_video_frames(req, (int)(trd->nb_video_frames() - nb_frames))) != ERROR_SUCCESS) {
if ((ret = stat->on_video_frames(req, (int)(rtrd->nb_video_frames() - nb_frames))) != ERROR_SUCCESS) {
return ret;
}
nb_frames = trd->nb_video_frames();
nb_frames = rtrd->nb_video_frames();

// reportable
if (pprint->can_print()) {
Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_rtsp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o)
stfd = fd;
skt = new SrsStSocket();
rtsp = new SrsRtspStack(skt);
trd = new SrsOneCycleThread("rtsp", this);
trd = new SrsCoroutine("rtsp", this);

req = NULL;
sdk = NULL;
Expand Down Expand Up @@ -249,7 +249,7 @@ int SrsRtspConn::do_cycle()
srs_trace("rtsp: serve %s", ip.c_str());

// consume all rtsp messages.
for (;;) {
while (!trd->pull()) {
SrsRtspRequest* req = NULL;
if ((ret = rtsp->recv_message(&req)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_rtsp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class SrsRtspJitter
/**
* the rtsp connection serve the fd.
*/
class SrsRtspConn : public ISrsOneCycleThreadHandler
class SrsRtspConn : public ISrsCoroutineHandler
{
private:
std::string output_template;
Expand All @@ -133,7 +133,7 @@ class SrsRtspConn : public ISrsOneCycleThreadHandler
SrsStSocket* skt;
SrsRtspStack* rtsp;
SrsRtspCaster* caster;
SrsOneCycleThread* trd;
SrsCoroutine* trd;
private:
SrsRequest* req;
SrsSimpleRtmpClient* sdk;
Expand Down
Loading

0 comments on commit 2ed2513

Please sign in to comment.