Skip to content

Commit

Permalink
curvefs: The curvefs cluster has some internal service which support …
Browse files Browse the repository at this point in the history
…cluster communicate itself and some external services support for client and management tools.

If all the services start on the same ip+port will affect the communication within the cluster when the heavy stress from client.
Start the services needed by outsied on another ip+port as a external server.
  • Loading branch information
SeanHai committed Jan 13, 2022
1 parent 895fa84 commit 9e79c77
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 42 deletions.
2 changes: 2 additions & 0 deletions curvefs/conf/metaserver.conf
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ s3compactwq.s3infocache_size=100
global.ip=127.0.0.1 # __CURVEADM_TEMPLATE__ ${service_addr} __CURVEADM_TEMPLATE__ __ANSIBLE_TEMPLATE__ {{ curvefs_metaserver_listen_host }} __ANSIBLE_TEMPLATE__
global.port=16701 # __CURVEADM_TEMPLATE__ ${service_port} __CURVEADM_TEMPLATE__ __ANSIBLE_TEMPLATE__ {{ curvefs_metaserver_listen_port }} __ANSIBLE_TEMPLATE__
global.external_ip=127.0.0.1 # __CURVEADM_TEMPLATE__ ${service_external_addr} __CURVEADM_TEMPLATE__ __ANSIBLE_TEMPLATE__ {{ curvefs_metaserver_listen_host }} __ANSIBLE_TEMPLATE__
global.external_port=16701 # __CURVEADM_TEMPLATE__ ${service_external_port} __CURVEADM_TEMPLATE__
global.enable_external_server=false

# metaserver log directory
# this config item can be replaced by start up option `-log_dir`
Expand Down
2 changes: 1 addition & 1 deletion curvefs/conf/tools.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ rpcTimeoutMs=10000
rpcRetryTimes=5
# topo file path
topoFilePath=curvefs/test/tools/topo_example.json # __CURVEADM_TEMPLATE__ /curvefs/tools/conf/topology.json __CURVEADM_TEMPLATE__ __ANSIBLE_TEMPLATE__ {{ project_root_dest }}/conf/topology.json __ANSIBLE_TEMPLATE__
# metaserver
# metaserver external address
metaserverAddr=127.0.0.1:6701 # __CURVEADM_TEMPLATE__ ${cluster_metaserver_addr} __CURVEADM_TEMPLATE__ __ANSIBLE_TEMPLATE__ {{ groups.metaserver | join_peer(hostvars, "metaserver_listen_port") }} __ANSIBLE_TEMPLATE__
# etcd
etcdAddr=127.0.0.1:12379 # __CURVEADM_TEMPLATE__ ${cluster_etcd_addr} __CURVEADM_TEMPLATE__ __ANSIBLE_TEMPLATE__ {{ groups.etcd | join_peer(hostvars, "etcd_listen_client_port") }} __ANSIBLE_TEMPLATE__
Expand Down
1 change: 1 addition & 0 deletions curvefs/proto/topology.proto
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ message MetaServerLocation {
required string hostIp = 2;
required uint32 port = 3;
optional string externalIp = 4;
optional uint32 externalPort = 5;
}

