From c70421e6560ecd057bd53ebda52cd781ef680624 Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 16 Feb 2018 16:39:07 +0800 Subject: [PATCH] Fix #464, support origin cluster --- trunk/conf/full.conf | 1 + trunk/conf/origin.cluster.edge.conf | 15 ++++++ trunk/conf/origin.cluster.serverA.conf | 5 +- trunk/conf/origin.cluster.serverB.conf | 5 +- trunk/src/app/srs_app_config.cpp | 9 +--- trunk/src/app/srs_app_coworkers.cpp | 8 ++- trunk/src/app/srs_app_http_hooks.cpp | 62 ++++++++++++++++++++++- trunk/src/app/srs_app_http_hooks.hpp | 4 ++ trunk/src/app/srs_app_rtmp_conn.cpp | 30 +++++++++++ trunk/src/app/srs_app_source.cpp | 5 ++ trunk/src/app/srs_app_source.hpp | 23 ++++----- trunk/src/kernel/srs_kernel_error.hpp | 2 + trunk/src/service/srs_service_utility.cpp | 41 ++++++++++----- 13 files changed, 171 insertions(+), 39 deletions(-) create mode 100644 trunk/conf/origin.cluster.edge.conf diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index cd80f41e07..5d1925fd87 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -366,6 +366,7 @@ vhost cluster.srs.com { debug_srs_upnode on; # For origin(mode local) cluster, turn on the cluster. + # @remark Origin cluster only supports RTMP, use Edge to transmux RTMP to FLV. # default: off # TODO: FIXME: Support reload. origin_cluster off; diff --git a/trunk/conf/origin.cluster.edge.conf b/trunk/conf/origin.cluster.edge.conf new file mode 100644 index 0000000000..1123fa905c --- /dev/null +++ b/trunk/conf/origin.cluster.edge.conf @@ -0,0 +1,15 @@ +# the config for srs origin-edge cluster +# @see https://github.com/ossrs/srs/wiki/v3_EN_OriginCluster +# @see full.conf for detail config. + +listen 1935; +max_connections 1000; +pid objs/edge.pid; +daemon off; +srs_log_tank console; +vhost __defaultVhost__ { + cluster { + mode remote; + origin 127.0.0.1:19351; + } +} diff --git a/trunk/conf/origin.cluster.serverA.conf b/trunk/conf/origin.cluster.serverA.conf index 6013cfc6f5..0e0d5701ec 100644 --- a/trunk/conf/origin.cluster.serverA.conf +++ b/trunk/conf/origin.cluster.serverA.conf @@ -13,7 +13,8 @@ http_api { } vhost __defaultVhost__ { cluster { - mode local; - coworkers 127.0.0.1:9091; + mode local; + origin_cluster on; + coworkers 127.0.0.1:9091; } } diff --git a/trunk/conf/origin.cluster.serverB.conf b/trunk/conf/origin.cluster.serverB.conf index 1423fd9ceb..333d1e5668 100644 --- a/trunk/conf/origin.cluster.serverB.conf +++ b/trunk/conf/origin.cluster.serverB.conf @@ -13,7 +13,8 @@ http_api { } vhost __defaultVhost__ { cluster { - mode local; - coworkers 127.0.0.1:9090; + mode local; + origin_cluster on; + coworkers 127.0.0.1:9090; } } diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 1f2e2468a2..566c508a4b 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -5195,13 +5195,8 @@ vector SrsConfig::get_vhost_coworkers(string vhost) } conf = conf->get("coworkers"); - for (int i = 0; i < (int)conf->directives.size(); i++) { - SrsConfDirective* option = conf->directives[i]; - if (!option) { - continue; - } - - coworkers.push_back(option->arg0()); + for (int i = 0; i < (int)conf->args.size(); i++) { + coworkers.push_back(conf->args.at(i)); } return coworkers; diff --git a/trunk/src/app/srs_app_coworkers.cpp b/trunk/src/app/srs_app_coworkers.cpp index cec75c8e05..55eac270da 100644 --- a/trunk/src/app/srs_app_coworkers.cpp +++ b/trunk/src/app/srs_app_coworkers.cpp @@ -73,9 +73,13 @@ SrsJsonAny* SrsCoWorkers::dumps(string vhost, string app, string stream) string service_ip = srs_get_public_internet_address(); string service_hostport = service_ports.at(0); - string service_host; int service_port = SRS_CONSTS_RTMP_DEFAULT_PORT; - srs_parse_hostport(service_hostport, service_host, service_port); + if (service_hostport.find(":") != string::npos) { + string service_host; + srs_parse_hostport(service_hostport, service_host, service_port); + } else { + service_port = ::atoi(service_hostport.c_str()); + } string backend = _srs_config->get_http_api_listen(); if (backend.find(":") == string::npos) { diff --git a/trunk/src/app/srs_app_http_hooks.cpp b/trunk/src/app/srs_app_http_hooks.cpp index 9777a7c50a..3241cde59f 100644 --- a/trunk/src/app/srs_app_http_hooks.cpp +++ b/trunk/src/app/srs_app_http_hooks.cpp @@ -407,6 +407,61 @@ srs_error_t SrsHttpHooks::on_hls_notify(int cid, std::string url, SrsRequest* re return srs_success; } +srs_error_t SrsHttpHooks::discover_co_workers(string url, string& host, int& port) +{ + srs_error_t err = srs_success; + + std::string res; + int status_code; + + SrsHttpClient http; + if ((err = do_post(&http, url, "", status_code, res)) != srs_success) { + return srs_error_wrap(err, "http: post %s, status=%d, res=%s", url.c_str(), status_code, res.c_str()); + } + + SrsJsonObject* robj = NULL; + SrsAutoFree(SrsJsonObject, robj); + + if (true) { + SrsJsonAny* jr = NULL; + if ((jr = SrsJsonAny::loads(res)) == NULL) { + return srs_error_new(ERROR_OCLUSTER_DISCOVER, "load json from %s", res.c_str()); + } + + if (!jr->is_object()) { + srs_freep(jr); + return srs_error_new(ERROR_OCLUSTER_DISCOVER, "response %s", res.c_str()); + } + + robj = jr->to_object(); + } + + SrsJsonAny* prop = NULL; + if ((prop = robj->ensure_property_object("data")) == NULL) { + return srs_error_new(ERROR_OCLUSTER_DISCOVER, "parse data %s", res.c_str()); + } + + SrsJsonObject* p = prop->to_object(); + if ((prop = p->ensure_property_object("origin")) == NULL) { + return srs_error_new(ERROR_OCLUSTER_DISCOVER, "parse data %s", res.c_str()); + } + p = prop->to_object(); + + if ((prop = p->ensure_property_string("ip")) == NULL) { + return srs_error_new(ERROR_OCLUSTER_DISCOVER, "parse data %s", res.c_str()); + } + host = prop->to_str(); + + if ((prop = p->ensure_property_integer("port")) == NULL) { + return srs_error_new(ERROR_OCLUSTER_DISCOVER, "parse data %s", res.c_str()); + } + port = (int)prop->to_integer(); + + srs_trace("http: on_hls ok, url=%s, response=%s", url.c_str(), res.c_str()); + + return err; +} + srs_error_t SrsHttpHooks::do_post(SrsHttpClient* hc, std::string url, std::string req, int& code, string& res) { srs_error_t err = srs_success; @@ -420,8 +475,13 @@ srs_error_t SrsHttpHooks::do_post(SrsHttpClient* hc, std::string url, std::strin return srs_error_wrap(err, "http: init client"); } + string path = uri.get_path(); + if (!uri.get_query().empty()) { + path += "?" + uri.get_query(); + } + ISrsHttpMessage* msg = NULL; - if ((err = hc->post(uri.get_path(), req, &msg)) != srs_success) { + if ((err = hc->post(path, req, &msg)) != srs_success) { return srs_error_wrap(err, "http: client post"); } SrsAutoFree(ISrsHttpMessage, msg); diff --git a/trunk/src/app/srs_app_http_hooks.hpp b/trunk/src/app/srs_app_http_hooks.hpp index c9fb3188c2..1c83dfc2c9 100644 --- a/trunk/src/app/srs_app_http_hooks.hpp +++ b/trunk/src/app/srs_app_http_hooks.hpp @@ -113,6 +113,10 @@ class SrsHttpHooks * @param cid the source connection cid, for the on_dvr is async call. */ static srs_error_t on_hls_notify(int cid, std::string url, SrsRequest* req, std::string ts_url, int nb_notify); + /** + * Discover co-workers for origin cluster. + */ + static srs_error_t discover_co_workers(std::string url, std::string& host, int& port); private: static srs_error_t do_post(SrsHttpClient* hc, std::string url, std::string req, int& code, std::string& res); }; diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 17601fd2dc..86d4289d38 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -227,6 +227,11 @@ srs_error_t SrsRtmpConn::do_cycle() srs_freep(r0); } + // If client is redirect to other servers, we already logged the event. + if (srs_error_code(err) == ERROR_CONTROL_REDIRECT) { + srs_error_reset(err); + } + return err; } @@ -624,6 +629,31 @@ srs_error_t SrsRtmpConn::playing(SrsSource* source) } } + // When origin cluster enabled, try to redirect to the origin which is active. + // A active origin is a server which is delivering stream. + if (!info->edge && _srs_config->get_vhost_origin_cluster(req->vhost) && source->inactive()) { + vector coworkers = _srs_config->get_vhost_coworkers(req->vhost); + for (int i = 0; i < (int)coworkers.size(); i++) { + int port; + string host; + string url = "http://" + coworkers.at(i) + "/api/v1/clusters?" + + "vhost=" + req->vhost + "&ip=" + req->host + "&app=" + req->app + "&stream=" + req->stream; + if ((err = SrsHttpHooks::discover_co_workers(url, host, port)) != srs_success) { + return srs_error_wrap(err, "discover coworkers, url=%s", url.c_str()); + } + srs_trace("rtmp: redirect in cluster, url=%s, target=%s:%d", url.c_str(), host.c_str(), port); + + bool accepted = false; + if ((err = rtmp->redirect(req, host, port, accepted)) != srs_success) { + srs_error_reset(err); + } else { + return srs_error_new(ERROR_CONTROL_REDIRECT, "redirected"); + } + } + + return srs_error_new(ERROR_OCLUSTER_REDIRECT, "no origin"); + } + // Set the socket options for transport. set_sock_options(); diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 283bfe5029..3219c4fdb2 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -2004,6 +2004,11 @@ int SrsSource::pre_source_id() return _pre_source_id; } +bool SrsSource::inactive() +{ + return _can_publish; +} + bool SrsSource::can_publish(bool is_edge) { if (is_edge) { diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 0a090d68e1..9fd8dcea83 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -606,11 +606,9 @@ class SrsSource : public ISrsReloadHandler // The metadata cache. SrsMetaCache* meta; private: - /** - * can publish, true when is not streaming - */ + // Whether source is avaiable for publishing. bool _can_publish; - // last die time, when all consumers quit and no publisher, + // The last die time, when all consumers quit and no publisher, // we will remove the source when source die. int64_t die_at; public: @@ -621,23 +619,24 @@ class SrsSource : public ISrsReloadHandler virtual srs_error_t cycle(); // remove source when expired. virtual bool expired(); - // initialize, get and setter. +// initialize, get and setter. public: - /** - * initialize the hls with handlers. - */ + // initialize the hls with handlers. virtual srs_error_t initialize(SrsRequest* r, ISrsSourceHandler* h); // interface ISrsReloadHandler public: virtual srs_error_t on_reload_vhost_play(std::string vhost); - // for the tools callback +// for the tools callback public: // source id changed. virtual srs_error_t on_source_id_changed(int id); // get current source id. virtual int source_id(); virtual int pre_source_id(); - // logic data methods + // Whether source is inactive, which means there is no publishing stream source. + // @remark For edge, it's inactive util stream has been pulled from origin. + virtual bool inactive(); +// logic data methods public: virtual bool can_publish(bool is_edge); virtual srs_error_t on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata); @@ -658,7 +657,7 @@ class SrsSource : public ISrsReloadHandler */ virtual srs_error_t on_publish(); virtual void on_unpublish(); - // consumer methods +// consumer methods public: /** * create consumer and dumps packets in cache. @@ -671,7 +670,7 @@ class SrsSource : public ISrsReloadHandler virtual void on_consumer_destroy(SrsConsumer* consumer); virtual void set_cache(bool enabled); virtual SrsRtmpJitterAlgorithm jitter(); - // internal +// internal public: // for edge, when publish edge stream, check the state virtual srs_error_t on_edge_start_publish(); diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index 200f8330dd..a80dd82720 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -273,6 +273,8 @@ #define ERROR_DASH_WRITE_FAILED 3087 #define ERROR_TS_CONTEXT_NOT_READY 3088 #define ERROR_MP4_ILLEGAL_MOOF 3089 +#define ERROR_OCLUSTER_DISCOVER 3090 +#define ERROR_OCLUSTER_REDIRECT 3091 /////////////////////////////////////////////////////// // HTTP/StreamCaster/KAFKA protocol error. diff --git a/trunk/src/service/srs_service_utility.cpp b/trunk/src/service/srs_service_utility.cpp index 4cbb5a6a7a..63694483ee 100644 --- a/trunk/src/service/srs_service_utility.cpp +++ b/trunk/src/service/srs_service_utility.cpp @@ -175,16 +175,18 @@ void retrieve_local_ips() } // If empty, disover IPv4 loopback. - for (ifaddrs* p = ifap; p ; p = p->ifa_next) { - ifaddrs* cur = p; - - // retrieve IP address, ignore the tun0 network device, whose addr is NULL. - // @see: https://github.com/ossrs/srs/issues/141 - bool ipv4 = (cur->ifa_addr->sa_family == AF_INET); - bool ready = (cur->ifa_flags & IFF_UP) && (cur->ifa_flags & IFF_RUNNING); - bool ignored = (!cur->ifa_addr) || (cur->ifa_flags & IFF_POINTOPOINT) || (cur->ifa_flags & IFF_PROMISC); - if (ipv4 && ready && !ignored) { - discover_network_iface(cur, ips, ss0, ss1, false); + if (ips.empty()) { + for (ifaddrs* p = ifap; p ; p = p->ifa_next) { + ifaddrs* cur = p; + + // retrieve IP address, ignore the tun0 network device, whose addr is NULL. + // @see: https://github.com/ossrs/srs/issues/141 + bool ipv4 = (cur->ifa_addr->sa_family == AF_INET); + bool ready = (cur->ifa_flags & IFF_UP) && (cur->ifa_flags & IFF_RUNNING); + bool ignored = (!cur->ifa_addr) || (cur->ifa_flags & IFF_POINTOPOINT) || (cur->ifa_flags & IFF_PROMISC); + if (ipv4 && ready && !ignored) { + discover_network_iface(cur, ips, ss0, ss1, false); + } } } @@ -216,6 +218,11 @@ string srs_get_public_internet_address() // find the best match public address. for (int i = 0; i < (int)ips.size(); i++) { std::string ip = ips[i]; + // TODO: FIXME: Support ipv6. + if (ip.find(".") == string::npos) { + continue; + } + in_addr_t addr = inet_addr(ip.c_str()); uint32_t addr_h = ntohl(addr); // lo, 127.0.0.0-127.0.0.1 @@ -247,6 +254,11 @@ string srs_get_public_internet_address() // no public address, use private address. for (int i = 0; i < (int)ips.size(); i++) { std::string ip = ips[i]; + // TODO: FIXME: Support ipv6. + if (ip.find(".") == string::npos) { + continue; + } + in_addr_t addr = inet_addr(ip.c_str()); uint32_t addr_h = ntohl(addr); // lo, 127.0.0.0-127.0.0.1 @@ -261,9 +273,12 @@ string srs_get_public_internet_address() } // Finally, use first whatever kind of address. - if (!ips.empty()) { - _public_internet_address = ips.at(0); - return _public_internet_address; + if (!ips.empty() && _public_internet_address.empty()) { + string ip = ips.at(0); + srs_warn("use first address as ip: %s", ip.c_str()); + + _public_internet_address = ip; + return ip; } return "";