Skip to content

Commit

Permalink
For #913, use complex error for listener
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Jul 29, 2017
1 parent b88265f commit 5c9a12e
Show file tree
Hide file tree
Showing 19 changed files with 244 additions and 267 deletions.
28 changes: 7 additions & 21 deletions trunk/src/app/srs_app_caster_flv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,48 +60,34 @@ SrsAppCasterFlv::~SrsAppCasterFlv()
srs_freep(manager);
}

int SrsAppCasterFlv::initialize()
srs_error_t SrsAppCasterFlv::initialize()
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;

if ((err = http_mux->handle("/", this)) != srs_success) {
// TODO: FIXME: Use error.
ret = srs_error_code(err);
srs_freep(err);

return ret;
return srs_error_wrap(err, "handle root");
}

if ((err = manager->start()) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);

return ret;
return srs_error_wrap(err, "start manager");
}

return ret;
return err;
}

int SrsAppCasterFlv::on_tcp_client(srs_netfd_t stfd)
srs_error_t SrsAppCasterFlv::on_tcp_client(srs_netfd_t stfd)
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;

string ip = srs_get_peer_ip(srs_netfd_fileno(stfd));
SrsHttpConn* conn = new SrsDynamicHttpConn(this, stfd, http_mux, ip);
conns.push_back(conn);

if ((err = conn->start()) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);

return ret;
return srs_error_wrap(err, "start tcp listener");
}

return ret;
return err;
}

void SrsAppCasterFlv::remove(ISrsConnection* c)
Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_caster_flv.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ class SrsAppCasterFlv : virtual public ISrsTcpHandler
SrsAppCasterFlv(SrsConfDirective* c);
virtual ~SrsAppCasterFlv();
public:
virtual int initialize();
virtual srs_error_t initialize();
// ISrsTcpHandler
public:
virtual int on_tcp_client(srs_netfd_t stfd);
virtual srs_error_t on_tcp_client(srs_netfd_t stfd);
// IConnectionManager
public:
virtual void remove(ISrsConnection* c);
Expand Down
16 changes: 14 additions & 2 deletions trunk/src/app/srs_app_edge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,15 @@ int SrsEdgeIngester::ingest()
// set to larger timeout to read av data from origin.
upstream->set_recv_timeout(SRS_EDGE_INGESTER_TMMS);

