Skip to content

Commit

Permalink
STAT: Update stat for SRT connection.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Aug 27, 2022
1 parent f5f89c6 commit 7577a2d
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 15 deletions.
3 changes: 1 addition & 2 deletions trunk/src/app/srs_app_conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,7 @@ class ISrsExpire
};

// Interface for connection that is startable.
class ISrsStartableConneciton : public ISrsConnection
, public ISrsStartable, public ISrsKbpsDelta
class ISrsStartableConneciton : public ISrsConnection, public ISrsStartable, public ISrsKbpsDelta
{
public:
ISrsStartableConneciton();
Expand Down
36 changes: 33 additions & 3 deletions trunk/src/app/srs_app_srt_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ using namespace std;
#include <srs_app_pithy_print.hpp>
#include <srs_app_srt_server.hpp>
#include <srs_app_srt_source.hpp>
#include <srs_app_statistic.hpp>
#include <srs_protocol_rtmp_stack.hpp>
#include <srs_kernel_utility.hpp>

SrsSrtConnection::SrsSrtConnection(srs_srt_t srt_fd)
{
Expand Down Expand Up @@ -167,6 +170,7 @@ SrsMpegtsSrtConn::SrsMpegtsSrtConn(SrsSrtServer* srt_server, srs_srt_t srt_fd, s

srt_source_ = NULL;
req_ = new SrsRequest();
req_->ip = ip;
}

SrsMpegtsSrtConn::~SrsMpegtsSrtConn()
Expand All @@ -188,10 +192,14 @@ std::string SrsMpegtsSrtConn::desc()

void SrsMpegtsSrtConn::remark(int64_t* in, int64_t* out)
{
// TODO: FIXME: no impl currently.
kbps_->remark(in, out);
}

void SrsMpegtsSrtConn::expire()
{
trd_->interrupt();
}

srs_error_t SrsMpegtsSrtConn::start()
{
srs_error_t err = srs_success;
Expand All @@ -215,9 +223,12 @@ const SrsContextId& SrsMpegtsSrtConn::get_id()

srs_error_t SrsMpegtsSrtConn::cycle()
{
srs_error_t err = srs_success;
srs_error_t err = do_cycle();

err = do_cycle();
// Update statistic when done.
SrsStatistic* stat = SrsStatistic::instance();
stat->kbps_add_delta(get_id().c_str(), this);
stat->on_disconnect(get_id().c_str());

// Notify manager to remove it.
// Note that we create this object, so we use manager to remove it.
Expand Down Expand Up @@ -256,6 +267,12 @@ srs_error_t SrsMpegtsSrtConn::do_cycle()
return srs_error_new(ERROR_SRT_CONN, "invalid srt streamid=%s", streamid.c_str());
}

// discovery vhost, resolve the vhost from config
SrsConfDirective* parsed_vhost = _srs_config->get_vhost(req_->vhost);
if (parsed_vhost) {
req_->vhost = parsed_vhost->arg0();
}

if (! _srs_config->get_srt_enabled(req_->vhost)) {
return srs_error_new(ERROR_SRT_CONN, "srt disabled, vhost=%s", req_->vhost.c_str());
}
Expand All @@ -271,6 +288,9 @@ srs_error_t SrsMpegtsSrtConn::do_cycle()
return srs_error_wrap(err, "on connect");
}

// Build the tcUrl which is vhost/app.
req_->tcUrl = srs_generate_tc_url(req_->host, req_->vhost, req_->app, req_->port);

if (mode == SrtModePush) {
err = publishing();
} else if (mode == SrtModePull) {
Expand Down Expand Up @@ -307,6 +327,11 @@ srs_error_t SrsMpegtsSrtConn::playing()
if ((err = http_hooks_on_play()) != srs_success) {
return srs_error_wrap(err, "rtmp: callback on play");
}

SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(_srs_context->get_id().c_str(), req_, this, SrsSrtConnPlay)) != srs_success) {
return srs_error_wrap(err, "rtmp: stat client");
}

err = do_playing();
http_hooks_on_stop();
Expand Down Expand Up @@ -363,6 +388,11 @@ srs_error_t SrsMpegtsSrtConn::do_publishing()
SrsPithyPrint* pprint = SrsPithyPrint::create_srt_publish();
SrsAutoFree(SrsPithyPrint, pprint);

SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(_srs_context->get_id().c_str(), req_, this, SrsSrtConnPublish)) != srs_success) {
return srs_error_wrap(err, "srt: stat client");
}

int nb_packets = 0;

// Max udp packet size equal to 1500.
Expand Down
5 changes: 4 additions & 1 deletion trunk/src/app/srs_app_srt_conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class SrsSrtRecvThread : public ISrsCoroutineHandler
srs_error_t recv_err_;
};

