Skip to content

Commit

Permalink
client: retry allocate segment until success
Browse files Browse the repository at this point in the history
  • Loading branch information
Wine93 authored and ilixiaocui committed May 10, 2021
1 parent aa0e97b commit f455caf
Show file tree
Hide file tree
Showing 12 changed files with 117 additions and 13 deletions.
9 changes: 9 additions & 0 deletions conf/client.conf
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ mds.refreshTimesPerLease=4
# mds RPC接口每次重试之前需要先睡眠一段时间
mds.rpcRetryIntervalUS=100000

# Number of normal (non-RPC-error) retries allowed before the wait strategy is triggered
mds.normalRetryTimesBeforeTriggerWait=3

# Max retry time for IO-Path request (milliseconds)
mds.maxRetryMsInIOPath=86400000

# Sleep interval before each retry once the wait strategy is triggered (milliseconds)
mds.waitSleepMs=10000

#
################# metacache配置信息 ################
#
Expand Down
9 changes: 9 additions & 0 deletions conf/cs_client.conf
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ mds.refreshTimesPerLease=4
# mds RPC接口每次重试之前需要先睡眠一段时间
mds.rpcRetryIntervalUS=100000

# Number of normal (non-RPC-error) retries allowed before the wait strategy is triggered
mds.normalRetryTimesBeforeTriggerWait=3

# Max retry time for IO-Path request (milliseconds)
mds.maxRetryMsInIOPath=86400000

# Sleep interval before each retry once the wait strategy is triggered (milliseconds)
mds.waitSleepMs=10000

#
################# metacache配置信息 ################
#
Expand Down
9 changes: 9 additions & 0 deletions conf/py_client.conf
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ mds.refreshTimesPerLease=4
# mds RPC接口每次重试之前需要先睡眠一段时间
mds.rpcRetryIntervalUS=100000

# Number of normal (non-RPC-error) retries allowed before the wait strategy is triggered
mds.normalRetryTimesBeforeTriggerWait=3

# Max retry time for IO-Path request (milliseconds)
mds.maxRetryMsInIOPath=86400000

# Sleep interval before each retry once the wait strategy is triggered (milliseconds)
mds.waitSleepMs=10000

#
################# metacache配置信息 ################
#
Expand Down
9 changes: 9 additions & 0 deletions conf/snap_client.conf
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ mds.refreshTimesPerLease=4
# mds RPC接口每次重试之前需要先睡眠一段时间
mds.rpcRetryIntervalUS=100000

# Number of normal (non-RPC-error) retries allowed before the wait strategy is triggered
mds.normalRetryTimesBeforeTriggerWait=3

# Max retry time for IO-Path request (milliseconds)
mds.maxRetryMsInIOPath=86400000

# Sleep interval before each retry once the wait strategy is triggered (milliseconds)
mds.waitSleepMs=10000

#
################# metacache配置信息 ################
#
Expand Down
3 changes: 3 additions & 0 deletions curve-ansible/roles/generate_config/defaults/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ client_mds_rpc_retry_interval_us: 100000
client_metacache_get_leader_timeout_ms: 500
client_metacache_get_leader_retry: 5
client_metacache_rpc_retry_interval_us: 100000
client_mds_normal_retry_times_before_trigger_wait: 3
client_mds_max_retry_ms_in_io_path: 86400000
client_mds_wait_sleep_ms: 10000
client_schedule_queue_capacity: 1000000
client_schedule_threadpool_size: 2
client_isolation_task_queue_capacity: 1000000
Expand Down
9 changes: 9 additions & 0 deletions curve-ansible/roles/generate_config/templates/client.conf.j2
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ mds.refreshTimesPerLease={{ client_mds_refresh_times_per_lease }}
# mds RPC接口每次重试之前需要先睡眠一段时间
mds.rpcRetryIntervalUS={{ client_mds_rpc_retry_interval_us }}

# Number of normal (non-RPC-error) retries allowed before the wait strategy is triggered
mds.normalRetryTimesBeforeTriggerWait={{ client_mds_normal_retry_times_before_trigger_wait }}