while (!trd->pull()) {
while (true) {
srs_error_t err = srs_success;
if ((err = trd->pull()) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
return ret;
}

pprint->elapse();

// pithy print
Expand Down Expand Up @@ -555,7 +563,11 @@ srs_error_t SrsEdgeForwarder::do_cycle()

SrsMessageArray msgs(SYS_MAX_EDGE_SEND_MSGS);

while (!trd->pull()) {
while (true) {
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "edge forward pull");
}

if (send_error_code != ERROR_SUCCESS) {
srs_usleep(SRS_EDGE_FORWARDER_TMMS * 1000);
continue;
Expand Down
10 changes: 9 additions & 1 deletion trunk/src/app/srs_app_forward.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,15 @@ int SrsForwarder::forward()
}
}

while (!trd->pull()) {
while (true) {
srs_error_t err = srs_success;
if ((err = trd->pull()) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
return ret;
}

pprint->elapse();

// read from client.
Expand Down
10 changes: 9 additions & 1 deletion trunk/src/app/srs_app_http_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1353,7 +1353,15 @@ int SrsHttpApi::do_cycle()
}

// process http messages.
while(!trd->pull()) {
while (true) {
srs_error_t err = srs_success;
if ((err = trd->pull()) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
return ret;
}

ISrsHttpMessage* req = NULL;

// get a http message
Expand Down
10 changes: 9 additions & 1 deletion trunk/src/app/srs_app_http_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,15 @@ int SrsHttpConn::do_cycle()
}

// process http messages.
while (!trd->pull()) {
while (true) {
srs_error_t err = srs_success;
if ((err = trd->pull()) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
return ret;
}

ISrsHttpMessage* req = NULL;

// get a http message
Expand Down
99 changes: 34 additions & 65 deletions trunk/src/app/srs_app_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ ISrsUdpHandler::~ISrsUdpHandler()
{
}

int ISrsUdpHandler::on_stfd_change(srs_netfd_t /*fd*/)
srs_error_t ISrsUdpHandler::on_stfd_change(srs_netfd_t /*fd*/)
{
return ERROR_SUCCESS;
return srs_success;
}

ISrsTcpHandler::ISrsTcpHandler()
Expand Down Expand Up @@ -107,17 +107,13 @@ srs_netfd_t SrsUdpListener::stfd()
return _stfd;
}

int SrsUdpListener::listen()
srs_error_t SrsUdpListener::listen()
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;

if ((_fd = socket(AF_INET, SOCK_DGRAM, 0)) == -1) {
ret = ERROR_SOCKET_CREATE;
srs_error("create linux socket error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret);
return ret;
return srs_error_new(ERROR_SOCKET_CREATE, "create socket");
}
srs_verbose("create linux socket success. ip=%s, port=%d, fd=%d", ip.c_str(), port, _fd);

srs_fd_close_exec(_fd);
srs_socket_reuse_addr(_fd);
Expand All @@ -127,40 +123,31 @@ int SrsUdpListener::listen()
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
if (bind(_fd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1) {
ret = ERROR_SOCKET_BIND;
srs_error("bind socket error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret;
return srs_error_new(ERROR_SOCKET_BIND, "bind socket");
}
srs_verbose("bind socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);

if ((_stfd = srs_netfd_open_socket(_fd)) == NULL){
ret = ERROR_ST_OPEN_SOCKET;
srs_error("st_netfd_open_socket open socket failed. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret;
return srs_error_new(ERROR_ST_OPEN_SOCKET, "st open socket");
}
srs_verbose("st open socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);

srs_freep(trd);
trd = new SrsSTCoroutine("udp", this);
if ((err = trd->start()) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);

srs_error("st_thread_create listen thread error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret;
return srs_error_wrap(err, "start thread");
}
srs_verbose("create st listen thread success, ep=%s:%d", ip.c_str(), port);

return ret;
return err;
}

srs_error_t SrsUdpListener::cycle()
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;

while (!trd->pull()) {
while (true) {
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "udp listener");
}

// TODO: FIXME: support ipv6, @see man 7 ipv6
sockaddr_in from;
int nb_from = sizeof(sockaddr_in);
Expand All @@ -170,8 +157,8 @@ srs_error_t SrsUdpListener::cycle()
return srs_error_new(ERROR_SOCKET_READ, "udp read, nread=%d", nread);
}

if ((ret = handler->on_udp_packet(&from, buf, nread)) != ERROR_SUCCESS) {
return srs_error_new(ret, "handle packet %d bytes", nread);
if ((err = handler->on_udp_packet(&from, buf, nread)) != srs_success) {
return srs_error_wrap(err, "handle packet %d bytes", nread);
}

if (SrsUdpPacketRecvCycleMS > 0) {
Expand Down Expand Up @@ -206,17 +193,13 @@ int SrsTcpListener::fd()
return _fd;
}

int SrsTcpListener::listen()
srs_error_t SrsTcpListener::listen()
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;

if ((_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
ret = ERROR_SOCKET_CREATE;
srs_error("create linux socket error. port=%d, ret=%d", port, ret);
return ret;
return srs_error_new(ERROR_SOCKET_CREATE, "create socket");
}
srs_verbose("create linux socket success. port=%d, fd=%d", port, _fd);

srs_fd_close_exec(_fd);
srs_socket_reuse_addr(_fd);
Expand All @@ -226,59 +209,45 @@ int SrsTcpListener::listen()
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
if (bind(_fd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1) {
ret = ERROR_SOCKET_BIND;
srs_error("bind socket error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret;
return srs_error_new(ERROR_SOCKET_BIND, "bind socket");
}
srs_verbose("bind socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);

if (::listen(_fd, SERVER_LISTEN_BACKLOG) == -1) {
ret = ERROR_SOCKET_LISTEN;
srs_error("listen socket error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret;
return srs_error_new(ERROR_SOCKET_LISTEN, "listen socket");
}
srs_verbose("listen socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);

if ((_stfd = srs_netfd_open_socket(_fd)) == NULL){
ret = ERROR_ST_OPEN_SOCKET;
srs_error("st_netfd_open_socket open socket failed. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret;
return srs_error_new(ERROR_ST_OPEN_SOCKET, "st open socket");
}
srs_verbose("st open socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);

srs_freep(trd);
trd = new SrsSTCoroutine("tcp", this);
if ((err = trd->start()) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);

srs_error("st_thread_create listen thread error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret;
return srs_error_wrap(err, "start coroutine");
}
srs_verbose("create st listen thread success, ep=%s:%d", ip.c_str(), port);

return ret;
return err;
}

srs_error_t SrsTcpListener::cycle()
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;

while (!trd->pull()) {
srs_netfd_t stfd = srs_accept(_stfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT);
int fd = srs_netfd_fileno(stfd);

srs_fd_close_exec(fd);
while (true) {
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "tcp listener");
}

if(stfd == NULL){
return err;
srs_netfd_t cstfd = srs_accept(_stfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT);
if(cstfd == NULL){
return srs_error_new(ERROR_SOCKET_CREATE, "accept failed");
}
srs_verbose("get a client. fd=%d", fd);

if ((ret = handler->on_tcp_client(stfd)) != ERROR_SUCCESS) {
return srs_error_new(ret, "handle fd=%d", fd);
int cfd = srs_netfd_fileno(cstfd);
srs_fd_close_exec(cfd);

if ((err = handler->on_tcp_client(cstfd)) != srs_success) {
return srs_error_wrap(err, "handle fd=%d", cfd);
}
}

Expand Down
10 changes: 5 additions & 5 deletions trunk/src/app/srs_app_listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class ISrsUdpHandler
* when fd changed, for instance, reload the listen port,
* notify the handler and user can do something.
*/
virtual int on_stfd_change(srs_netfd_t fd);
virtual srs_error_t on_stfd_change(srs_netfd_t fd);
public:
/**
* when udp listener got a udp packet, notice server to process it.
Expand All @@ -57,7 +57,7 @@ class ISrsUdpHandler
* @param nb_buf, the size of udp packet bytes.
* @remark user should never use the buf, for it's a shared memory bytes.
*/
virtual int on_udp_packet(sockaddr_in* from, char* buf, int nb_buf) = 0;
virtual srs_error_t on_udp_packet(sockaddr_in* from, char* buf, int nb_buf) = 0;
};

/**
Expand All @@ -72,7 +72,7 @@ class ISrsTcpHandler
/**
* when got tcp client.
*/
virtual int on_tcp_client(srs_netfd_t stfd) = 0;
virtual srs_error_t on_tcp_client(srs_netfd_t stfd) = 0;
};

/**
Expand All @@ -98,7 +98,7 @@ class SrsUdpListener : public ISrsCoroutineHandler
virtual int fd();
virtual srs_netfd_t stfd();
public:
virtual int listen();
virtual srs_error_t listen();
// interface ISrsReusableThreadHandler.
public:
virtual srs_error_t cycle();
Expand All @@ -123,7 +123,7 @@ class SrsTcpListener : public ISrsCoroutineHandler
public:
virtual int fd();
public:
virtual int listen();
virtual srs_error_t listen();
// interface ISrsReusableThreadHandler.
public:
virtual srs_error_t cycle();
Expand Down
Loading

0 comments on commit 5c9a12e

Please sign in to comment.