class SrsMpegtsSrtConn : public ISrsStartableConneciton, public ISrsCoroutineHandler
class SrsMpegtsSrtConn : public ISrsStartableConneciton, public ISrsCoroutineHandler, public ISrsExpire
{
public:
SrsMpegtsSrtConn(SrsSrtServer* srt_server, srs_srt_t srt_fd, std::string ip, int port);
Expand All @@ -81,6 +81,9 @@ class SrsMpegtsSrtConn : public ISrsStartableConneciton, public ISrsCoroutineHan
// Interface ISrsKbpsDelta
public:
virtual void remark(int64_t* in, int64_t* out);
// Interface ISrsExpire
public:
virtual void expire();
public:
virtual srs_error_t start();
// Interface ISrsConnection.
Expand Down
65 changes: 57 additions & 8 deletions trunk/src/app/srs_app_srt_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ using namespace std;
#include <srs_protocol_log.hpp>
#include <srs_app_config.hpp>
#include <srs_app_srt_conn.hpp>
#include <srs_app_statistic.hpp>

#ifdef SRS_SRT
SrsSrtEventLoop* _srt_eventloop = NULL;
Expand Down Expand Up @@ -131,16 +132,27 @@ srs_error_t SrsSrtAcceptor::on_srt_client(srs_srt_t srt_fd)
SrsSrtServer::SrsSrtServer()
{
conn_manager_ = new SrsResourceManager("SRT", true);
timer_ = NULL;
}

SrsSrtServer::~SrsSrtServer()
{
srs_freep(conn_manager_);
srs_freep(timer_);
}

srs_error_t SrsSrtServer::initialize()
{
srs_error_t err = srs_success;

if (! _srs_config->get_srt_enabled()) {
return err;
}

if ((err = setup_ticks()) != srs_success) {
return srs_error_wrap(err, "tick");
}

return err;
}

Expand Down Expand Up @@ -242,17 +254,54 @@ srs_error_t SrsSrtServer::fd_to_resource(srs_srt_t srt_fd, ISrsStartableConnecit

void SrsSrtServer::remove(ISrsResource* c)
{
// TODO: FIXME: add some statistic of srt.
// ISrsStartableConneciton* conn = dynamic_cast<ISrsStartableConneciton*>(c);

// SrsStatistic* stat = SrsStatistic::instance();
// stat->kbps_add_delta(c->get_id().c_str(), conn);
// stat->on_disconnect(c->get_id().c_str());

// use manager to free it async.
conn_manager_->remove(c);
}

srs_error_t SrsSrtServer::setup_ticks()
{
srs_error_t err = srs_success;

srs_freep(timer_);
timer_ = new SrsHourGlass("srt", this, 1 * SRS_UTIME_SECONDS);

if (_srs_config->get_stats_enabled()) {
if ((err = timer_->tick(8, 3 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}
}

if ((err = timer_->start()) != srs_success) {
return srs_error_wrap(err, "timer");
}

return err;
}

srs_error_t SrsSrtServer::notify(int event, srs_utime_t interval, srs_utime_t tick)
{
srs_error_t err = srs_success;

switch (event) {
case 8: resample_kbps(); break;
}

return err;
}

void SrsSrtServer::resample_kbps()
{
// collect delta from all clients.
for (int i = 0; i < (int)conn_manager_->size(); i++) {
ISrsResource* c = conn_manager_->at(i);
ISrsKbpsDelta* conn = dynamic_cast<ISrsKbpsDelta*>(conn_manager_->at(i));

// add delta of connection to server kbps.,
// for next sample() of server kbps can get the stat.
SrsStatistic::instance()->kbps_add_delta(c->get_id().c_str(), conn);
}
}

SrsSrtServerAdapter::SrsSrtServerAdapter()
{
srt_server_ = new SrsSrtServer();
Expand Down Expand Up @@ -349,7 +398,7 @@ srs_error_t SrsSrtEventLoop::start()
srs_error_t SrsSrtEventLoop::cycle()
{
srs_error_t err = srs_success;

while (true) {
if ((err = trd_->pull()) != srs_success) {
return srs_error_wrap(err, "srt listener");
Expand Down
10 changes: 9 additions & 1 deletion trunk/src/app/srs_app_srt_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <srs_app_srt_listener.hpp>

class SrsSrtServer;
class SrsHourGlass;

// A common srt acceptor, for SRT server.
class SrsSrtAcceptor : public ISrsSrtHandler
Expand All @@ -37,10 +38,11 @@ class SrsSrtAcceptor : public ISrsSrtHandler
};

// SRS SRT server, initialize and listen, start connection service thread, destroy client.
class SrsSrtServer : public ISrsResourceManager
class SrsSrtServer : public ISrsResourceManager, public ISrsHourGlass
{
private:
SrsResourceManager* conn_manager_;
SrsHourGlass* timer_;
private:
std::vector<SrsSrtAcceptor*> acceptors_;
public:
Expand All @@ -66,6 +68,12 @@ class SrsSrtServer : public ISrsResourceManager
// A callback for connection to remove itself.
// When connection thread cycle terminated, callback this to delete connection.
virtual void remove(ISrsResource* c);
// interface ISrsHourGlass
private:
virtual srs_error_t setup_ticks();
virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick);
private:
virtual void resample_kbps();
};

// The srt server adapter, the master server.
Expand Down

0 comments on commit 7577a2d

Please sign in to comment.