# Max retry time for IO-Path request (milliseconds)
mds.maxRetryMsInIOPath={{ client_mds_max_retry_ms_in_io_path }}

# Sleep interval for wait (milliseconds)
mds.waitSleepMs={{ client_mds_wait_sleep_ms }}

#
################# metacache配置信息 ################
#
Expand Down
2 changes: 2 additions & 0 deletions include/client/libcurve.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ enum LIBCURVE_ERROR {
CLIENT_NOT_SUPPORT_SNAPSHOT = 28,
// snapshot功能禁用中
SNAPSTHO_FROZEN = 29,
// Not an RPC error: the request must be retried until it succeeds
RETRY_UNTIL_SUCCESS = 30,
// 未知错误
UNKNOWN = 100
};
Expand Down
12 changes: 12 additions & 0 deletions src/client/client_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,18 @@ int ClientConfig::Init(const char* configpath) {
&fileServiceOption_.metaServerOpt.mdsMaxFailedTimesBeforeChangeMDS);
LOG_IF(ERROR, ret == false) << "config no mds.maxFailedTimesBeforeChangeMDS info"; // NOLINT

ret = conf_.GetUInt64Value("mds.normalRetryTimesBeforeTriggerWait",
&fileServiceOption_.metaServerOpt.mdsNormalRetryTimesBeforeTriggerWait);
LOG_IF(ERROR, ret == false) << "config no mds.normalRetryTimesBeforeTriggerWait info"; // NOLINT

ret = conf_.GetUInt64Value("mds.maxRetryMsInIOPath",
&fileServiceOption_.metaServerOpt.mdsMaxRetryMsInIOPath);
LOG_IF(ERROR, ret == false) << "config no mds.maxRetryMsInIOPath info";

ret = conf_.GetUInt64Value("mds.waitSleepMs",
&fileServiceOption_.metaServerOpt.mdsWaitSleepMs);
LOG_IF(ERROR, ret == false) << "config no mds.waitSleepMs info";

ret = conf_.GetBoolValue("mds.registerToMDS",
&fileServiceOption_.commonOpt.mdsRegisterToMDS);
LOG_IF(ERROR, ret == false) << "config no mds.registerToMDS info";
Expand Down
10 changes: 10 additions & 0 deletions src/client/config_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,16 @@ struct MetaServerOption {
uint64_t mdsRPCTimeoutMs = 500;
uint32_t mdsRPCRetryIntervalUS = 50000;
uint32_t mdsMaxFailedTimesBeforeChangeMDS = 5;

/**
* When the number of failures that are not RPC errors exceeds
* mdsNormalRetryTimesBeforeTriggerWait, the wait strategy is triggered:
* the client sleeps for mdsWaitSleepMs before each further retry
*/
uint64_t mdsNormalRetryTimesBeforeTriggerWait = 3; // 3 times
uint64_t mdsMaxRetryMsInIOPath = 86400000; // 1 day
uint64_t mdsWaitSleepMs = 10000; // 10 seconds

std::vector<std::string> mdsAddrs;
};