message CopySetServerInfo {
Expand Down
38 changes: 21 additions & 17 deletions curvefs/src/client/rpcclient/mds_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,25 @@ MdsClientImpl::CommitTx(const std::vector<PartitionTxId> &txIds) {
return static_cast<TopoStatusCode>(rc);
}

template<typename T>
void MdsClientImpl::GetEndPoint(const T &info, butil::EndPoint *internal,
butil::EndPoint *external) {
std::string internalIp = info.hostip();
std::string externalIp = internalIp;
if (info.has_externalip()) {
externalIp = info.externalip();
}

uint32_t internalPort = info.port();
uint32_t externalPort = internalPort;
if (info.has_externalport()) {
externalPort = info.externalport();
}

butil::str2endpoint(internalIp.c_str(), internalPort, internal);
butil::str2endpoint(externalIp.c_str(), externalPort, external);
}

bool MdsClientImpl::GetMetaServerInfo(
const PeerAddr &addr, CopysetPeerInfo<MetaserverID> *metaserverInfo) {
std::vector<std::string> strs;
Expand Down Expand Up @@ -283,16 +302,9 @@ bool MdsClientImpl::GetMetaServerInfo(
} else {
const auto &info = response.metaserverinfo();
MetaserverID metaserverID = info.metaserverid();
std::string internalIp = info.hostip();
std::string externalIp = internalIp;
if (info.has_externalip()) {
externalIp = info.externalip();
}
uint32_t port = info.port();
butil::EndPoint internal;
butil::str2endpoint(internalIp.c_str(), port, &internal);
butil::EndPoint external;
butil::str2endpoint(externalIp.c_str(), port, &external);
GetEndPoint(info, &internal, &external);
*metaserverInfo = CopysetPeerInfo<MetaserverID>(
metaserverID, PeerAddr(internal), PeerAddr(external));
}
Expand Down Expand Up @@ -329,18 +341,10 @@ bool MdsClientImpl::GetMetaServerListInCopysets(
CopysetPeerInfo<MetaserverID> csinfo;
::curvefs::mds::topology::MetaServerLocation csl =
info.cslocs(j);
uint16_t port = csl.port();
std::string internalIp = csl.hostip();
csinfo.peerID = csl.metaserverid();
std::string externalIp = internalIp;
if (csl.has_externalip()) {
externalIp = csl.externalip();
}

butil::EndPoint internal;
butil::str2endpoint(internalIp.c_str(), port, &internal);
butil::EndPoint external;
butil::str2endpoint(externalIp.c_str(), port, &external);
GetEndPoint(csl, &internal, &external);
csinfo.internalAddr = PeerAddr(internal);
csinfo.externalAddr = PeerAddr(external);
copysetseverl.AddCopysetPeerInfo(csinfo);
Expand Down
4 changes: 4 additions & 0 deletions curvefs/src/client/rpcclient/mds_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ class MdsClientImpl : public MdsClient {
private:
FSStatusCode ReturnError(int retcode);

template<typename T>
void GetEndPoint(const T &info, butil::EndPoint *internal,
butil::EndPoint *external);

private:
MDSBaseClient *mdsbasecli_;
::curve::client::RPCExcutorRetryPolicy rpcexcutor_;
Expand Down
1 change: 1 addition & 0 deletions curvefs/src/mds/topology/topology_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,7 @@ void TopologyManager::GetMetaServerListInCopysets(
location->set_hostip(metaserver.GetInternalHostIp());
location->set_port(metaserver.GetInternalPort());
location->set_externalip(metaserver.GetExternalHostIp());
location->set_externalport(metaserver.GetExternalPort());
} else {
LOG(INFO) << "GetMetaserver failed"
<< " when GetMetaServerListInCopysets.";
Expand Down
50 changes: 40 additions & 10 deletions curvefs/src/metaserver/metaserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <brpc/server.h>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <braft/builtin_service_impl.h>

#include "absl/memory/memory.h"
#include "curvefs/src/metaserver/copyset/copyset_service.h"
Expand Down Expand Up @@ -64,6 +65,8 @@ void Metaserver::InitOptions(std::shared_ptr<Configuration> conf) {
conf_ = conf;
conf_->GetValueFatalIfFail("global.ip", &options_.ip);
conf_->GetValueFatalIfFail("global.port", &options_.port);
conf_->GetBoolValue("global.enable_external_server",
&options_.enableExternalServer);

std::string value;
conf_->GetValueFatalIfFail("bthread.worker_count", &value);
Expand All @@ -86,7 +89,10 @@ void Metaserver::InitRegisterOptions() {
&registerOptions_.metaserverInternalIp);
conf_->GetValueFatalIfFail("global.external_ip",
&registerOptions_.metaserverExternalIp);
conf_->GetValueFatalIfFail("global.port", &registerOptions_.metaserverPort);
conf_->GetValueFatalIfFail("global.port",
&registerOptions_.metaserverInternalPort);
conf_->GetValueFatalIfFail("global.external_port",
&registerOptions_.metaserverExternalPort);
conf_->GetValueFatalIfFail("mds.register_retries",
&registerOptions_.registerRetries);
conf_->GetValueFatalIfFail("mds.register_timeoutMs",
Expand Down Expand Up @@ -170,37 +176,59 @@ void Metaserver::Run() {

PartitionCleanManager::GetInstance().Run();

brpc::Server server;
butil::ip_t ip;
LOG_IF(FATAL, 0 != butil::str2ip(options_.ip.c_str(), &ip))
<< "convert " << options_.ip << " to ip failed";
butil::EndPoint listenAddr(ip, options_.port);

// add internal server
server_ = absl::make_unique<brpc::Server>();
metaService_ = absl::make_unique<MetaServerServiceImpl>(
copysetNodeManager_, inflightThrottle_.get());
copysetService_ =
absl::make_unique<CopysetServiceImpl>(copysetNodeManager_);
raftCliService2_ = absl::make_unique<RaftCliService2>(copysetNodeManager_);

// add metaserver service
LOG_IF(FATAL, server_->AddService(metaService_.get(),
brpc::SERVER_DOESNT_OWN_SERVICE) != 0)
<< "add metaserverService error";

LOG_IF(FATAL, server_->AddService(copysetService_.get(),
brpc::SERVER_DOESNT_OWN_SERVICE) != 0)
<< "add copysetservice error";

butil::ip_t ip;
LOG_IF(FATAL, 0 != butil::str2ip(options_.ip.c_str(), &ip))
<< "convert " << options_.ip << " to ip failed";
butil::EndPoint listenAddr(ip, options_.port);

// add raft-related service
copysetNodeManager_->AddService(server_.get(), listenAddr);

// start rpc server
// start internal rpc server
brpc::ServerOptions option;
if (options_.bthreadWorkerCount != -1) {
option.num_threads = options_.bthreadWorkerCount;
}
LOG_IF(FATAL, server_->Start(listenAddr, &option) != 0)
<< "start brpc server error";
<< "start internal brpc server error";

// add external server
if (options_.enableExternalServer) {
externalServer_ = absl::make_unique<brpc::Server>();
LOG_IF(FATAL, externalServer_->AddService(metaService_.get(),
brpc::SERVER_DOESNT_OWN_SERVICE) != 0)
<< "add metaserverService error";
LOG_IF(FATAL, externalServer_->AddService(copysetService_.get(),
brpc::SERVER_DOESNT_OWN_SERVICE) != 0)
<< "add copysetService error";
LOG_IF(FATAL, externalServer_->AddService(raftCliService2_.get(),
brpc::SERVER_DOESNT_OWN_SERVICE) != 0)
<< "add raftCliService2 error";
braft::RaftStatImpl raftStatService;
LOG_IF(FATAL, externalServer_->AddService(&raftStatService,
brpc::SERVER_DOESNT_OWN_SERVICE) != 0)
<< "add raftStatService error";

// start external rpc server
LOG_IF(FATAL, externalServer_->Start(listenAddr, &option) != 0)
<< "start external brpc server error";
}

// try start s3compact wq
LOG_IF(FATAL, S3CompactManager::GetInstance().Run() != 0);
Expand All @@ -222,6 +250,8 @@ void Metaserver::Stop() {

LOG(INFO) << "MetaServer is going to quit";

externalServer_->Stop(0);
externalServer_->Join();
server_->Stop(0);
server_->Join();

Expand Down
6 changes: 6 additions & 0 deletions curvefs/src/metaserver/metaserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "curvefs/src/metaserver/copyset/apply_queue.h"
#include "curvefs/src/metaserver/copyset/config.h"
#include "curvefs/src/metaserver/copyset/copyset_node_manager.h"
#include "curvefs/src/metaserver/copyset/raft_cli_service2.h"
#include "curvefs/src/metaserver/copyset/copyset_service.h"
#include "curvefs/src/metaserver/register.h"
#include "curvefs/src/metaserver/heartbeat.h"
Expand All @@ -48,11 +49,13 @@ using ::curvefs::metaserver::copyset::ApplyQueue;
using ::curvefs::metaserver::copyset::CopysetNodeManager;
using ::curvefs::metaserver::copyset::CopysetNodeOptions;
using ::curvefs::metaserver::copyset::CopysetServiceImpl;
using ::curvefs::metaserver::copyset::RaftCliService2;

struct MetaserverOptions {
std::string ip;
int port;
int bthreadWorkerCount = -1;
bool enableExternalServer;
};

class Metaserver {
Expand Down Expand Up @@ -87,8 +90,11 @@ class Metaserver {
MetaServerMetadata metadate_;

std::unique_ptr<brpc::Server> server_;
std::unique_ptr<brpc::Server> externalServer_;

std::unique_ptr<MetaServerServiceImpl> metaService_;
std::unique_ptr<CopysetServiceImpl> copysetService_;
std::unique_ptr<RaftCliService2> raftCliService2_;

HeartbeatOptions heartbeatOptions_;
Heartbeat heartbeat_;
Expand Down
29 changes: 16 additions & 13 deletions curvefs/src/metaserver/register.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,16 @@ int Register::RegisterToMDS(MetaServerMetadata *metadata) {

req.set_hostname(hostname);
req.set_hostip(ops_.metaserverInternalIp);
req.set_port(ops_.metaserverPort);
req.set_port(ops_.metaserverInternalPort);
req.set_externalip(ops_.metaserverExternalIp);
req.set_externalport(ops_.metaserverPort);
req.set_externalport(ops_.metaserverExternalPort);

LOG(INFO) << " Registering to MDS " << mdsEps_[inServiceIndex_]
<< ". hostname: " << hostname
<< ", internal ip: " << ops_.metaserverInternalIp
<< ", port: " << ops_.metaserverPort
<< ", external ip: " << ops_.metaserverExternalIp;
<< ", port: " << ops_.metaserverInternalPort
<< ", external ip: " << ops_.metaserverExternalIp
<< ", external port: " << ops_.metaserverExternalPort;

int retries = ops_.registerRetries;
while (retries >= 0) {
Expand All @@ -87,7 +88,7 @@ int Register::RegisterToMDS(MetaServerMetadata *metadata) {

if (channel.Init(mdsEps_[inServiceIndex_].c_str(), NULL) != 0) {
LOG(ERROR) << ops_.metaserverInternalIp << ":"
<< ops_.metaserverPort
<< ops_.metaserverInternalPort
<< " Fail to init channel to MDS "
<< mdsEps_[inServiceIndex_];
return -1;
Expand All @@ -99,12 +100,13 @@ int Register::RegisterToMDS(MetaServerMetadata *metadata) {
break;
} else {
LOG(INFO) << ops_.metaserverInternalIp << ":"
<< ops_.metaserverPort << " Fail to register to MDS "
<< mdsEps_[inServiceIndex_]
<< ", cntl errorCode: " << cntl.ErrorCode() << ","
<< " cntl error: " << cntl.ErrorText() << ","
<< " statusCode: " << resp.statuscode() << ","
<< " going to sleep and try again.";
<< ops_.metaserverInternalPort
<< " Fail to register to MDS "
<< mdsEps_[inServiceIndex_]
<< ", cntl errorCode: " << cntl.ErrorCode() << ","
<< " cntl error: " << cntl.ErrorText() << ","
<< " statusCode: " << resp.statuscode() << ","
<< " going to sleep and try again.";
if (cntl.ErrorCode() == EHOSTDOWN ||
cntl.ErrorCode() == brpc::ELOGOFF) {
inServiceIndex_ = (inServiceIndex_ + 1) % mdsEps_.size();
Expand All @@ -115,7 +117,8 @@ int Register::RegisterToMDS(MetaServerMetadata *metadata) {
}

if (retries <= 0) {
LOG(ERROR) << ops_.metaserverInternalIp << ":" << ops_.metaserverPort
LOG(ERROR) << ops_.metaserverInternalIp << ":"
<< ops_.metaserverInternalPort
<< " Fail to register to MDS for " << ops_.registerRetries
<< " times.";
return -1;
Expand All @@ -125,7 +128,7 @@ int Register::RegisterToMDS(MetaServerMetadata *metadata) {
metadata->set_id(resp.metaserverid());
metadata->set_token(resp.token());

LOG(INFO) << ops_.metaserverInternalIp << ":" << ops_.metaserverPort
LOG(INFO) << ops_.metaserverInternalIp << ":" << ops_.metaserverInternalPort
<< " Successfully registered to MDS: " << mdsEps_[inServiceIndex_]
<< ", metaserver id: " << metadata->id() << ","
<< " token: " << metadata->token();
Expand Down
3 changes: 2 additions & 1 deletion curvefs/src/metaserver/register.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ struct RegisterOptions {
std::string mdsListenAddr;
std::string metaserverInternalIp;
std::string metaserverExternalIp;
int metaserverPort;
uint32_t metaserverInternalPort;
uint32_t metaserverExternalPort;
int registerRetries;
int registerTimeout;
};
Expand Down

0 comments on commit 9e79c77

Please sign in to comment.