diff --git a/README.md b/README.md index 482c8508f2..e70995fc86 100755 --- a/README.md +++ b/README.md @@ -183,6 +183,7 @@ Please select your language: ### V3 changes +* v3.0, 2017-01-17, for [#742][bug #742] refine source, timeout, live cycle. 3.0.15 * v3.0, 2017-01-11, fix [#735][bug #735] config transform refer_publish invalid. 3.0.14 * v3.0, 2017-01-06, for [#730][bug #730] support config in/out ack size. 3.0.13 * v3.0, 2017-01-06, for [#711][bug #711] support perfile for transcode. 3.0.12 @@ -1365,7 +1366,8 @@ Winlin [bug #xxxxxxxxxx]: https://github.com/ossrs/srs/issues/xxxxxxxxxx [bug #735]: https://github.com/ossrs/srs/issues/735 -[bug #xxxxxxxxxx]: https://github.com/ossrs/srs/issues/xxxxxxxxxx +[bug #742]: https://github.com/ossrs/srs/issues/742 +[bug #xxxxxxxxxxxxx]: https://github.com/ossrs/srs/issues/xxxxxxxxxxxxx [exo #828]: https://github.com/google/ExoPlayer/pull/828 diff --git a/trunk/src/app/srs_app_caster_flv.cpp b/trunk/src/app/srs_app_caster_flv.cpp index 9f855ade6d..9de9f09087 100644 --- a/trunk/src/app/srs_app_caster_flv.cpp +++ b/trunk/src/app/srs_app_caster_flv.cpp @@ -120,7 +120,7 @@ int SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) SrsDynamicHttpConn::SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m, string cip) : SrsHttpConn(cm, fd, m, cip) { - sdk = new SrsSimpleRtmpClient(); + sdk = NULL; pprint = SrsPithyPrint::create_caster(); } @@ -181,9 +181,13 @@ int SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec) { int ret = ERROR_SUCCESS; + srs_freep(sdk); + int64_t cto = SRS_CONSTS_RTMP_TIMEOUT_US; int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US; - if ((ret = sdk->connect(output, cto, sto)) != ERROR_SUCCESS) { + sdk = new SrsSimpleRtmpClient(output, cto / 1000, sto / 1000); + + if ((ret = sdk->connect()) != ERROR_SUCCESS) { srs_error("flv: connect %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", output.c_str(), cto, sto, ret); return ret; } diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index fc117fe5a4..ba36ab827d 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -73,14 +73,12 @@ SrsEdgeUpstream::~SrsEdgeUpstream() SrsEdgeRtmpUpstream::SrsEdgeRtmpUpstream(string r) { redirect = r; - sdk = new SrsSimpleRtmpClient(); + sdk = NULL; } SrsEdgeRtmpUpstream::~SrsEdgeRtmpUpstream() { close(); - - srs_freep(sdk); } int SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb) @@ -126,9 +124,12 @@ int SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb) url = srs_generate_rtmp_url(server, port, vhost, req->app, req->stream); } + srs_freep(sdk); int64_t cto = SRS_EDGE_INGESTER_TIMEOUT_US; int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US; - if ((ret = sdk->connect(url, cto, sto)) != ERROR_SUCCESS) { + sdk = new SrsSimpleRtmpClient(url, cto/1000, sto/1000); + + if ((ret = sdk->connect()) != ERROR_SUCCESS) { srs_error("edge pull %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", url.c_str(), cto, sto, ret); return ret; } @@ -153,7 +154,7 @@ int SrsEdgeRtmpUpstream::decode_message(SrsCommonMessage* msg, SrsPacket** ppack void SrsEdgeRtmpUpstream::close() { - sdk->close(); + srs_freep(sdk); } void SrsEdgeRtmpUpstream::set_recv_timeout(int64_t timeout) @@ -406,7 +407,7 @@ SrsEdgeForwarder::SrsEdgeForwarder() req = NULL; send_error_code = ERROR_SUCCESS; - sdk = new SrsSimpleRtmpClient(); + sdk = NULL; lb = new SrsLbRoundRobin(); pthread = new SrsReusableThread2("edge-fwr", this, SRS_EDGE_FORWARDER_SLEEP_US); queue = new SrsMessageQueue(); @@ -416,7 +417,6 @@ SrsEdgeForwarder::~SrsEdgeForwarder() { stop(); - srs_freep(sdk); srs_freep(lb); srs_freep(pthread); srs_freep(queue); @@ -464,9 +464,12 @@ int SrsEdgeForwarder::start() } // open socket. + srs_freep(sdk); int64_t cto = SRS_EDGE_FORWARDER_TIMEOUT_US; int64_t sto = SRS_CONSTS_RTMP_TIMEOUT_US; - if ((ret = sdk->connect(url, cto, sto)) != ERROR_SUCCESS) { + sdk = new SrsSimpleRtmpClient(url, cto/1000, sto/1000); + + if ((ret = sdk->connect()) != ERROR_SUCCESS) { srs_warn("edge push %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", url.c_str(), cto, sto, ret); return ret; } @@ -482,8 +485,9 @@ int SrsEdgeForwarder::start() void SrsEdgeForwarder::stop() { pthread->stop(); - sdk->close(); queue->clear(); + + srs_freep(sdk); } #define SYS_MAX_EDGE_SEND_MSGS 128 diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index 1c442073ce..65b449f45f 100755 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -57,7 +57,7 @@ SrsForwarder::SrsForwarder(SrsSource* s) req = NULL; sh_video = sh_audio = NULL; - sdk = new SrsSimpleRtmpClient(); + sdk = NULL; pthread = new SrsReusableThread2("forward", this, SRS_FORWARDER_SLEEP_US); queue = new SrsMessageQueue(); jitter = new SrsRtmpJitter(); @@ -236,9 +236,12 @@ int SrsForwarder::cycle() url = srs_generate_rtmp_url(server, port, req->vhost, req->app, req->stream); } + srs_freep(sdk); int64_t cto = SRS_FORWARDER_SLEEP_US; int64_t sto = SRS_CONSTS_RTMP_TIMEOUT_US; - if ((ret = sdk->connect(url, cto, sto)) != ERROR_SUCCESS) { + sdk = new SrsSimpleRtmpClient(url, cto, sto); + + if ((ret = sdk->connect()) != ERROR_SUCCESS) { srs_warn("forward failed, url=%s, cto=%"PRId64", sto=%"PRId64". ret=%d", url.c_str(), cto, sto, ret); return ret; } diff --git a/trunk/src/app/srs_app_http_client.cpp b/trunk/src/app/srs_app_http_client.cpp index 7b1d08485d..5f64089615 100644 --- a/trunk/src/app/srs_app_http_client.cpp +++ b/trunk/src/app/srs_app_http_client.cpp @@ -40,7 +40,7 @@ using namespace std; SrsHttpClient::SrsHttpClient() { - transport = new SrsTcpClient(); + transport = NULL; kbps = new SrsKbps(); parser = NULL; timeout_us = 0; @@ -52,19 +52,14 @@ SrsHttpClient::~SrsHttpClient() disconnect(); srs_freep(kbps); - srs_freep(transport); srs_freep(parser); } +// TODO: FIXME: use ms for timeout. int SrsHttpClient::initialize(string h, int p, int64_t t_us) { int ret = ERROR_SUCCESS; - // disconnect first when h:p changed. - if ((!host.empty() && host != h) || (port != 0 && port != p)) { - disconnect(); - } - srs_freep(parser); parser = new SrsHttpParser(); @@ -73,9 +68,11 @@ int SrsHttpClient::initialize(string h, int p, int64_t t_us) return ret; } + // Always disconnect the transport. host = h; port = p; timeout_us = t_us; + disconnect(); // ep used for host in header. string ep = host; @@ -83,7 +80,7 @@ int SrsHttpClient::initialize(string h, int p, int64_t t_us) ep += ":" + srs_int2str(port); } - // set default value for headers. + // Set default value for headers. headers["Host"] = ep; headers["Connection"] = "Keep-Alive"; headers["User-Agent"] = RTMP_SIG_SRS_SERVER; @@ -126,9 +123,8 @@ int SrsHttpClient::post(string path, string req, ISrsHttpMessage** ppmsg) std::string data = ss.str(); if ((ret = transport->write((void*)data.c_str(), data.length(), NULL)) != ERROR_SUCCESS) { - // disconnect when error. + // Disconnect the transport when channel error, reconnect for next operation. disconnect(); - srs_error("write http post failed. ret=%d", ret); return ret; } @@ -138,9 +134,13 @@ int SrsHttpClient::post(string path, string req, ISrsHttpMessage** ppmsg) srs_error("parse http post response failed. ret=%d", ret); return ret; } - srs_assert(msg); - *ppmsg = msg; + + if (ppmsg) { + *ppmsg = msg; + } else { + srs_freep(msg); + } srs_info("parse http post response success."); return ret; @@ -173,9 +173,8 @@ int SrsHttpClient::get(string path, string req, ISrsHttpMessage** ppmsg) std::string data = ss.str(); if ((ret = transport->write((void*)data.c_str(), data.length(), NULL)) != ERROR_SUCCESS) { - // disconnect when error. + // Disconnect the transport when channel error, reconnect for next operation. disconnect(); - srs_error("write http get failed. ret=%d", ret); return ret; } @@ -187,7 +186,11 @@ int SrsHttpClient::get(string path, string req, ISrsHttpMessage** ppmsg) } srs_assert(msg); - *ppmsg = msg; + if (ppmsg) { + *ppmsg = msg; + } else { + srs_freep(msg); + } srs_info("parse http get response success."); return ret; @@ -215,23 +218,24 @@ void SrsHttpClient::kbps_sample(const char* label, int64_t age) void SrsHttpClient::disconnect() { kbps->set_io(NULL, NULL); + transport->close(); + srs_freep(transport); } int SrsHttpClient::connect() { int ret = ERROR_SUCCESS; - if (transport->connected()) { + // When transport connected, ignore. + if (transport) { return ret; } - disconnect(); - - // open socket. - if ((ret = transport->connect(host, port, timeout_us)) != ERROR_SUCCESS) { - srs_warn("http client failed, server=%s, port=%d, timeout=%"PRId64", ret=%d", - host.c_str(), port, timeout_us, ret); + transport = new SrsTcpClient(host, port, timeout_us / 1000); + if ((ret = transport->connect()) != ERROR_SUCCESS) { + disconnect(); + srs_warn("http client failed, server=%s, port=%d, timeout=%"PRId64", ret=%d", host.c_str(), port, timeout_us, ret); return ret; } srs_info("connect to server success. server=%s, port=%d", host.c_str(), port); diff --git a/trunk/src/app/srs_app_http_client.hpp b/trunk/src/app/srs_app_http_client.hpp index cfd8df62d9..494d93255f 100644 --- a/trunk/src/app/srs_app_http_client.hpp +++ b/trunk/src/app/srs_app_http_client.hpp @@ -46,11 +46,19 @@ class SrsKbps; #define SRS_HTTP_CLIENT_TIMEOUT_US (int64_t)(30*1000*1000LL) /** -* http client to GET/POST/PUT/DELETE uri -*/ + * The client to GET/POST/PUT/DELETE over HTTP. + * @remark We will reuse the TCP transport until initialize or channel error, + * such as send/recv failed. + * Usage: + * SrsHttpClient hc; + * hc.initialize("127.0.0.1", 80, 9000); + * hc.post("/api/v1/version", "Hello world!", NULL); + */ class SrsHttpClient { private: + // The underlayer TCP transport, set to NULL when disconnect, or never not NULL when connected. + // We will disconnect transport when initialize or channel error, such as send/recv error. SrsTcpClient* transport; SrsHttpParser* parser; std::map headers; @@ -65,12 +73,13 @@ class SrsHttpClient virtual ~SrsHttpClient(); public: /** - * initialize the client, connect to host and port. + * Initliaze the client, disconnect the transport, renew the HTTP parser. * @remark we will set default values in headers, which can be override by set_header. */ virtual int initialize(std::string h, int p, int64_t t_us = SRS_HTTP_CLIENT_TIMEOUT_US); /** - * set the header[k]=v and return the client itself. + * Set HTTP request header in header[k]=v. + * @return the HTTP client itself. */ virtual SrsHttpClient* set_header(std::string k, std::string v); public: diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp index 7c3fe30977..d5e3038360 100644 --- a/trunk/src/app/srs_app_kafka.cpp +++ b/trunk/src/app/srs_app_kafka.cpp @@ -135,14 +135,13 @@ SrsKafkaPartition::SrsKafkaPartition() id = broker = 0; port = SRS_CONSTS_KAFKA_DEFAULT_PORT; - transport = new SrsTcpClient(); - kafka = new SrsKafkaClient(transport); + transport = NULL; + kafka = NULL; } SrsKafkaPartition::~SrsKafkaPartition() { - srs_freep(kafka); - srs_freep(transport); + disconnect(); } string SrsKafkaPartition::hostport() @@ -158,13 +157,15 @@ int SrsKafkaPartition::connect() { int ret = ERROR_SUCCESS; - if (transport->connected()) { + if (transport) { return ret; } + transport = new SrsTcpClient(host, port, SRS_KAFKA_PRODUCER_TIMEOUT); + kafka = new SrsKafkaClient(transport); - int64_t timeout = SRS_KAFKA_PRODUCER_TIMEOUT * 1000; - if ((ret = transport->connect(host, port, timeout)) != ERROR_SUCCESS) { - srs_error("connect to %s partition=%d failed, timeout=%"PRId64". ret=%d", hostport().c_str(), id, timeout, ret); + if ((ret = transport->connect()) != ERROR_SUCCESS) { + disconnect(); + srs_error("connect to %s partition=%d failed. ret=%d", hostport().c_str(), id, ret); return ret; } @@ -178,6 +179,12 @@ int SrsKafkaPartition::flush(SrsKafkaPartitionCache* pc) return kafka->write_messages(topic, id, *pc); } +void SrsKafkaPartition::disconnect() +{ + srs_freep(kafka); + srs_freep(transport); +} + SrsKafkaMessage::SrsKafkaMessage(SrsKafkaProducer* p, int k, SrsJsonObject* j) { producer = p; @@ -562,12 +569,6 @@ int SrsKafkaProducer::request_metadata() return ret; } - SrsTcpClient* transport = new SrsTcpClient(); - SrsAutoFree(SrsTcpClient, transport); - - SrsKafkaClient* kafka = new SrsKafkaClient(transport); - SrsAutoFree(SrsKafkaClient, kafka); - std::string server; int port = SRS_CONSTS_KAFKA_DEFAULT_PORT; if (true) { @@ -584,8 +585,14 @@ int SrsKafkaProducer::request_metadata() senabled.c_str(), sbrokers.c_str(), lb->current(), server.c_str(), port, topic.c_str()); } + SrsTcpClient* transport = new SrsTcpClient(server, port, SRS_CONSTS_KAFKA_TIMEOUT_US / 1000); + SrsAutoFree(SrsTcpClient, transport); + + SrsKafkaClient* kafka = new SrsKafkaClient(transport); + SrsAutoFree(SrsKafkaClient, kafka); + // reconnect to kafka server. - if ((ret = transport->connect(server, port, SRS_CONSTS_KAFKA_TIMEOUT_US)) != ERROR_SUCCESS) { + if ((ret = transport->connect()) != ERROR_SUCCESS) { srs_error("kafka connect %s:%d failed. ret=%d", server.c_str(), port, ret); return ret; } diff --git a/trunk/src/app/srs_app_kafka.hpp b/trunk/src/app/srs_app_kafka.hpp index decbb09e25..d2992bfc6f 100644 --- a/trunk/src/app/srs_app_kafka.hpp +++ b/trunk/src/app/srs_app_kafka.hpp @@ -57,6 +57,7 @@ struct SrsKafkaPartition { private: std::string ep; + // Not NULL when connected. SrsTcpClient* transport; SrsKafkaClient* kafka; public: @@ -73,6 +74,8 @@ struct SrsKafkaPartition virtual std::string hostport(); virtual int connect(); virtual int flush(SrsKafkaPartitionCache* pc); +private: + virtual void disconnect(); }; /** diff --git a/trunk/src/app/srs_app_mpegts_udp.cpp b/trunk/src/app/srs_app_mpegts_udp.cpp index 6c528616df..1b3f84582a 100644 --- a/trunk/src/app/srs_app_mpegts_udp.cpp +++ b/trunk/src/app/srs_app_mpegts_udp.cpp @@ -133,8 +133,7 @@ SrsMpegtsOverUdp::SrsMpegtsOverUdp(SrsConfDirective* c) buffer = new SrsSimpleStream(); output = _srs_config->get_stream_caster_output(c); - req = NULL; - sdk = new SrsSimpleRtmpClient(); + sdk = NULL; avc = new SrsRawH264Stream(); aac = new SrsRawAacStream(); @@ -149,7 +148,6 @@ SrsMpegtsOverUdp::~SrsMpegtsOverUdp() { close(); - srs_freep(sdk); srs_freep(buffer); srs_freep(stream); srs_freep(context); @@ -570,6 +568,10 @@ int SrsMpegtsOverUdp::rtmp_write_packet(char type, u_int32_t timestamp, char* da { int ret = ERROR_SUCCESS; + if ((ret = connect()) != ERROR_SUCCESS) { + return ret; + } + SrsSharedPtrMessage* msg = NULL; if ((ret = srs_rtmp_create_msg(type, timestamp, data, size, sdk->sid(), &msg)) != ERROR_SUCCESS) { @@ -597,6 +599,7 @@ int SrsMpegtsOverUdp::rtmp_write_packet(char type, u_int32_t timestamp, char* da // send out encoded msg. if ((ret = sdk->send_and_free_message(msg)) != ERROR_SUCCESS) { + close(); return ret; } } @@ -608,20 +611,23 @@ int SrsMpegtsOverUdp::connect() { int ret = ERROR_SUCCESS; - // when ok, ignore. - // TODO: FIXME: should reconnect when disconnected. - if (sdk->connected()) { + // Ignore when connected. + if (sdk) { return ret; } int64_t cto = SRS_CONSTS_RTMP_TIMEOUT_US; int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US; - if ((ret = sdk->connect(output, cto, sto)) != ERROR_SUCCESS) { + sdk = new SrsSimpleRtmpClient(output, cto/1000, sto/1000); + + if ((ret = sdk->connect()) != ERROR_SUCCESS) { + close(); srs_error("mpegts: connect %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", output.c_str(), cto, sto, ret); return ret; } if ((ret = sdk->publish()) != ERROR_SUCCESS) { + close(); srs_error("mpegts: publish failed. ret=%d", ret); return ret; } @@ -631,8 +637,7 @@ int SrsMpegtsOverUdp::connect() void SrsMpegtsOverUdp::close() { - srs_freep(req); - sdk->close(); + srs_freep(sdk); } #endif diff --git a/trunk/src/app/srs_app_mpegts_udp.hpp b/trunk/src/app/srs_app_mpegts_udp.hpp index de479985d5..d2de345b0f 100644 --- a/trunk/src/app/srs_app_mpegts_udp.hpp +++ b/trunk/src/app/srs_app_mpegts_udp.hpp @@ -86,7 +86,6 @@ class SrsMpegtsOverUdp : virtual public ISrsTsHandler SrsSimpleStream* buffer; std::string output; private: - SrsRequest* req; SrsSimpleRtmpClient* sdk; private: SrsRawH264Stream* avc; @@ -121,10 +120,9 @@ class SrsMpegtsOverUdp : virtual public ISrsTsHandler private: virtual int rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size); private: - // connect to rtmp output url. - // @remark ignore when not connected, reconnect when disconnected. + // Connect to RTMP server. virtual int connect(); - // close the connected io and rtmp to ready to be re-connect. + // Close the connection to RTMP server. virtual void close(); }; diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 0c6e6d70d2..10da46f864 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -77,51 +77,45 @@ using namespace std; // when edge timeout, retry next. #define SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US (int64_t)(3*1000*1000LL) -SrsSimpleRtmpClient::SrsSimpleRtmpClient() +SrsSimpleRtmpClient::SrsSimpleRtmpClient(string u, int64_t ctm, int64_t stm) { - req = NULL; - client = NULL; kbps = new SrsKbps(); - transport = new SrsTcpClient(); + url = u; + connect_timeout = ctm; + stream_timeout = stm; + + req = new SrsRequest(); + srs_parse_rtmp_url(url, req->tcUrl, req->stream); + srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->port, req->param); + + transport = NULL; + client = NULL; + stream_id = 0; } SrsSimpleRtmpClient::~SrsSimpleRtmpClient() { close(); - srs_freep(kbps); - srs_freep(transport); - srs_freep(client); } -int SrsSimpleRtmpClient::connect(string url, int64_t connect_timeout, int64_t stream_timeout) +int SrsSimpleRtmpClient::connect() { int ret = ERROR_SUCCESS; - // when ok, ignore. - // TODO: FIXME: should reconnect when disconnected. - if (transport->connected()) { - return ret; - } + close(); - // parse uri - srs_freep(req); - req = new SrsRequest(); - srs_parse_rtmp_url(url, req->tcUrl, req->stream); - srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->port, req->param); + transport = new SrsTcpClient(req->host, req->port, connect_timeout); + client = new SrsRtmpClient(transport); + kbps->set_io(transport, transport); - // connect host. - if ((ret = transport->connect(req->host, req->port, connect_timeout)) != ERROR_SUCCESS) { + if ((ret = transport->connect()) != ERROR_SUCCESS) { + close(); return ret; } - srs_freep(client); - client = new SrsRtmpClient(transport); - - kbps->set_io(transport, transport); - client->set_recv_timeout(stream_timeout); client->set_send_timeout(stream_timeout); @@ -142,6 +136,13 @@ int SrsSimpleRtmpClient::connect(string url, int64_t connect_timeout, int64_t st return ret; } +void SrsSimpleRtmpClient::close() +{ + srs_freep(client); + srs_freep(transport); + kbps->set_io(NULL, NULL); +} + int SrsSimpleRtmpClient::connect_app() { int ret = ERROR_SUCCESS; @@ -197,19 +198,6 @@ int SrsSimpleRtmpClient::connect_app() return ret; } -bool SrsSimpleRtmpClient::connected() -{ - return transport->connected(); -} - -void SrsSimpleRtmpClient::close() -{ - transport->close(); - - srs_freep(client); - srs_freep(req); -} - int SrsSimpleRtmpClient::publish() { int ret = ERROR_SUCCESS; @@ -1464,48 +1452,35 @@ int SrsRtmpConn::check_edge_token_traverse_auth() srs_assert(req); - SrsTcpClient* transport = new SrsTcpClient(); - SrsAutoFree(SrsTcpClient, transport); - vector args = _srs_config->get_vhost_edge_origin(req->vhost)->args; - for (int i = 0; i < (int)args.size(); i++) { - string hostport = args.at(i); - if ((ret = connect_server(hostport, transport)) == ERROR_SUCCESS) { - break; - } - } - if (ret != ERROR_SUCCESS) { - srs_warn("token traverse connect failed. ret=%d", ret); + if (args.empty()) { return ret; } - SrsRtmpClient* client = new SrsRtmpClient(transport); - SrsAutoFree(SrsRtmpClient, client); - - return do_token_traverse_auth(client); -} - -int SrsRtmpConn::connect_server(string hostport, SrsTcpClient* transport) -{ - int ret = ERROR_SUCCESS; - - SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost); - srs_assert(conf); - - // select the origin. - string server; - int port = SRS_CONSTS_RTMP_DEFAULT_PORT; - srs_parse_hostport(hostport, server, port); + for (int i = 0; i < (int)args.size(); i++) { + string hostport = args.at(i); + + // select the origin. + string server; + int port = SRS_CONSTS_RTMP_DEFAULT_PORT; + srs_parse_hostport(hostport, server, port); - // open socket. - int64_t timeout = SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US; - if ((ret = transport->connect(server, port, timeout)) != ERROR_SUCCESS) { - srs_warn("edge token traverse failed, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d", - req->tcUrl.c_str(), server.c_str(), port, timeout, ret); - return ret; + SrsTcpClient* transport = new SrsTcpClient(server, port, SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US / 1000); + SrsAutoFree(SrsTcpClient, transport); + + if ((ret = transport->connect()) != ERROR_SUCCESS) { + srs_warn("Illegal edge token, tcUrl=%s to server=%s, port=%d. ret=%d", req->tcUrl.c_str(), server.c_str(), port, ret); + continue; + } + + SrsRtmpClient* client = new SrsRtmpClient(transport); + SrsAutoFree(SrsRtmpClient, client); + + return do_token_traverse_auth(client); } - srs_info("edge token auth connected, url=%s/%s, server=%s:%d", req->tcUrl.c_str(), req->stream.c_str(), server.c_str(), port); + ret = ERROR_EDGE_PORT_INVALID; + srs_error("Illegal edge token, server=%d. ret=%d", (int)args.size(), ret); return ret; } diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index 6f503ea4c4..f7af1036d1 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -63,10 +63,20 @@ class ISrsKafkaCluster; #endif /** - * the simple rtmp client stub, use SrsRtmpClient and provides high level APIs. + * The simple RTMP client, provides friendly APIs. + * @remark Should never use client when closed. + * Usage: + * SrsSimpleRtmpClient client("rtmp://127.0.0.1:1935/live/livestream", 3000, 9000); + * client.connect(); + * client.play(); + * client.close(); */ class SrsSimpleRtmpClient { +private: + std::string url; + int64_t connect_timeout; + int64_t stream_timeout; private: SrsRequest* req; SrsTcpClient* transport; @@ -74,15 +84,19 @@ class SrsSimpleRtmpClient SrsKbps* kbps; int stream_id; public: - SrsSimpleRtmpClient(); + // Constructor. + // @param u The RTMP url, for example, rtmp://ip:port/app/stream?domain=vhost + // @param ctm The timeout in ms to connect to server. + // @param stm The timeout in ms to delivery A/V stream. + SrsSimpleRtmpClient(std::string u, int64_t ctm, int64_t stm); virtual ~SrsSimpleRtmpClient(); public: - virtual int connect(std::string url, int64_t connect_timeout, int64_t stream_timeout); + // Connect, handshake and connect app to RTMP server. + // @remark We always close the transport. + virtual int connect(); + virtual void close(); private: virtual int connect_app(); -public: - virtual bool connected(); - virtual void close(); public: virtual int publish(); virtual int play(); @@ -175,8 +189,8 @@ class SrsRtmpConn : public virtual SrsConnection, public virtual ISrsReloadHandl virtual void set_sock_options(); private: virtual int check_edge_token_traverse_auth(); - virtual int connect_server(std::string hostport, SrsTcpClient* transport); virtual int do_token_traverse_auth(SrsRtmpClient* client); +private: /** * when the connection disconnect, call this method. * e.g. log msg of connection and report to other system. diff --git a/trunk/src/app/srs_app_rtsp.cpp b/trunk/src/app/srs_app_rtsp.cpp index c7ef8ed418..731d688f62 100644 --- a/trunk/src/app/srs_app_rtsp.cpp +++ b/trunk/src/app/srs_app_rtsp.cpp @@ -197,7 +197,7 @@ SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o) trd = new SrsOneCycleThread("rtsp", this); req = NULL; - sdk = new SrsSimpleRtmpClient(); + sdk = NULL; vjitter = new SrsRtspJitter(); ajitter = new SrsRtspJitter(); @@ -209,6 +209,8 @@ SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o) SrsRtspConn::~SrsRtspConn() { + close(); + srs_close_stfd(stfd); srs_freep(video_rtp); @@ -623,6 +625,10 @@ int SrsRtspConn::rtmp_write_packet(char type, u_int32_t timestamp, char* data, i { int ret = ERROR_SUCCESS; + if ((ret = connect()) != ERROR_SUCCESS) { + return ret; + } + SrsSharedPtrMessage* msg = NULL; if ((ret = srs_rtmp_create_msg(type, timestamp, data, size, sdk->sid(), &msg)) != ERROR_SUCCESS) { @@ -633,20 +639,19 @@ int SrsRtspConn::rtmp_write_packet(char type, u_int32_t timestamp, char* data, i // send out encoded msg. if ((ret = sdk->send_and_free_message(msg)) != ERROR_SUCCESS) { + close(); return ret; } return ret; } -// TODO: FIXME: merge all client code. int SrsRtspConn::connect() { int ret = ERROR_SUCCESS; - // when ok, ignore. - // TODO: FIXME: support reconnect. - if (sdk->connected()) { + // Ignore when connected. + if (sdk) { return ret; } @@ -666,13 +671,17 @@ int SrsRtspConn::connect() // connect host. int64_t cto = SRS_CONSTS_RTMP_TIMEOUT_US; int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US; - if ((ret = sdk->connect(url, cto, sto)) != ERROR_SUCCESS) { + sdk = new SrsSimpleRtmpClient(url, cto/1000, sto/1000); + + if ((ret = sdk->connect()) != ERROR_SUCCESS) { + close(); srs_error("rtsp: connect %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", url.c_str(), cto, sto, ret); return ret; } // publish. if ((ret = sdk->publish()) != ERROR_SUCCESS) { + close(); srs_error("rtsp: publish %s failed. ret=%d", url.c_str(), ret); return ret; } @@ -680,6 +689,11 @@ int SrsRtspConn::connect() return write_sequence_header(); } +void SrsRtspConn::close() +{ + srs_freep(sdk); +} + SrsRtspCaster::SrsRtspCaster(SrsConfDirective* c) { // TODO: FIXME: support reload. diff --git a/trunk/src/app/srs_app_rtsp.hpp b/trunk/src/app/srs_app_rtsp.hpp index 441992cad9..51fd156cba 100644 --- a/trunk/src/app/srs_app_rtsp.hpp +++ b/trunk/src/app/srs_app_rtsp.hpp @@ -177,9 +177,10 @@ class SrsRtspConn : public ISrsOneCycleThreadHandler virtual int write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, u_int32_t dts); virtual int rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size); private: - // connect to rtmp output url. - // @remark ignore when not connected, reconnect when disconnected. + // Connect to RTMP server. virtual int connect(); + // Close the connection to RTMP server. + virtual void close(); }; /** diff --git a/trunk/src/app/srs_app_st.cpp b/trunk/src/app/srs_app_st.cpp index c6364cc466..a3f093510d 100755 --- a/trunk/src/app/srs_app_st.cpp +++ b/trunk/src/app/srs_app_st.cpp @@ -421,10 +421,14 @@ int SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite) return ret; } -SrsTcpClient::SrsTcpClient() +SrsTcpClient::SrsTcpClient(string h, int p, int64_t tm) { io = NULL; stfd = NULL; + + host = h; + port = p; + timeout = tm; } SrsTcpClient::~SrsTcpClient() @@ -432,26 +436,19 @@ SrsTcpClient::~SrsTcpClient() close(); } -bool SrsTcpClient::connected() -{ - return io; -} - -int SrsTcpClient::connect(string host, int port, int64_t timeout) +int SrsTcpClient::connect() { int ret = ERROR_SUCCESS; - // when connected, ignore. - if (io) { - return ret; - } + close(); - // connect host. - if ((ret = srs_socket_connect(host, port, timeout, &stfd)) != ERROR_SUCCESS) { - srs_error("connect server %s:%d failed. ret=%d", host.c_str(), port, ret); + srs_assert(stfd == NULL); + if ((ret = srs_socket_connect(host, port, timeout * 1000, &stfd)) != ERROR_SUCCESS) { + srs_error("connect tcp://%s:%d failed, to=%"PRId64"ms. ret=%d", host.c_str(), port, timeout, ret); return ret; } + srs_assert(io == NULL); io = new SrsStSocket(stfd); return ret; @@ -459,7 +456,7 @@ int SrsTcpClient::connect(string host, int port, int64_t timeout) void SrsTcpClient::close() { - // when closed, ignore. + // Ignore when already closed. if (!io) { return; } diff --git a/trunk/src/app/srs_app_st.hpp b/trunk/src/app/srs_app_st.hpp index a2d4b4f3bd..53ebeaa2a4 100644 --- a/trunk/src/app/srs_app_st.hpp +++ b/trunk/src/app/srs_app_st.hpp @@ -205,34 +205,41 @@ class SrsStSocket : public ISrsProtocolReaderWriter }; /** - * the common tcp client, to connect to specified TCP server, - * reconnect and close the connection. + * The client to connect to server over TCP. + * User must never reuse the client when close it. + * Usage: + * SrsTcpClient client("127.0.0.1", 1935,9000); + * client.connect(); + * client.write("Hello world!", 12, NULL); + * client.read(buf, 4096, NULL); */ class SrsTcpClient : public ISrsProtocolReaderWriter { private: st_netfd_t stfd; SrsStSocket* io; -public: - SrsTcpClient(); - virtual ~SrsTcpClient(); +private: + std::string host; + int port; + int64_t timeout; public: /** - * whether connected to server. + * Constructor. + * @param h the ip or hostname of server. + * @param p the port to connect to. + * @param tm the timeout in ms. */ - virtual bool connected(); + SrsTcpClient(std::string h, int p, int64_t tm); + virtual ~SrsTcpClient(); public: /** - * connect to server over TCP. - * @param host the ip or hostname of server. - * @param port the port to connect to. - * @param timeout the timeout in us. - * @remark ignore when connected. + * Connect to server over TCP. + * @remark We will close the exists connection before do connect. */ - virtual int connect(std::string host, int port, int64_t timeout); + virtual int connect(); /** - * close the connection. - * @remark ignore when closed. + * Close the connection to server. + * @remark User should never use the client when close it. */ virtual void close(); // interface ISrsProtocolReaderWriter diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 3e61be5287..968516b650 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR 3 #define VERSION_MINOR 0 -#define VERSION_REVISION 14 +#define VERSION_REVISION 15 // generated by configure, only macros. #include diff --git a/trunk/src/kernel/srs_kernel_consts.hpp b/trunk/src/kernel/srs_kernel_consts.hpp index b0a0e3c6c9..b2ceb5b5c8 100644 --- a/trunk/src/kernel/srs_kernel_consts.hpp +++ b/trunk/src/kernel/srs_kernel_consts.hpp @@ -69,6 +69,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // to avoid death connection. // the common io timeout, for both recv and send. +// TODO: FIXME: use ms for timeout. #define SRS_CONSTS_RTMP_TIMEOUT_US (int64_t)(30*1000*1000LL) // the timeout to wait for client control message, @@ -409,6 +410,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define SRS_CONSTS_KAFKA_DEFAULT_PORT 9092 // the common io timeout, for both recv and send. +// TODO: FIXME: use ms for timeout. #define SRS_CONSTS_KAFKA_TIMEOUT_US (int64_t)(30*1000*1000LL) #endif diff --git a/trunk/src/main/srs_main_ingest_hls.cpp b/trunk/src/main/srs_main_ingest_hls.cpp index 822429bd25..78d7ef8e10 100644 --- a/trunk/src/main/srs_main_ingest_hls.cpp +++ b/trunk/src/main/srs_main_ingest_hls.cpp @@ -647,34 +647,8 @@ class SrsIngestSrsOutput : virtual public ISrsTsHandler, virtual public ISrsAacH SrsRawAacStream* aac; std::string aac_specific_config; public: - SrsIngestSrsOutput(SrsHttpUri* rtmp) { - out_rtmp = rtmp; - disconnected = false; - raw_aac_dts = srs_update_system_time_ms(); - - req = NULL; - sdk = new SrsSimpleRtmpClient(); - - avc = new SrsRawH264Stream(); - aac = new SrsRawAacStream(); - h264_sps_changed = false; - h264_pps_changed = false; - h264_sps_pps_sent = false; - } - virtual ~SrsIngestSrsOutput() { - close(); - - srs_freep(sdk); - srs_freep(avc); - srs_freep(aac); - - std::multimap::iterator it; - for (it = queue.begin(); it != queue.end(); ++it) { - SrsTsMessage* msg = it->second; - srs_freep(msg); - } - queue.clear(); - } + SrsIngestSrsOutput(SrsHttpUri* rtmp); + virtual ~SrsIngestSrsOutput(); // interface ISrsTsHandler public: virtual int on_ts_message(SrsTsMessage* msg); @@ -705,6 +679,37 @@ class SrsIngestSrsOutput : virtual public ISrsTsHandler, virtual public ISrsAacH virtual void close(); }; +SrsIngestSrsOutput::SrsIngestSrsOutput(SrsHttpUri* rtmp) +{ + out_rtmp = rtmp; + disconnected = false; + raw_aac_dts = srs_update_system_time_ms(); + + req = NULL; + sdk = NULL; + + avc = new SrsRawH264Stream(); + aac = new SrsRawAacStream(); + h264_sps_changed = false; + h264_pps_changed = false; + h264_sps_pps_sent = false; +} + +SrsIngestSrsOutput::~SrsIngestSrsOutput() +{ + close(); + + srs_freep(avc); + srs_freep(aac); + + std::multimap::iterator it; + for (it = queue.begin(); it != queue.end(); ++it) { + SrsTsMessage* msg = it->second; + srs_freep(msg); + } + queue.clear(); +} + int SrsIngestSrsOutput::on_ts_message(SrsTsMessage* msg) { int ret = ERROR_SUCCESS; @@ -1184,6 +1189,10 @@ int SrsIngestSrsOutput::rtmp_write_packet(char type, u_int32_t timestamp, char* { int ret = ERROR_SUCCESS; + if ((ret = connect()) != ERROR_SUCCESS) { + return ret; + } + SrsSharedPtrMessage* msg = NULL; if ((ret = srs_rtmp_create_msg(type, timestamp, data, size, sdk->sid(), &msg)) != ERROR_SUCCESS) { @@ -1196,6 +1205,7 @@ int SrsIngestSrsOutput::rtmp_write_packet(char type, u_int32_t timestamp, char* // send out encoded msg. if ((ret = sdk->send_and_free_message(msg)) != ERROR_SUCCESS) { + close(); srs_error("send RTMP type=%d, dts=%d, size=%d failed. ret=%d", type, timestamp, size, ret); return ret; } @@ -1207,9 +1217,8 @@ int SrsIngestSrsOutput::connect() { int ret = ERROR_SUCCESS; - // when ok, ignore. - // TODO: FIXME: should reconnect when disconnected. - if (sdk->connected()) { + // Ignore when connected. + if (sdk) { return ret; } @@ -1219,13 +1228,17 @@ int SrsIngestSrsOutput::connect() // connect host. int64_t cto = SRS_CONSTS_RTMP_TIMEOUT_US; int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US; - if ((ret = sdk->connect(url, cto, sto)) != ERROR_SUCCESS) { + sdk = new SrsSimpleRtmpClient(url, cto/1000, sto/1000); + + if ((ret = sdk->connect()) != ERROR_SUCCESS) { + close(); srs_error("mpegts: connect %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", url.c_str(), cto, sto, ret); return ret; } // publish. if ((ret = sdk->publish()) != ERROR_SUCCESS) { + close(); srs_error("mpegts: publish %s failed. ret=%d", url.c_str(), ret); return ret; } @@ -1235,11 +1248,9 @@ int SrsIngestSrsOutput::connect() void SrsIngestSrsOutput::close() { - srs_trace("close output=%s", out_rtmp->get_url().c_str()); h264_sps_pps_sent = false; - srs_freep(req); - sdk->close(); + srs_freep(sdk); } // the context for ingest hls stream.