Skip to content

Commit

Permalink
schedule pendding online chunkserver
Browse files Browse the repository at this point in the history
  • Loading branch information
cw123 committed Mar 12, 2021
1 parent 650674a commit 082fd16
Show file tree
Hide file tree
Showing 12 changed files with 589 additions and 55 deletions.
142 changes: 107 additions & 35 deletions src/mds/schedule/copySetScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,33 +32,97 @@ namespace curve {
namespace mds {
namespace schedule {
int CopySetScheduler::Schedule() {
LOG(INFO) << "copysetScheduler begin";
LOG(INFO) << "schedule: copysetScheduler begin";

int res = 0;
int oneRoundGenOp = 0;
for (auto lid : topo_->GetLogicalpools()) {
res = DoCopySetSchedule(lid);
oneRoundGenOp += DoCopySetSchedule(lid);
}
return res;

LOG(INFO) << "schedule: copysetScheduler end, generate operator num "
<< oneRoundGenOp;
return oneRoundGenOp;
}

int CopySetScheduler::DoCopySetSchedule(PoolIdType lid) {
// 1. collect the chunkserver list and copyset list of the cluster, then
// collect copyset on every online chunkserver
auto copysetList = topo_->GetCopySetInfosInLogicalPool(lid);
auto chunkserverList = topo_->GetChunkServersInLogicalPool(lid);
std::map<ChunkServerIdType, std::vector<CopySetInfo>> distribute;
SchedulerHelper::CopySetDistributionInOnlineChunkServer(
copysetList, chunkserverList, &distribute);
if (distribute.empty()) {
LOG(WARNING) << "no not-retired chunkserver in topology";
return UNINTIALIZE_ID;
int CopySetScheduler::PenddingCopySetSchedule(const std::map<ChunkServerIdType,
std::vector<CopySetInfo>> &distribute) {
int oneRoundGenOp = 0;
// for every chunkserver, find one copy set to migrate out
for (auto it = distribute.begin(); it != distribute.end(); it++) {
ChunkServerIdType source = it->first;
int copysetNum = it->second.size();
if (copysetNum == 0) {
continue;
}

// find one copy set to migrate out from source chunkserver
for (auto info : it->second) {
// does not meet the basic conditions
if (!CopySetSatisfiyBasicMigrationCond(info)) {
continue;
}

auto target = SelectBestPlacementChunkServer(info, source);
if (target == UNINTIALIZE_ID) {
LOG(WARNING) << "copysetScheduler can not select chunkServer "
"to migrate " << info.CopySetInfoStr()
<< ", which replica: " << source << " is pendding";
continue;
}

Operator op = operatorFactory.CreateChangePeerOperator(
info, source, target, OperatorPriority::HighPriority);
op.timeLimit = std::chrono::seconds(changeTimeSec_);

if (AddOperatorAndCreateCopyset(op, info, target)) {
oneRoundGenOp++;
break;
}
}
}

if (oneRoundGenOp != 0) {
LOG(INFO) << "pendding copyset scheduler migrate " << oneRoundGenOp
<< " copyset at this round";
}

return oneRoundGenOp;
}

/**
 * Register `op` with the operator controller and then create the copyset
 * on the target chunkserver.
 *
 * If the copyset cannot be created, the operator registered for
 * `choose.id` is removed again before returning.
 *
 * @param[in] op     operator to hand to the operator controller
 * @param[in] choose copyset the operator applies to
 * @param[in] target chunkserver on which the copyset is created
 *
 * @return true when both steps succeed, false otherwise
 */
bool CopySetScheduler::AddOperatorAndCreateCopyset(
    const Operator &op,
    const CopySetInfo &choose,
    const ChunkServerIdType &target) {
    // step 1: the operator must be accepted by the controller
    const bool added = opController_->AddOperator(op);
    if (!added) {
        LOG(INFO) << "copysetSchduler add op " << op.OpToString()
                  << " fail, copyset has already has operator";
        return false;
    }

    // step 2: create the copyset on the target chunkserver
    const bool created = topo_->CreateCopySetAtChunkServer(choose.id, target);
    if (created) {
        LOG(INFO) << "copysetScheduler create " << choose.CopySetInfoStr()
                  << "on chunkserver:" << target
                  << " success. generator op: "
                  << op.OpToString() << "success";
        return true;
    }

    // creation failed: undo step 1 so no stale operator is left behind
    LOG(ERROR) << "copysetScheduler create " << choose.CopySetInfoStr()
               << " on chunkServer: " << target
               << " error, delete operator" << op.OpToString();
    opController_->RemoveOperator(choose.id);
    return false;
}

int CopySetScheduler::NormalCopySetSchedule(const std::map<ChunkServerIdType,
std::vector<CopySetInfo>> &distribute) {
// 2. measure the average, range and standard deviation of number of copyset
// on chunkservers
float avg;
int range;
float stdvariance;
int oneRoundGenOp = 0;
StatsCopysetDistribute(distribute, &avg, &range, &stdvariance);
/**
* 3. Set migration condition
Expand All @@ -83,37 +147,45 @@ int CopySetScheduler::DoCopySetSchedule(PoolIdType lid) {
**/
ChunkServerIdType source = UNINTIALIZE_ID;
if (range <= avg * copysetNumRangePercent_) {
return source;
return oneRoundGenOp;
}

Operator op;
ChunkServerIdType target = UNINTIALIZE_ID;
CopySetInfo choose;
// this function call will select the source, target and the copyset
if (CopySetMigration(distribute, &op, &source, &target, &choose)) {
// add operator
if (!opController_->AddOperator(op)) {
LOG(INFO) << "copysetSchduler add op " << op.OpToString()
<< " fail, copyset has already has operator";
if (AddOperatorAndCreateCopyset(op, choose, target)) {
oneRoundGenOp++;
}
}

// create copyset
if (!topo_->CreateCopySetAtChunkServer(choose.id, target)) {
LOG(ERROR) << "copysetScheduler create " << choose.CopySetInfoStr()
<< " on chunkServer: " << target
<< " error, delete operator" << op.OpToString();
opController_->RemoveOperator(choose.id);
} else {
LOG(INFO) << "copysetScheduler create " << choose.CopySetInfoStr()
<< "on chunkserver:" << target
<< " success. generator op: "
<< op.OpToString() << "success";
}
return oneRoundGenOp;
}

int CopySetScheduler::DoCopySetSchedule(PoolIdType lid) {
// 1. collect the chunkserver list and copyset list of the cluster, then
// collect copyset on every online chunkserver
auto copysetList = topo_->GetCopySetInfosInLogicalPool(lid);
auto chunkserverList = topo_->GetChunkServersInLogicalPool(lid);

std::map<ChunkServerIdType, std::vector<CopySetInfo>> penddingDistribute;
SchedulerHelper::CopySetDistributionInOnlineChunkServer(
ChunkServerStatus::PENDDING,
copysetList, chunkserverList, &penddingDistribute);
if (!penddingDistribute.empty()) {
return PenddingCopySetSchedule(penddingDistribute);
}

LOG_EVERY_N(INFO, 20) << "copysetScheduler is continually adjusting";
LOG(INFO) << "copysetScheduler end.";
return static_cast<int>(source);
std::map<ChunkServerIdType, std::vector<CopySetInfo>> normalDistribute;
SchedulerHelper::CopySetDistributionInOnlineChunkServer(
ChunkServerStatus::READWRITE, copysetList,
chunkserverList, &normalDistribute);
if (normalDistribute.empty()) {
LOG(WARNING) << "no not-retired chunkserver in topology";
return 0;
}
return NormalCopySetSchedule(normalDistribute);
}

void CopySetScheduler::StatsCopysetDistribute(
Expand Down
20 changes: 15 additions & 5 deletions src/mds/schedule/leaderScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,15 @@ namespace curve {
namespace mds {
namespace schedule {
int LeaderScheduler::Schedule() {
LOG(INFO) << "leaderScheduler begin.";

LOG(INFO) << "schedule: leaderScheduler begin.";
int oneRoundGenOp = 0;
for (auto lid : topo_->GetLogicalpools()) {
DoLeaderSchedule(lid);
oneRoundGenOp += DoLeaderSchedule(lid);
}
return 1;

LOG(INFO) << "schedule: leaderScheduler end, generate operator num "
<< oneRoundGenOp;
return oneRoundGenOp;
}

int LeaderScheduler::DoLeaderSchedule(PoolIdType lid) {
Expand All @@ -57,7 +60,8 @@ int LeaderScheduler::DoLeaderSchedule(PoolIdType lid) {
std::shuffle(csInfos.begin(), csInfos.end(), g);

for (auto csInfo : csInfos) {
if (csInfo.IsOffline()) {
// skip offline chunkserver or pendding chunkserver
if (csInfo.IsOffline() || csInfo.IsPendding()) {
continue;
}

Expand Down Expand Up @@ -177,6 +181,12 @@ bool LeaderScheduler::transferLeaderOut(ChunkServerIdType source, int count,
break;
}

// can not transfer to pendding chunkserver
if (csInfo.IsPendding()) {
continue;
}

// can not transfer to myself
if (source == peerInfo.id) {
continue;
}
Expand Down
1 change: 1 addition & 0 deletions src/mds/schedule/rapidLeaderScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ bool RapidLeaderScheduler::LeaderStatInSpecifiedLogicalPool(

// get copyset info list for every chunkserver
SchedulerHelper::CopySetDistributionInOnlineChunkServer(
ChunkServerStatus::READWRITE,
copysetVec, chunkserverVec, &stat->distribute);

// calculate average leader number for every chunkserver
Expand Down
16 changes: 12 additions & 4 deletions src/mds/schedule/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,8 @@ ChunkServerIdType Scheduler::SelectRedundantReplicaToRemove(
// the one to remove should be selected from BCDE
// 2.2 greater
// any chunkserver to remove will satisfy the requirement, thus we first
// consider the one not online, then consider the scatter-width
// consider the one not online, then the one that is pendding,
// and finally consider the scatter-width
// for example, if the replicas are:
// A(zone1) B(zone2) C(zone3) D(zone4) E(zone4)
// we can remove anyone of it
Expand All @@ -273,7 +274,7 @@ ChunkServerIdType Scheduler::SelectRedundantReplicaToRemove(
}
}

// consider offline chunkservers first
// consider offline or pendding chunkservers first
for (auto cs : candidateChunkServer) {
ChunkServerInfo csInfo;
if (!topo_->GetChunkServerInfo(cs, &csInfo)) {
Expand All @@ -283,15 +284,22 @@ ChunkServerIdType Scheduler::SelectRedundantReplicaToRemove(
return UNINTIALIZE_ID;
}

// chunkserver is not offline
// chunkserver is offline
if (csInfo.IsOffline()) {
LOG(WARNING) << "scheduler choose to remove offline chunkServer "
<< cs << " from " << copySetInfo.CopySetInfoStr();
return cs;
}

// chunkserver is pendding
if (csInfo.IsPendding()) {
LOG(WARNING) << "scheduler choose to remove pendding chunkServer "
<< cs << " from " << copySetInfo.CopySetInfoStr();
return cs;
}
}

// if all chunkserver are online, select according to the impact on scatter-width //NOLINT
// if all chunkserver are online and not pendding, select according to the impact on scatter-width //NOLINT
// first rank chunkser by the number of copyset on it
std::map<ChunkServerIdType, std::vector<CopySetInfo>> distribute;
std::vector<std::pair<ChunkServerIdType, std::vector<CopySetInfo>>> desc;
Expand Down
32 changes: 31 additions & 1 deletion src/mds/schedule/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ class CopySetScheduler : public Scheduler {
* @brief Schedule Generating operator according to
* the condition of the cluster
*
* @return chunkserver to add, a value for POC
* @return operator num generated
*/
int Schedule() override;

Expand Down Expand Up @@ -245,6 +245,36 @@ class CopySetScheduler : public Scheduler {
*/
bool CopySetSatisfiyBasicMigrationCond(const CopySetInfo &info);

/**
* @brief migrate one copyset between online, non-pendding chunkservers
*
* @param[in] distribute Copyset on every online, non-pendding chunkserver
*
* @return number of operators generated
*/
int NormalCopySetSchedule(const std::map<ChunkServerIdType,
std::vector<CopySetInfo>> &distribute);

/**
* @brief migrate one copyset from online && pendding chunkserver to
* other healthy chunkserver
* @param[in] distribute Copyset on every online && pendding chunkserver
* @return migrate copyset num
*/
int PenddingCopySetSchedule(const std::map<ChunkServerIdType,
std::vector<CopySetInfo>> &distribute);

/**
* @brief add op to opController and create copyset at chunkserver
* @param[in] op the operator to add to opController
* @param[in] choose the copyset need to schedule
* @param[in] target the chunkserver want to add to this copyset
* @return true on success, false otherwise
*/
bool AddOperatorAndCreateCopyset(const Operator &op,
const CopySetInfo &choose,
const ChunkServerIdType &target);

private:
// Running interval of CopySetScheduler
int64_t runInterval_;
Expand Down
37 changes: 37 additions & 0 deletions src/mds/schedule/scheduler_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,43 @@ void SchedulerHelper::CopySetDistributionInOnlineChunkServer(
}
}
}

void SchedulerHelper::CopySetDistributionInOnlineChunkServer(
const ChunkServerStatus status,
const std::vector<CopySetInfo> &copysetList,
const std::vector<ChunkServerInfo> &chunkserverList,
std::map<ChunkServerIdType, std::vector<CopySetInfo>> *out) {
// calculate the copysetlist by traversing the copyset list
for (auto item : copysetList) {
for (auto peer : item.peers) {
if (out->find(peer.id) == out->end()) {
(*out)[peer.id] = std::vector<CopySetInfo>{item};
} else {
(*out)[peer.id].emplace_back(item);
}
}
}

for (auto item : chunkserverList) {
// remove offline chunkserver
if (item.IsOffline()) {
out->erase(item.info.id);
continue;
}

if (item.status != status) {
out->erase(item.info.id);
continue;
}

// report empty list for empty and healthy chunkserver
if (item.status == ChunkServerStatus::READWRITE
&& out->find(item.info.id) == out->end()) {
(*out)[item.info.id] = std::vector<CopySetInfo>{};
}
}
}

} // namespace schedule
} // namespace mds
} // namespace curve
Expand Down
15 changes: 15 additions & 0 deletions src/mds/schedule/scheduler_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,21 @@ class SchedulerHelper {
const std::vector<CopySetInfo> &copysetList,
const std::vector<ChunkServerInfo> &chunkserverList,
std::map<ChunkServerIdType, std::vector<CopySetInfo>> *out);

/**
* @brief CopySetDistribution Calculate the copyset number on
* chunkserver in online status
*
* @param[in] status filter the chunkserver in status
* @param[in] copysetList Copyset list in topo info
* @param[in] chunkserverList Chunkserver list in topo info
* @param[out] out List of copyset on chunkserver
*/
static void CopySetDistributionInOnlineChunkServer(
const ChunkServerStatus status,
const std::vector<CopySetInfo> &copysetList,
const std::vector<ChunkServerInfo> &chunkserverList,
std::map<ChunkServerIdType, std::vector<CopySetInfo>> *out);
};
} // namespace schedule
} // namespace mds
Expand Down
Loading

0 comments on commit 082fd16

Please sign in to comment.