Skip to content

Commit

Permalink
Fix #464, support origin cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Feb 16, 2018
1 parent 2f09ec4 commit c70421e
Show file tree
Hide file tree
Showing 13 changed files with 171 additions and 39 deletions.
1 change: 1 addition & 0 deletions trunk/conf/full.conf
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ vhost cluster.srs.com {
debug_srs_upnode on;

# For origin(mode local) cluster, turn on the cluster.
# @remark Origin cluster only supports RTMP, use Edge to transmux RTMP to FLV.
# default: off
# TODO: FIXME: Support reload.
origin_cluster off;
Expand Down
15 changes: 15 additions & 0 deletions trunk/conf/origin.cluster.edge.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# the config for srs origin-edge cluster
# @see https://github.com/ossrs/srs/wiki/v3_EN_OriginCluster
# @see full.conf for detail config.

listen 1935;
max_connections 1000;
pid objs/edge.pid;
daemon off;
srs_log_tank console;
vhost __defaultVhost__ {
cluster {
mode remote;
origin 127.0.0.1:19351;
}
}
5 changes: 3 additions & 2 deletions trunk/conf/origin.cluster.serverA.conf
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ http_api {
}
vhost __defaultVhost__ {
cluster {
mode local;
coworkers 127.0.0.1:9091;
mode local;
origin_cluster on;
coworkers 127.0.0.1:9091;
}
}
5 changes: 3 additions & 2 deletions trunk/conf/origin.cluster.serverB.conf
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ http_api {
}
vhost __defaultVhost__ {
cluster {
mode local;
coworkers 127.0.0.1:9090;
mode local;
origin_cluster on;
coworkers 127.0.0.1:9090;
}
}
9 changes: 2 additions & 7 deletions trunk/src/app/srs_app_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5195,13 +5195,8 @@ vector<string> SrsConfig::get_vhost_coworkers(string vhost)
}

conf = conf->get("coworkers");
for (int i = 0; i < (int)conf->directives.size(); i++) {
SrsConfDirective* option = conf->directives[i];
if (!option) {
continue;
}

coworkers.push_back(option->arg0());
for (int i = 0; i < (int)conf->args.size(); i++) {
coworkers.push_back(conf->args.at(i));
}

return coworkers;
Expand Down
8 changes: 6 additions & 2 deletions trunk/src/app/srs_app_coworkers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,13 @@ SrsJsonAny* SrsCoWorkers::dumps(string vhost, string app, string stream)
string service_ip = srs_get_public_internet_address();
string service_hostport = service_ports.at(0);

string service_host;
int service_port = SRS_CONSTS_RTMP_DEFAULT_PORT;
srs_parse_hostport(service_hostport, service_host, service_port);
if (service_hostport.find(":") != string::npos) {
string service_host;
srs_parse_hostport(service_hostport, service_host, service_port);
} else {
service_port = ::atoi(service_hostport.c_str());
}

string backend = _srs_config->get_http_api_listen();
if (backend.find(":") == string::npos) {
Expand Down
62 changes: 61 additions & 1 deletion trunk/src/app/srs_app_http_hooks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,61 @@ srs_error_t SrsHttpHooks::on_hls_notify(int cid, std::string url, SrsRequest* re
return srs_success;
}

srs_error_t SrsHttpHooks::discover_co_workers(string url, string& host, int& port)
{
srs_error_t err = srs_success;

std::string res;
int status_code;

SrsHttpClient http;
if ((err = do_post(&http, url, "", status_code, res)) != srs_success) {
return srs_error_wrap(err, "http: post %s, status=%d, res=%s", url.c_str(), status_code, res.c_str());
}

SrsJsonObject* robj = NULL;
SrsAutoFree(SrsJsonObject, robj);

if (true) {
SrsJsonAny* jr = NULL;
if ((jr = SrsJsonAny::loads(res)) == NULL) {
return srs_error_new(ERROR_OCLUSTER_DISCOVER, "load json from %s", res.c_str());
}

if (!jr->is_object()) {
srs_freep(jr);
return srs_error_new(ERROR_OCLUSTER_DISCOVER, "response %s", res.c_str());
}

robj = jr->to_object();
}

SrsJsonAny* prop = NULL;
if ((prop = robj->ensure_property_object("data")) == NULL) {
return srs_error_new(ERROR_OCLUSTER_DISCOVER, "parse data %s", res.c_str());
}

SrsJsonObject* p = prop->to_object();
if ((prop = p->ensure_property_object("origin")) == NULL) {
return srs_error_new(ERROR_OCLUSTER_DISCOVER, "parse data %s", res.c_str());
}
p = prop->to_object();

if ((prop = p->ensure_property_string("ip")) == NULL) {
return srs_error_new(ERROR_OCLUSTER_DISCOVER, "parse data %s", res.c_str());
}
host = prop->to_str();

if ((prop = p->ensure_property_integer("port")) == NULL) {
return srs_error_new(ERROR_OCLUSTER_DISCOVER, "parse data %s", res.c_str());
}
port = (int)prop->to_integer();

srs_trace("http: on_hls ok, url=%s, response=%s", url.c_str(), res.c_str());

return err;
}

srs_error_t SrsHttpHooks::do_post(SrsHttpClient* hc, std::string url, std::string req, int& code, string& res)
{
srs_error_t err = srs_success;
Expand All @@ -420,8 +475,13 @@ srs_error_t SrsHttpHooks::do_post(SrsHttpClient* hc, std::string url, std::strin
return srs_error_wrap(err, "http: init client");
}

string path = uri.get_path();
if (!uri.get_query().empty()) {
path += "?" + uri.get_query();
}

ISrsHttpMessage* msg = NULL;
if ((err = hc->post(uri.get_path(), req, &msg)) != srs_success) {
if ((err = hc->post(path, req, &msg)) != srs_success) {
return srs_error_wrap(err, "http: client post");
}
SrsAutoFree(ISrsHttpMessage, msg);
Expand Down
4 changes: 4 additions & 0 deletions trunk/src/app/srs_app_http_hooks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ class SrsHttpHooks
* @param cid the source connection cid, for the on_dvr is async call.
*/
static srs_error_t on_hls_notify(int cid, std::string url, SrsRequest* req, std::string ts_url, int nb_notify);
/**
* Discover co-workers for origin cluster.
*/
static srs_error_t discover_co_workers(std::string url, std::string& host, int& port);
private:
static srs_error_t do_post(SrsHttpClient* hc, std::string url, std::string req, int& code, std::string& res);
};
Expand Down
30 changes: 30 additions & 0 deletions trunk/src/app/srs_app_rtmp_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,11 @@ srs_error_t SrsRtmpConn::do_cycle()
srs_freep(r0);
}

// If client is redirect to other servers, we already logged the event.
if (srs_error_code(err) == ERROR_CONTROL_REDIRECT) {
srs_error_reset(err);
}

return err;
}

Expand Down Expand Up @@ -624,6 +629,31 @@ srs_error_t SrsRtmpConn::playing(SrsSource* source)
}
}

