diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index 80807cde103..9158dab1d85 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -136,8 +136,7 @@ class ISrsExpire }; // Interface for connection that is startable. -class ISrsStartableConneciton : public ISrsConnection - , public ISrsStartable, public ISrsKbpsDelta +class ISrsStartableConneciton : public ISrsConnection, public ISrsStartable, public ISrsKbpsDelta { public: ISrsStartableConneciton(); diff --git a/trunk/src/app/srs_app_srt_conn.cpp b/trunk/src/app/srs_app_srt_conn.cpp index fb307a395b6..f2183a35f82 100644 --- a/trunk/src/app/srs_app_srt_conn.cpp +++ b/trunk/src/app/srs_app_srt_conn.cpp @@ -19,6 +19,9 @@ using namespace std; #include #include #include +#include +#include +#include SrsSrtConnection::SrsSrtConnection(srs_srt_t srt_fd) { @@ -167,6 +170,7 @@ SrsMpegtsSrtConn::SrsMpegtsSrtConn(SrsSrtServer* srt_server, srs_srt_t srt_fd, s srt_source_ = NULL; req_ = new SrsRequest(); + req_->ip = ip; } SrsMpegtsSrtConn::~SrsMpegtsSrtConn() @@ -188,10 +192,14 @@ std::string SrsMpegtsSrtConn::desc() void SrsMpegtsSrtConn::remark(int64_t* in, int64_t* out) { - // TODO: FIXME: no impl currently. kbps_->remark(in, out); } +void SrsMpegtsSrtConn::expire() +{ + trd_->interrupt(); +} + srs_error_t SrsMpegtsSrtConn::start() { srs_error_t err = srs_success; @@ -215,9 +223,12 @@ const SrsContextId& SrsMpegtsSrtConn::get_id() srs_error_t SrsMpegtsSrtConn::cycle() { - srs_error_t err = srs_success; + srs_error_t err = do_cycle(); - err = do_cycle(); + // Update statistic when done. + SrsStatistic* stat = SrsStatistic::instance(); + stat->kbps_add_delta(get_id().c_str(), this); + stat->on_disconnect(get_id().c_str()); // Notify manager to remove it. // Note that we create this object, so we use manager to remove it. @@ -256,6 +267,12 @@ srs_error_t SrsMpegtsSrtConn::do_cycle() return srs_error_new(ERROR_SRT_CONN, "invalid srt streamid=%s", streamid.c_str()); } + // discovery vhost, resolve the vhost from config + SrsConfDirective* parsed_vhost = _srs_config->get_vhost(req_->vhost); + if (parsed_vhost) { + req_->vhost = parsed_vhost->arg0(); + } + if (! _srs_config->get_srt_enabled(req_->vhost)) { return srs_error_new(ERROR_SRT_CONN, "srt disabled, vhost=%s", req_->vhost.c_str()); } @@ -271,6 +288,9 @@ srs_error_t SrsMpegtsSrtConn::do_cycle() return srs_error_wrap(err, "on connect"); } + // Build the tcUrl which is vhost/app. + req_->tcUrl = srs_generate_tc_url(req_->host, req_->vhost, req_->app, req_->port); + if (mode == SrtModePush) { err = publishing(); } else if (mode == SrtModePull) { @@ -307,6 +327,11 @@ srs_error_t SrsMpegtsSrtConn::playing() if ((err = http_hooks_on_play()) != srs_success) { return srs_error_wrap(err, "rtmp: callback on play"); } + + SrsStatistic* stat = SrsStatistic::instance(); + if ((err = stat->on_client(_srs_context->get_id().c_str(), req_, this, SrsSrtConnPlay)) != srs_success) { + return srs_error_wrap(err, "rtmp: stat client"); + } err = do_playing(); http_hooks_on_stop(); @@ -363,6 +388,11 @@ srs_error_t SrsMpegtsSrtConn::do_publishing() SrsPithyPrint* pprint = SrsPithyPrint::create_srt_publish(); SrsAutoFree(SrsPithyPrint, pprint); + SrsStatistic* stat = SrsStatistic::instance(); + if ((err = stat->on_client(_srs_context->get_id().c_str(), req_, this, SrsSrtConnPublish)) != srs_success) { + return srs_error_wrap(err, "srt: stat client"); + } + int nb_packets = 0; // Max udp packet size equal to 1500. diff --git a/trunk/src/app/srs_app_srt_conn.hpp b/trunk/src/app/srs_app_srt_conn.hpp index 543e07b655e..fe6acf8f03d 100644 --- a/trunk/src/app/srs_app_srt_conn.hpp +++ b/trunk/src/app/srs_app_srt_conn.hpp @@ -70,7 +70,7 @@ class SrsSrtRecvThread : public ISrsCoroutineHandler srs_error_t recv_err_; }; -class SrsMpegtsSrtConn : public ISrsStartableConneciton, public ISrsCoroutineHandler +class SrsMpegtsSrtConn : public ISrsStartableConneciton, public ISrsCoroutineHandler, public ISrsExpire { public: SrsMpegtsSrtConn(SrsSrtServer* srt_server, srs_srt_t srt_fd, std::string ip, int port); @@ -81,6 +81,9 @@ class SrsMpegtsSrtConn : public ISrsStartableConneciton, public ISrsCoroutineHan // Interface ISrsKbpsDelta public: virtual void remark(int64_t* in, int64_t* out); +// Interface ISrsExpire +public: + virtual void expire(); public: virtual srs_error_t start(); // Interface ISrsConnection. diff --git a/trunk/src/app/srs_app_srt_server.cpp b/trunk/src/app/srs_app_srt_server.cpp index 2eec7ebf085..d5af6e1e2d3 100644 --- a/trunk/src/app/srs_app_srt_server.cpp +++ b/trunk/src/app/srs_app_srt_server.cpp @@ -13,6 +13,7 @@ using namespace std; #include #include #include +#include #ifdef SRS_SRT SrsSrtEventLoop* _srt_eventloop = NULL; @@ -131,16 +132,27 @@ srs_error_t SrsSrtAcceptor::on_srt_client(srs_srt_t srt_fd) SrsSrtServer::SrsSrtServer() { conn_manager_ = new SrsResourceManager("SRT", true); + timer_ = NULL; } SrsSrtServer::~SrsSrtServer() { srs_freep(conn_manager_); + srs_freep(timer_); } srs_error_t SrsSrtServer::initialize() { srs_error_t err = srs_success; + + if (! _srs_config->get_srt_enabled()) { + return err; + } + + if ((err = setup_ticks()) != srs_success) { + return srs_error_wrap(err, "tick"); + } + return err; } @@ -242,17 +254,54 @@ srs_error_t SrsSrtServer::fd_to_resource(srs_srt_t srt_fd, ISrsStartableConnecit void SrsSrtServer::remove(ISrsResource* c) { - // TODO: FIXME: add some statistic of srt. - // ISrsStartableConneciton* conn = dynamic_cast(c); - - // SrsStatistic* stat = SrsStatistic::instance(); - // stat->kbps_add_delta(c->get_id().c_str(), conn); - // stat->on_disconnect(c->get_id().c_str()); - // use manager to free it async. conn_manager_->remove(c); } +srs_error_t SrsSrtServer::setup_ticks() +{ + srs_error_t err = srs_success; + + srs_freep(timer_); + timer_ = new SrsHourGlass("srt", this, 1 * SRS_UTIME_SECONDS); + + if (_srs_config->get_stats_enabled()) { + if ((err = timer_->tick(8, 3 * SRS_UTIME_SECONDS)) != srs_success) { + return srs_error_wrap(err, "tick"); + } + } + + if ((err = timer_->start()) != srs_success) { + return srs_error_wrap(err, "timer"); + } + + return err; +} + +srs_error_t SrsSrtServer::notify(int event, srs_utime_t interval, srs_utime_t tick) +{ + srs_error_t err = srs_success; + + switch (event) { + case 8: resample_kbps(); break; + } + + return err; +} + +void SrsSrtServer::resample_kbps() +{ + // collect delta from all clients. + for (int i = 0; i < (int)conn_manager_->size(); i++) { + ISrsResource* c = conn_manager_->at(i); + ISrsKbpsDelta* conn = dynamic_cast(conn_manager_->at(i)); + + // add delta of connection to server kbps., + // for next sample() of server kbps can get the stat. + SrsStatistic::instance()->kbps_add_delta(c->get_id().c_str(), conn); + } +} + SrsSrtServerAdapter::SrsSrtServerAdapter() { srt_server_ = new SrsSrtServer(); @@ -349,7 +398,7 @@ srs_error_t SrsSrtEventLoop::start() srs_error_t SrsSrtEventLoop::cycle() { srs_error_t err = srs_success; - + while (true) { if ((err = trd_->pull()) != srs_success) { return srs_error_wrap(err, "srt listener"); diff --git a/trunk/src/app/srs_app_srt_server.hpp b/trunk/src/app/srs_app_srt_server.hpp index 6e6e18904cc..8ae6f7a044a 100644 --- a/trunk/src/app/srs_app_srt_server.hpp +++ b/trunk/src/app/srs_app_srt_server.hpp @@ -14,6 +14,7 @@ #include class SrsSrtServer; +class SrsHourGlass; // A common srt acceptor, for SRT server. class SrsSrtAcceptor : public ISrsSrtHandler @@ -37,10 +38,11 @@ class SrsSrtAcceptor : public ISrsSrtHandler }; // SRS SRT server, initialize and listen, start connection service thread, destroy client. -class SrsSrtServer : public ISrsResourceManager +class SrsSrtServer : public ISrsResourceManager, public ISrsHourGlass { private: SrsResourceManager* conn_manager_; + SrsHourGlass* timer_; private: std::vector acceptors_; public: @@ -66,6 +68,12 @@ class SrsSrtServer : public ISrsResourceManager // A callback for connection to remove itself. // When connection thread cycle terminated, callback this to delete connection. virtual void remove(ISrsResource* c); +// interface ISrsHourGlass +private: + virtual srs_error_t setup_ticks(); + virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick); +private: + virtual void resample_kbps(); }; // The srt server adapter, the master server.