schedule pendding online chunkserver #252

Merged: merged 1 commit on Jun 10, 2021
148 changes: 114 additions & 34 deletions src/mds/schedule/copySetScheduler.cpp
@@ -32,33 +32,97 @@ namespace curve {
namespace mds {
namespace schedule {
int CopySetScheduler::Schedule() {
LOG(INFO) << "copysetScheduler begin";
LOG(INFO) << "schedule: copysetScheduler begin";

int res = 0;
int oneRoundGenOp = 0;
for (auto lid : topo_->GetLogicalpools()) {
res = DoCopySetSchedule(lid);
oneRoundGenOp += DoCopySetSchedule(lid);
}
return res;

LOG(INFO) << "schedule: copysetScheduler end, generate operator num "
<< oneRoundGenOp;
return oneRoundGenOp;
}

int CopySetScheduler::DoCopySetSchedule(PoolIdType lid) {
// 1. collect the chunkserver list and copyset list of the cluster, then
// collect copyset on every online chunkserver
auto copysetList = topo_->GetCopySetInfosInLogicalPool(lid);
auto chunkserverList = topo_->GetChunkServersInLogicalPool(lid);
std::map<ChunkServerIdType, std::vector<CopySetInfo>> distribute;
SchedulerHelper::CopySetDistributionInOnlineChunkServer(
copysetList, chunkserverList, &distribute);
if (distribute.empty()) {
LOG(WARNING) << "no not-retired chunkserver in topology";
return UNINTIALIZE_ID;
int CopySetScheduler::PenddingCopySetSchedule(const std::map<ChunkServerIdType,
Review (Contributor): Is it more reasonable to schedule chunkservers in pendding status via recoverSchedule?

Reply (Author): RecoverSchedule handles copysets that have already lost at least one replica; CopySetScheduler handles copysets that still have all of their replicas.

std::vector<CopySetInfo>> &distribute) {
int oneRoundGenOp = 0;
// for every chunkserver, find one copyset to migrate out
for (auto it = distribute.begin(); it != distribute.end(); it++) {
ChunkServerIdType source = it->first;
int copysetNum = it->second.size();
if (copysetNum == 0) {
continue;
}

// find one copyset to migrate out from source chunkserver
for (auto info : it->second) {
// does not meet the basic conditions
if (!CopySetSatisfiyBasicMigrationCond(info)) {
continue;
}

auto target = SelectBestPlacementChunkServer(info, source);
if (target == UNINTIALIZE_ID) {
LOG(WARNING) << "copysetScheduler can not select chunkServer "
"to migrate " << info.CopySetInfoStr()
<< ", which replica: " << source << " is pendding";
continue;
}

Operator op = operatorFactory.CreateChangePeerOperator(
info, source, target, OperatorPriority::HighPriority);
op.timeLimit = std::chrono::seconds(changeTimeSec_);

if (AddOperatorAndCreateCopyset(op, info, target)) {
oneRoundGenOp++;
}
}
Review (Contributor): Can I know if the migration is complete?

Reply (Author): The function int StatusTool::ChunkServerListCmd() in status_tool.cpp is modified; it can print the pendding status of each chunkserver.

}

if (oneRoundGenOp != 0) {
LOG(INFO) << "pendding copyset scheduler migrate " << oneRoundGenOp
<< " copyset at this round";
}

return oneRoundGenOp;
}
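
To make the division of labor from the review thread above concrete, here is a minimal compilable sketch; the ReplicaState struct and both helpers are stand-ins invented for illustration, not real curve types.

#include <vector>

// Hypothetical stand-ins: which scheduler owns a copyset, per the
// author's reply in the review thread.
struct ReplicaState { bool online; bool pendding; };

// RecoverScheduler repairs copysets that have already lost a replica.
bool handledByRecoverScheduler(const std::vector<ReplicaState> &replicas) {
    for (const auto &r : replicas) {
        if (!r.online) return true;
    }
    return false;
}

// PenddingCopySetSchedule handles copysets whose replicas are all alive,
// but where one replica sits on a PENDDING chunkserver and should be
// migrated out proactively.
bool handledByPenddingCopySetSchedule(const std::vector<ReplicaState> &replicas) {
    for (const auto &r : replicas) {
        if (!r.online) return false;   // lost replica: RecoverScheduler's job
    }
    for (const auto &r : replicas) {
        if (r.pendding) return true;
    }
    return false;
}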

bool CopySetScheduler::AddOperatorAndCreateCopyset(const Operator &op,
const CopySetInfo &choose,
const ChunkServerIdType &target) {
// add operator
if (!opController_->AddOperator(op)) {
LOG(INFO) << "copysetSchduler add op " << op.OpToString()
<< " fail, copyset has already has operator"
<< " or operator num exceeds the limit.";
return false;
}

// create copyset
if (!topo_->CreateCopySetAtChunkServer(choose.id, target)) {
LOG(ERROR) << "copysetScheduler create " << choose.CopySetInfoStr()
<< " on chunkServer: " << target
<< " error, delete operator" << op.OpToString();
opController_->RemoveOperator(choose.id);
return false;
}

LOG(INFO) << "copysetScheduler create " << choose.CopySetInfoStr()
<< "on chunkserver:" << target
<< " success. generator op: "
<< op.OpToString() << "success";
return true;
}

int CopySetScheduler::NormalCopySetSchedule(const std::map<ChunkServerIdType,
std::vector<CopySetInfo>> &distribute) {
// 2. measure the average, range and standard deviation of number of copyset
// on chunkservers
float avg;
int range;
float stdvariance;
int oneRoundGenOp = 0;
StatsCopysetDistribute(distribute, &avg, &range, &stdvariance);
/**
* 3. Set migration condition
@@ -83,37 +147,53 @@ int CopySetScheduler::DoCopySetSchedule(PoolIdType lid) {
**/
ChunkServerIdType source = UNINTIALIZE_ID;
if (range <= avg * copysetNumRangePercent_) {
return source;
return oneRoundGenOp;
}

Operator op;
ChunkServerIdType target = UNINTIALIZE_ID;
CopySetInfo choose;
// this function call will select the source, target and the copyset
if (CopySetMigration(distribute, &op, &source, &target, &choose)) {
// add operator
if (!opController_->AddOperator(op)) {
LOG(INFO) << "copysetSchduler add op " << op.OpToString()
<< " fail, copyset has already has operator";
if (AddOperatorAndCreateCopyset(op, choose, target)) {
oneRoundGenOp++;
Review (Contributor): This piece of code coincides with the above:

    if (CopySetMigration(distribute, &op, &source, &target, &choose)) {
        // repeat
    }

Reply (Author): Most of the processing logic is different.
}
}

return oneRoundGenOp;
}
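
A worked example of the migration trigger above; the copyset counts and the copysetNumRangePercent_ value are made up for illustration.

#include <algorithm>
#include <vector>

int main() {
    // copysets per chunkserver in one logical pool (assumed values)
    std::vector<int> counts = {10, 12, 20};
    float avg = (10 + 12 + 20) / 3.0f;                    // 14.0
    int range = *std::max_element(counts.begin(), counts.end())
              - *std::min_element(counts.begin(), counts.end());  // 20 - 10 = 10
    float copysetNumRangePercent = 0.4f;                  // assumed config value
    // range (10) > avg * percent (5.6): the pool counts as imbalanced,
    // so NormalCopySetSchedule would go on to call CopySetMigration()
    bool balanced = range <= avg * copysetNumRangePercent;
    return balanced ? 0 : 1;
}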

// create copyset
if (!topo_->CreateCopySetAtChunkServer(choose.id, target)) {
LOG(ERROR) << "copysetScheduler create " << choose.CopySetInfoStr()
<< " on chunkServer: " << target
<< " error, delete operator" << op.OpToString();
opController_->RemoveOperator(choose.id);
} else {
LOG(INFO) << "copysetScheduler create " << choose.CopySetInfoStr()
<< "on chunkserver:" << target
<< " success. generator op: "
<< op.OpToString() << "success";
int CopySetScheduler::DoCopySetSchedule(PoolIdType lid) {
// 1. collect the chunkserver list and copyset list of the cluster, then
// collect copyset on every online chunkserver
auto copysetList = topo_->GetCopySetInfosInLogicalPool(lid);
auto chunkserverList = topo_->GetChunkServersInLogicalPool(lid);

std::map<ChunkServerIdType, std::vector<CopySetInfo>> penddingDistribute;
SchedulerHelper::GetCopySetDistributionInOnlineChunkServer(
copysetList, chunkserverList, &penddingDistribute);
SchedulerHelper::FilterCopySetDistributions(ChunkServerStatus::PENDDING,
chunkserverList, &penddingDistribute);
if (!penddingDistribute.empty()) {
int oneRoundGenOp = PenddingCopySetSchedule(penddingDistribute);
// If the pendding schedule generated any operator, return here.
if (oneRoundGenOp != 0) {
return oneRoundGenOp;
}
}

LOG_EVERY_N(INFO, 20) << "copysetScheduler is continually adjusting";
LOG(INFO) << "copysetScheduler end.";
return static_cast<int>(source);
// If no pendding copyset schedule operator was generated,
// run NormalCopySetSchedule
std::map<ChunkServerIdType, std::vector<CopySetInfo>> normalDistribute;
SchedulerHelper::GetCopySetDistributionInOnlineChunkServer(
copysetList, chunkserverList, &normalDistribute);
SchedulerHelper::FilterCopySetDistributions(ChunkServerStatus::READWRITE,
chunkserverList, &normalDistribute);
if (normalDistribute.empty()) {
LOG(WARNING) << "no not-retired chunkserver in topology";
return 0;
}
return NormalCopySetSchedule(normalDistribute);
}
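
This also answers the review question about migration progress: the pendding phase keeps taking priority round after round until the pendding chunkservers hold no more copysets, and only then does the scheduler fall through to normal balancing. A compilable sketch of that completion condition follows; the helper itself is an assumption for illustration, not curve code.

#include <map>
#include <vector>

// Illustrative: a pendding chunkserver is fully drained once its entry in
// penddingDistribute holds no copysets; from then on DoCopySetSchedule
// falls through to NormalCopySetSchedule.
template <typename CsId, typename CopySet>
bool penddingDrainComplete(
    const std::map<CsId, std::vector<CopySet>> &penddingDistribute) {
    for (const auto &kv : penddingDistribute) {
        if (!kv.second.empty()) {
            return false;   // copysets still waiting to migrate out
        }
    }
    return true;
}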

void CopySetScheduler::StatsCopysetDistribute(
20 changes: 15 additions & 5 deletions src/mds/schedule/leaderScheduler.cpp
@@ -34,12 +34,15 @@ namespace curve {
namespace mds {
namespace schedule {
int LeaderScheduler::Schedule() {
LOG(INFO) << "leaderScheduler begin.";

LOG(INFO) << "schedule: leaderScheduler begin.";
int oneRoundGenOp = 0;
for (auto lid : topo_->GetLogicalpools()) {
DoLeaderSchedule(lid);
oneRoundGenOp += DoLeaderSchedule(lid);
}
return 1;

LOG(INFO) << "schedule: leaderScheduler end, generate operator num "
<< oneRoundGenOp;
return oneRoundGenOp;
}

int LeaderScheduler::DoLeaderSchedule(PoolIdType lid) {
@@ -57,7 +60,8 @@ int LeaderScheduler::DoLeaderSchedule(PoolIdType lid) {
std::shuffle(csInfos.begin(), csInfos.end(), g);

for (auto csInfo : csInfos) {
if (csInfo.IsOffline()) {
// skip offline chunkserver or pendding chunkserver
if (csInfo.IsOffline() || csInfo.IsPendding()) {
continue;
}

Review (Contributor):

    if (csInfo.IsOffline() || csInfo.IsPendding()) {
        continue;
    }

Reply (Author): done

@@ -177,6 +181,12 @@ bool LeaderScheduler::transferLeaderOut(ChunkServerIdType source, int count,
break;
}

// can not transfer to pendding chunkserver
if (csInfo.IsPendding()) {
continue;
}
Review (Contributor):

    if (csInfo.IsOffline() || csInfo.IsPendding()) {
        continue;
    }

Reply (Author): The offline and pendding cases are handled differently here. If the chunkserver is offline, the loop exits with break; if it is pendding, only this round is skipped with continue.


// can not transfer to myself
if (source == peerInfo.id) {
continue;
}
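
A compilable sketch of the control-flow difference described in the reply above; the Candidate struct and the loop are simplifications invented for illustration.

#include <vector>

struct Candidate { bool offline; bool pendding; int id; };

// Illustrative only: an offline candidate aborts the whole search (break),
// while a pendding candidate merely skips the current round (continue).
int pickTransferTarget(const std::vector<Candidate> &candidates) {
    for (const auto &cs : candidates) {
        if (cs.offline) {
            break;            // stop searching entirely
        }
        if (cs.pendding) {
            continue;         // try the next candidate
        }
        return cs.id;         // first healthy candidate wins
    }
    return -1;                // stand-in for UNINTIALIZE_ID
}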
5 changes: 3 additions & 2 deletions src/mds/schedule/rapidLeaderScheduler.cpp
@@ -120,9 +120,10 @@ bool RapidLeaderScheduler::LeaderStatInSpecifiedLogicalPool(
}

// get copyset info list for every chunkserver
SchedulerHelper::CopySetDistributionInOnlineChunkServer(
SchedulerHelper::GetCopySetDistributionInOnlineChunkServer(
copysetVec, chunkserverVec, &stat->distribute);

SchedulerHelper::FilterCopySetDistributions(ChunkServerStatus::READWRITE,
chunkserverVec, &stat->distribute);
// calculate average leader number for every chunkserver
stat->avgLeaderNum = copysetVec.size() / chunkserverVec.size();

16 changes: 12 additions & 4 deletions src/mds/schedule/scheduler.cpp
@@ -256,7 +256,8 @@ ChunkServerIdType Scheduler::SelectRedundantReplicaToRemove(
// the one to remove should be selected from BCDE
// 2.2 greater
// any chunkserver to remove will satisfy the requirement, thus we first
// consider the one not online, then consider the scatter-width
// consider the one not online, then the one that is pendding,
// then consider the scatter-width
// for example, if the replicas are:
// A(zone1) B(zone2) C(zone3) D(zone4) E(zone4)
// we can remove anyone of it
@@ -273,7 +274,7 @@ ChunkServerIdType Scheduler::SelectRedundantReplicaToRemove(
}
}

// consider offline chunkservers first
// consider offline or pendding chunkservers first
for (auto cs : candidateChunkServer) {
ChunkServerInfo csInfo;
if (!topo_->GetChunkServerInfo(cs, &csInfo)) {
@@ -283,15 +284,22 @@ ChunkServerIdType Scheduler::SelectRedundantReplicaToRemove(
return UNINTIALIZE_ID;
}

// chunkserver is not offline
// chunkserver is offline
if (csInfo.IsOffline()) {
LOG(WARNING) << "scheduler choose to remove offline chunkServer "
<< cs << " from " << copySetInfo.CopySetInfoStr();
return cs;
}

// chunkserver is pendding
if (csInfo.IsPendding()) {
LOG(WARNING) << "scheduler choose to remove pendding chunkServer "
<< cs << " from " << copySetInfo.CopySetInfoStr();
return cs;
}
}

// if all chunkserver are online, select according to the impact on scatter-width //NOLINT
// if all chunkservers are online and not pendding, select according to the impact on scatter-width //NOLINT
// first rank chunkservers by the number of copysets on them
std::map<ChunkServerIdType, std::vector<CopySetInfo>> distribute;
std::vector<std::pair<ChunkServerIdType, std::vector<CopySetInfo>>> desc;
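
Traced on a hypothetical replica set, the selection order above works out as follows (the statuses are made up for illustration):

// Illustrative trace of SelectRedundantReplicaToRemove's ordering:
//   candidates = {B: offline, C: pendding, D: healthy}
//     -> B is removed (offline replicas are checked first)
//   candidates = {C: pendding, D: healthy}
//     -> C is removed (pendding is checked next)
//   candidates = {D: healthy, E: healthy}
//     -> fall through to the scatter-width ranking built from the
//        distribute/desc structures declared above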
32 changes: 31 additions & 1 deletion src/mds/schedule/scheduler.h
@@ -171,7 +171,7 @@ class CopySetScheduler : public Scheduler {
* @brief Schedule Generating operator according to
* the condition of the cluster
*
* @return chunkserver to add, a value for POC
* @return operator num generated
*/
int Schedule() override;

@@ -245,6 +245,36 @@
*/
bool CopySetSatisfiyBasicMigrationCond(const CopySetInfo &info);

/**
* @brief migrate one copyset between online && not-pendding chunkservers
*
* @param[in] distribute copyset list of every online && not-pendding chunkserver
*
* @return number of operators generated
*/
int NormalCopySetSchedule(const std::map<ChunkServerIdType,
std::vector<CopySetInfo>> &distribute);

/**
* @brief migrate one copyset from an online && pendding chunkserver to
* another healthy chunkserver
* @param[in] distribute copyset list of every online && pendding chunkserver
* @return number of copysets migrated
*/
int PenddingCopySetSchedule(const std::map<ChunkServerIdType,
std::vector<CopySetInfo>> &distribute);

/**
* @brief add op to opController and create copyset at chunkserver
* @param[in] op the operator to add to opController
* @param[in] choose the copyset to schedule
* @param[in] target the chunkserver to add to this copyset
* @return true on success, false otherwise
*/
bool AddOperatorAndCreateCopyset(const Operator &op,
const CopySetInfo &choose,
const ChunkServerIdType &target);

private:
// Running interval of CopySetScheduler
int64_t runInterval_;
17 changes: 16 additions & 1 deletion src/mds/schedule/scheduler_helper.cpp
@@ -353,7 +353,7 @@ bool SchedulerHelper::InvovledReplicasSatisfyScatterWidthAfterMigration(
return allSatisfy;
}

void SchedulerHelper::CopySetDistributionInOnlineChunkServer(
void SchedulerHelper::GetCopySetDistributionInOnlineChunkServer(
const std::vector<CopySetInfo> &copysetList,
const std::vector<ChunkServerInfo> &chunkserverList,
std::map<ChunkServerIdType, std::vector<CopySetInfo>> *out) {
@@ -370,15 +370,30 @@ void SchedulerHelper::CopySetDistributionInOnlineChunkServer(

// remove offline chunkserver, and report empty list for empty chunkserver
for (auto item : chunkserverList) {
// remove offline chunkserver
if (item.IsOffline()) {
out->erase(item.info.id);
continue;
}

// report empty list for chunkserver with no copyset
if (out->find(item.info.id) == out->end()) {
(*out)[item.info.id] = std::vector<CopySetInfo>{};
}
}
}

void SchedulerHelper::FilterCopySetDistributions(const ChunkServerStatus status,
const std::vector<ChunkServerInfo> &chunkserverList,
std::map<ChunkServerIdType, std::vector<CopySetInfo>> *distributions) {
for (auto item : chunkserverList) {
if (item.status != status) {
Review (Contributor): To find the chunkservers with the matching status, this judgment condition is enough.

Reply (Author): done

distributions->erase(item.info.id);
}
}
return;
}
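
A self-contained demo of the filter semantics: entries whose chunkserver status differs from the requested one are erased, so only chunkservers in that status keep their copyset lists. The enum and map layout below are stand-ins for illustration, not the real curve types.

#include <cassert>
#include <map>
#include <vector>

enum class Status { READWRITE, PENDDING };

int main() {
    std::map<int, Status> csStatus = {{1, Status::READWRITE},
                                      {2, Status::PENDDING},
                                      {3, Status::READWRITE}};
    std::map<int, std::vector<int>> dist = {{1, {101}}, {2, {102}}, {3, {103}}};
    // mirror FilterCopySetDistributions(PENDDING, ...): drop non-matching
    for (const auto &kv : csStatus) {
        if (kv.second != Status::PENDDING) {
            dist.erase(kv.first);
        }
    }
    assert(dist.size() == 1 && dist.count(2) == 1);   // only cs2 remains
    return 0;
}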

} // namespace schedule
} // namespace mds
} // namespace curve