// When origin cluster enabled, try to redirect to the origin which is active.
// A active origin is a server which is delivering stream.
if (!info->edge && _srs_config->get_vhost_origin_cluster(req->vhost) && source->inactive()) {
vector<string> coworkers = _srs_config->get_vhost_coworkers(req->vhost);
for (int i = 0; i < (int)coworkers.size(); i++) {
int port;
string host;
string url = "http://" + coworkers.at(i) + "/api/v1/clusters?"
+ "vhost=" + req->vhost + "&ip=" + req->host + "&app=" + req->app + "&stream=" + req->stream;
if ((err = SrsHttpHooks::discover_co_workers(url, host, port)) != srs_success) {
return srs_error_wrap(err, "discover coworkers, url=%s", url.c_str());
}
srs_trace("rtmp: redirect in cluster, url=%s, target=%s:%d", url.c_str(), host.c_str(), port);

bool accepted = false;
if ((err = rtmp->redirect(req, host, port, accepted)) != srs_success) {
srs_error_reset(err);
} else {
return srs_error_new(ERROR_CONTROL_REDIRECT, "redirected");
}
}

return srs_error_new(ERROR_OCLUSTER_REDIRECT, "no origin");
}

// Set the socket options for transport.
set_sock_options();

Expand Down
5 changes: 5 additions & 0 deletions trunk/src/app/srs_app_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2004,6 +2004,11 @@ int SrsSource::pre_source_id()
return _pre_source_id;
}

bool SrsSource::inactive()
{
return _can_publish;
}