Expand Down
39 changes: 27 additions & 12 deletions src/client/mds_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ LIBCURVE_ERROR MDSClient::MDSRPCExcutor::DoRPCTask(RPCFunc rpctask,
// rpc超时时间
uint64_t rpcTimeOutMS = metaServerOpt_.mdsRPCTimeoutMs;

// The count of normal retry
uint64_t normalRetryCount = 0;

int retcode = -1;
bool needChangeMDS = false;
while (GoOnRetry(startTime, maxRetryTimeMS)) {
Expand All @@ -122,7 +125,7 @@ LIBCURVE_ERROR MDSClient::MDSRPCExcutor::DoRPCTask(RPCFunc rpctask,

// 2. 根据rpc返回值进行预处理
if (retcode < 0) {
curRetryMDSIndex = PreProcessBeforeRetry(retcode,
curRetryMDSIndex = PreProcessBeforeRetry(retcode, &normalRetryCount,
&currentMDSRetryCount, curRetryMDSIndex,
&lastWorkingMDSIndex, &rpcTimeOutMS);
continue;
Expand Down Expand Up @@ -151,26 +154,35 @@ bool MDSClient::MDSRPCExcutor::GoOnRetry(uint64_t startTimeMS,
}

int MDSClient::MDSRPCExcutor::PreProcessBeforeRetry(int status,
uint64_t* normalRetryCount,
uint64_t* curMDSRetryCount,
int curRetryMDSIndex,
int* lastWorkingMDSIndex,
uint64_t* timeOutMS) {
int nextMDSIndex = 0;
bool rpcTimeout = false;
bool needChangeMDS = false;

// It's not a RPC error, but we must retry it until success
if (status == -LIBCURVE_ERROR::RETRY_UNTIL_SUCCESS) {
if (++(*normalRetryCount) >
metaServerOpt_.mdsNormalRetryTimesBeforeTriggerWait) {
bthread_usleep(metaServerOpt_.mdsWaitSleepMs * 1000);
}

// 1. 访问存在的IP地址,但无人监听:ECONNREFUSED
// 2. 正常发送RPC情况下,对端进程挂掉了:EHOSTDOWN
// 3. 对端server调用了Stop:ELOGOFF
// 4. 对端链接已关闭:ECONNRESET
// 5. 在一个mds节点上rpc失败超过限定次数
// 在这几种场景下,主动切换mds。
if (status == -EHOSTDOWN || status == -ECONNRESET ||
} else if (status == -EHOSTDOWN || status == -ECONNRESET ||
status == -ECONNREFUSED || status == -brpc::ELOGOFF ||
*curMDSRetryCount >= metaServerOpt_.mdsMaxFailedTimesBeforeChangeMDS) {
needChangeMDS = true;

// 在开启健康检查的情况下,在底层tcp连接失败时
// rpc请求会本地直接返回 EHOSTSOWN
// rpc请求会本地直接返回 EHOSTDOWN
// 这种情况下,增加一些睡眠时间,避免大量的重试请求占满bthread
// TODO(wuhanqing): 关闭健康检查
if (status == -EHOSTDOWN) {
Expand Down Expand Up @@ -717,8 +729,6 @@ LIBCURVE_ERROR MDSClient::CheckSnapShotStatus(const std::string& filename,
return rpcExcutor.DoRPCTask(task, metaServerOpt_.mdsMaxRetryMS);
}

#define IOPathMaxRetryMS UINT64_MAX

LIBCURVE_ERROR MDSClient::GetServerList(
const LogicPoolID& logicalpooid,
const std::vector<CopysetID>& copysetidvec,
Expand Down Expand Up @@ -780,7 +790,7 @@ LIBCURVE_ERROR MDSClient::GetServerList(
return response.statuscode() == 0 ? LIBCURVE_ERROR::OK :
LIBCURVE_ERROR::FAILED;
};
return rpcExcutor.DoRPCTask(task, IOPathMaxRetryMS);
return rpcExcutor.DoRPCTask(task, metaServerOpt_.mdsMaxRetryMsInIOPath);
}

LIBCURVE_ERROR MDSClient::GetClusterInfo(ClusterContext* clsctx) {
Expand Down Expand Up @@ -913,14 +923,18 @@ LIBCURVE_ERROR MDSClient::GetOrAllocateSegment(bool allocate,

auto statuscode = response.statuscode();
switch (statuscode) {
case StatusCode::kParaError:
LOG(WARNING) << "GetOrAllocateSegment: error param!";
return LIBCURVE_ERROR::FAILED;
case StatusCode::kOwnerAuthFail:
LOG(WARNING) << "GetOrAllocateSegment Auth failed!";
LOG(WARNING) << "GetOrAllocateSegment: auth failed!";
return LIBCURVE_ERROR::AUTHFAIL;
break;
case StatusCode::kFileNotExists:
LOG(WARNING) << "GetOrAllocateSegment: file not exists!";
return LIBCURVE_ERROR::FAILED;
case StatusCode::kSegmentNotAllocated:
LOG(WARNING) << "segment not allocated!";
LOG(WARNING) << "GetOrAllocateSegment: segment not allocated!";
return LIBCURVE_ERROR::NOT_ALLOCATE;
break;
default: break;
}

Expand All @@ -934,7 +948,8 @@ LIBCURVE_ERROR MDSClient::GetOrAllocateSegment(bool allocate,
int chunksNum = pfs.chunks_size();
if (allocate && chunksNum <= 0) {
LOG(WARNING) << "MDS allocate segment, but no chunkinfo!";
return LIBCURVE_ERROR::FAILED;
// Now, we will retry until allocate segment success
return -LIBCURVE_ERROR::RETRY_UNTIL_SUCCESS;
}

for (int i = 0; i < chunksNum; i++) {
Expand All @@ -945,7 +960,7 @@ LIBCURVE_ERROR MDSClient::GetOrAllocateSegment(bool allocate,
}
return LIBCURVE_ERROR::OK;
};
return rpcExcutor.DoRPCTask(task, IOPathMaxRetryMS);
return rpcExcutor.DoRPCTask(task, metaServerOpt_.mdsMaxRetryMsInIOPath);
}

LIBCURVE_ERROR MDSClient::RenameFile(const UserInfo_t& userinfo,
Expand Down
2 changes: 2 additions & 0 deletions src/client/mds_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ class MDSClient {
* 2. 如果上一次rpc返回not connect等返回值,会主动触发切换mds地址重试
* 3. 更新重试信息,比如在当前mds上连续重试的次数
* @param[in]: status为当前rpc的失败返回的状态
* @param[in][out]: normalRetryCount is the total count of normal (non-RPC-error) retries so far
* @param[in][out]: curMDSRetryCount当前mds节点上的重试次数,如果切换mds
* 该值会被重置为1.
* @param[in]: curRetryMDSIndex代表当前正在重试的mds索引
Expand All @@ -452,6 +453,7 @@ class MDSClient {
* @return: 返回下一次重试的mds索引
*/
int PreProcessBeforeRetry(int status,
uint64_t* normalRetryCount,
uint64_t* curMDSRetryCount,
int curRetryMDSIndex,
int* lastWorkingMDSIndex,
Expand Down
17 changes: 16 additions & 1 deletion test/client/client_mdsclient_metacache_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class MDSClientTest : public ::testing::Test {
metaopt.mdsRPCTimeoutMs = 500;
metaopt.mdsRPCRetryIntervalUS = 200;
metaopt.mdsRPCTimeoutMs = 2000;
metaopt.mdsMaxRetryMsInIOPath = 10000;
mdsclient_.Initialize(metaopt);
userinfo.owner = "test";

Expand Down Expand Up @@ -916,15 +917,29 @@ TEST_F(MDSClientTest, GetOrAllocateSegment) {
fi.chunksize = 4 * 1024 * 1024;
fi.segmentsize = 1 * 1024 * 1024 * 1024ul;

std::chrono::system_clock::time_point start, end;
auto startTimer = [&start]() { start = std::chrono::system_clock::now(); };
auto endTimer = [&end]() { end = std::chrono::system_clock::now(); };
auto checkTimer = [&start, &end](uint64_t min, uint64_t max) {
auto elpased = std::chrono::duration_cast<std::chrono::milliseconds>(
end - start).count();
ASSERT_GE(elpased, min);
ASSERT_LE(elpased, max);
};

// TEST CASE: GetOrAllocateSegment failed, wait 10 seconds
curve::mds::GetOrAllocateSegmentResponse resp;
resp.set_statuscode(::curve::mds::StatusCode::kOK);
FakeReturn* fakeres = new FakeReturn(nullptr,
static_cast<void*>(&resp));
curvefsservice.SetGetOrAllocateSegmentFakeReturn(fakeres);

SegmentInfo seg;
startTimer();
ASSERT_EQ(LIBCURVE_ERROR::FAILED,
mdsclient_.GetOrAllocateSegment(true, 0, &fi, &seg));
mdsclient_.GetOrAllocateSegment(true, 0, &fi, &seg));
endTimer();
checkTimer(10000, 11000);

curve::mds::GetOrAllocateSegmentResponse response;
curve::mds::PageFileSegment* pfs = new curve::mds::PageFileSegment;
Expand Down

0 comments on commit f455caf

Please sign in to comment.