bool SrsSource::can_publish(bool is_edge)
{
if (is_edge) {
Expand Down
23 changes: 11 additions & 12 deletions trunk/src/app/srs_app_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -606,11 +606,9 @@ class SrsSource : public ISrsReloadHandler
// The metadata cache.
SrsMetaCache* meta;
private:
/**
* can publish, true when is not streaming
*/
// Whether source is avaiable for publishing.
bool _can_publish;
// last die time, when all consumers quit and no publisher,
// The last die time, when all consumers quit and no publisher,
// we will remove the source when source die.
int64_t die_at;
public:
Expand All @@ -621,23 +619,24 @@ class SrsSource : public ISrsReloadHandler
virtual srs_error_t cycle();
// remove source when expired.
virtual bool expired();
// initialize, get and setter.
// initialize, get and setter.
public:
/**
* initialize the hls with handlers.
*/
// initialize the hls with handlers.
virtual srs_error_t initialize(SrsRequest* r, ISrsSourceHandler* h);
// interface ISrsReloadHandler
public:
virtual srs_error_t on_reload_vhost_play(std::string vhost);
// for the tools callback
// for the tools callback
public:
// source id changed.
virtual srs_error_t on_source_id_changed(int id);
// get current source id.
virtual int source_id();
virtual int pre_source_id();
// logic data methods
// Whether source is inactive, which means there is no publishing stream source.
// @remark For edge, it's inactive util stream has been pulled from origin.
virtual bool inactive();
// logic data methods
public:
virtual bool can_publish(bool is_edge);
virtual srs_error_t on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata);
Expand All @@ -658,7 +657,7 @@ class SrsSource : public ISrsReloadHandler
*/
virtual srs_error_t on_publish();
virtual void on_unpublish();
// consumer methods
// consumer methods
public:
/**
* create consumer and dumps packets in cache.
Expand All @@ -671,7 +670,7 @@ class SrsSource : public ISrsReloadHandler
virtual void on_consumer_destroy(SrsConsumer* consumer);
virtual void set_cache(bool enabled);
virtual SrsRtmpJitterAlgorithm jitter();
// internal
// internal
public:
// for edge, when publish edge stream, check the state
virtual srs_error_t on_edge_start_publish();
Expand Down
2 changes: 2 additions & 0 deletions trunk/src/kernel/srs_kernel_error.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,8 @@
#define ERROR_DASH_WRITE_FAILED 3087
#define ERROR_TS_CONTEXT_NOT_READY 3088
#define ERROR_MP4_ILLEGAL_MOOF 3089
#define ERROR_OCLUSTER_DISCOVER 3090
#define ERROR_OCLUSTER_REDIRECT 3091

///////////////////////////////////////////////////////
// HTTP/StreamCaster/KAFKA protocol error.
Expand Down
41 changes: 28 additions & 13 deletions trunk/src/service/srs_service_utility.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,16 +175,18 @@ void retrieve_local_ips()
}

// If empty, disover IPv4 loopback.
for (ifaddrs* p = ifap; p ; p = p->ifa_next) {
ifaddrs* cur = p;

// retrieve IP address, ignore the tun0 network device, whose addr is NULL.
// @see: https://github.com/ossrs/srs/issues/141
bool ipv4 = (cur->ifa_addr->sa_family == AF_INET);
bool ready = (cur->ifa_flags & IFF_UP) && (cur->ifa_flags & IFF_RUNNING);
bool ignored = (!cur->ifa_addr) || (cur->ifa_flags & IFF_POINTOPOINT) || (cur->ifa_flags & IFF_PROMISC);
if (ipv4 && ready && !ignored) {
discover_network_iface(cur, ips, ss0, ss1, false);
if (ips.empty()) {
for (ifaddrs* p = ifap; p ; p = p->ifa_next) {
ifaddrs* cur = p;

// retrieve IP address, ignore the tun0 network device, whose addr is NULL.
// @see: https://github.com/ossrs/srs/issues/141
bool ipv4 = (cur->ifa_addr->sa_family == AF_INET);
bool ready = (cur->ifa_flags & IFF_UP) && (cur->ifa_flags & IFF_RUNNING);
bool ignored = (!cur->ifa_addr) || (cur->ifa_flags & IFF_POINTOPOINT) || (cur->ifa_flags & IFF_PROMISC);
if (ipv4 && ready && !ignored) {
discover_network_iface(cur, ips, ss0, ss1, false);
}
}
}

Expand Down Expand Up @@ -216,6 +218,11 @@ string srs_get_public_internet_address()
// find the best match public address.
for (int i = 0; i < (int)ips.size(); i++) {
std::string ip = ips[i];
// TODO: FIXME: Support ipv6.
if (ip.find(".") == string::npos) {
continue;
}

in_addr_t addr = inet_addr(ip.c_str());
uint32_t addr_h = ntohl(addr);
// lo, 127.0.0.0-127.0.0.1
Expand Down Expand Up @@ -247,6 +254,11 @@ string srs_get_public_internet_address()
// no public address, use private address.
for (int i = 0; i < (int)ips.size(); i++) {
std::string ip = ips[i];
// TODO: FIXME: Support ipv6.
if (ip.find(".") == string::npos) {
continue;
}

in_addr_t addr = inet_addr(ip.c_str());
uint32_t addr_h = ntohl(addr);
// lo, 127.0.0.0-127.0.0.1
Expand All @@ -261,9 +273,12 @@ string srs_get_public_internet_address()
}

// Finally, use first whatever kind of address.
if (!ips.empty()) {
_public_internet_address = ips.at(0);
return _public_internet_address;
if (!ips.empty() && _public_internet_address.empty()) {
string ip = ips.at(0);
srs_warn("use first address as ip: %s", ip.c_str());

_public_internet_address = ip;
return ip;
}

return "";
Expand Down

0 comments on commit c70421e

Please sign in to comment.