Skip to content

Commit

Permalink
[feat]curvefs/client: warmup manager
Browse files Browse the repository at this point in the history
1. add WarmupManager
2. add WarmupManagerS3Impl
3. add query warmup progress in tools-v2

Signed-off-by: Cyber-SiKu <Cyber-SiKu@outlook.com>
  • Loading branch information
Cyber-SiKu committed Feb 15, 2023
1 parent fbc32f4 commit ff4e7b2
Show file tree
Hide file tree
Showing 26 changed files with 2,018 additions and 868 deletions.
2 changes: 2 additions & 0 deletions curvefs/src/client/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ cc_library(
"s3/*.h",
"volume/*.cpp",
"volume/*.h",
"warmup/*.h",
"warmup/*.cpp",
],
exclude = ["main.cpp"],
),
Expand Down
4 changes: 4 additions & 0 deletions curvefs/src/client/common/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ std::ostream &operator<<(std::ostream &os, MetaServerOpType optype) {
}

const char kCurveFsWarmupOpAdd[] = "add";
const char kCurveFsWarmupOpQuery[] = "query";
const char kCurveFsWarmupTypeList[] = "list";
const char kCurveFsWarmupTypeSingle[] = "single";

Expand All @@ -87,6 +88,9 @@ WarmupOpType GetWarmupOpType(const std::string& op) {
if (op == kCurveFsWarmupOpAdd) {
ret = WarmupOpType::kWarmupOpAdd;
}
if (op == kCurveFsWarmupOpQuery) {
ret = WarmupOpType::kWarmupOpQuery;
}
return ret;
}

Expand Down
1 change: 1 addition & 0 deletions curvefs/src/client/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ const char kCurveFsWarmupXAttr[] = "curvefs.warmup.op";
enum class WarmupOpType {
kWarmupOpUnknown = 0,
kWarmupOpAdd = 1,
kWarmupOpQuery = 2,
};

WarmupOpType GetWarmupOpType(const std::string& op);
Expand Down
66 changes: 48 additions & 18 deletions curvefs/src/client/curve_fuse_op.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "curvefs/src/client/metric/client_metric.h"
#include "curvefs/src/common/metric_utils.h"
#include "curvefs/src/common/dynamic_vlog.h"
#include "curvefs/src/client/warmup/warmup_manager.h"

using ::curve::common::Configuration;
using ::curvefs::client::CURVEFS_ERROR;
Expand Down Expand Up @@ -279,25 +280,42 @@ void FuseOpGetAttr(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi) {
fuse_reply_attr(req, &attr, g_fuseClientOption->attrTimeOut);
}

int AddWarmupTask(const std::string& type, const std::string& path) {
int AddWarmupTask(curvefs::client::common::WarmupType type, fuse_ino_t key) {
int ret = 0;
switch (curvefs::client::common::GetWarmupType(type)) {
case curvefs::client::common::WarmupType::kWarmupTypeList:
g_ClientInstance->PutWarmTask(path);
break;
case curvefs::client::common::WarmupType::kWarmupTypeSingle:
g_ClientInstance->FetchDentryEnqueue(path);
break;
default:
// not support add warmup type (warmup single file/dir or filelist)
LOG(ERROR) << "not support warmup type, only support single/list";
ret = ERANGE;
switch (type) {
case curvefs::client::common::WarmupType::kWarmupTypeList:
g_ClientInstance->PutWarmFilelistTask(key);
break;
case curvefs::client::common::WarmupType::kWarmupTypeSingle:
g_ClientInstance->PutWarmFileTask(key);
break;
default:
// not support add warmup type (warmup single file/dir or filelist)
LOG(ERROR) << "not support warmup type, only support single/list";
ret = EOPNOTSUPP;
}
return ret;
}

int Warmup(const std::string& name, const std::string& value) {
void QueryWarmupTask(fuse_ino_t key, std::string *data) {
curvefs::client::warmup::WarmupProgress progress;
bool ret = g_ClientInstance->GetWarmupProgress(key, &progress);
if (!ret) {
*data = "finished";
} else {
*data = std::to_string(progress.GetFinished()) + "/" +
std::to_string(progress.GetTotal());
}
VLOG(9) << "Warmup [" << key << "]" << *data;
}

int Warmup(fuse_ino_t key, const std::string& name, const std::string& value) {
// warmup
if (g_ClientInstance->GetFsInfo()->fstype() != FSType::TYPE_S3) {
LOG(ERROR) << "warmup only support s3";
return EOPNOTSUPP;
}

std::vector<std::string> opTypePath;
curve::common::SplitString(value, "\n", &opTypePath);
if (opTypePath.size() != 3) {
Expand All @@ -307,9 +325,10 @@ int Warmup(const std::string& name, const std::string& value) {
int ret = 0;
switch (curvefs::client::common::GetWarmupOpType(opTypePath[0])) {
case curvefs::client::common::WarmupOpType::kWarmupOpAdd:
ret = AddWarmupTask(opTypePath[1], opTypePath[2]);
if (ret != 0) {
LOG(ERROR) << name << " has invalid xattr value " << value;
ret = AddWarmupTask(
curvefs::client::common::GetWarmupType(opTypePath[1]), key);
if (ret != 0) {
LOG(ERROR) << name << " has invalid xattr value " << value;
}
break;
default:
Expand All @@ -327,7 +346,7 @@ void FuseOpSetXattr(fuse_req_t req, fuse_ino_t ino, const char* name,
<< " flags " << flags;
if (strcmp(name, curvefs::client::common::kCurveFsWarmupXAttr) == 0) {
// warmup
int code = Warmup(name, xattrValue);
int code = Warmup(ino, name, xattrValue);
fuse_reply_err(req, code);
} else {
// set xattr
Expand All @@ -340,7 +359,18 @@ void FuseOpSetXattr(fuse_req_t req, fuse_ino_t ino, const char* name,
}

void FuseOpGetXattr(fuse_req_t req, fuse_ino_t ino, const char *name,
size_t size) {
size_t size) {
if (strcmp(name, curvefs::client::common::kCurveFsWarmupXAttr) == 0) {
// warmup
std::string data;
QueryWarmupTask(ino, &data);
if (size == 0) {
fuse_reply_xattr(req, data.length());
} else {
fuse_reply_buf(req, data.data(), data.length());
}
return;
}
InflightGuard guard(&g_clientOpMetric->opGetXattr.inflightOpNum);
LatencyUpdater updater(&g_clientOpMetric->opGetXattr.latency);
std::string buf;
Expand Down
210 changes: 12 additions & 198 deletions curvefs/src/client/fuse_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <list>
#include <algorithm>
#include <cstring>
#include <memory>
#include <string>
#include <vector>
#include <set>
Expand All @@ -38,6 +39,7 @@
#include "curvefs/src/client/fuse_common.h"
#include "curvefs/src/client/client_operator.h"
#include "curvefs/src/client/inode_wrapper.h"
#include "curvefs/src/client/warmup/warmup_manager.h"
#include "curvefs/src/client/xattr_manager.h"
#include "curvefs/src/common/define.h"
#include "src/common/net_common.h"
Expand Down Expand Up @@ -135,210 +137,20 @@ CURVEFS_ERROR FuseClient::Init(const FuseClientOption &option) {
if (ret3 != CURVEFS_ERROR::OK) {
return ret3;
}
warmUpFile_.exist = false;
bgCmdStop_.store(false, std::memory_order_release);
bgCmdTaskThread_ = Thread(&FuseClient::WarmUpTask, this);
taskFetchMetaPool_.Start(option_.warmupThreadsNum);
return ret3;
}

void FuseClient::WarmUpTask() {
// TODO(hzwuhongsong): Maybe we can start the warmup thread after mount
while (!mounted_.load(std::memory_order_acquire)) {
usleep(WARMUP_CHECKINTERVAL_US);
VLOG(6) << "wait mount success.";
continue;
}
while (!bgCmdStop_.load(std::memory_order_acquire)) {
std::list<std::string> readAheadPaths;
WaitWarmUp();
while (hasWarmTask()) {
std::string warmUpTask;
GetwarmTask(&warmUpTask);
VLOG(9) << "warmup task is: " << warmUpTask;
std::string pDelimiter = "/";
char* pToken = nullptr;
char* pSave = nullptr;
pToken = strtok_r(const_cast<char*>(warmUpTask.c_str()),
const_cast<char*>(pDelimiter.c_str()), &pSave);
if (nullptr == pToken) {
VLOG(6) << "warmUpTask nullptr";
continue;
}
Dentry dentry;
CURVEFS_ERROR ret = dentryManager_->GetDentry(
fsInfo_->rootinodeid(), pToken, &dentry);
if (ret != CURVEFS_ERROR::OK) {
LOG(WARNING) << "FetchDentry error: " << ret
<< ", name: " << warmUpTask;
return;
}
if (FsFileType::TYPE_S3 != dentry.type()) {
LOG(WARNING) << "not a file: " << warmUpTask
<< "type is: " << dentry.type();
return;
}
fuse_ino_t ino = dentry.inodeid();
std::shared_ptr<InodeWrapper> inodeWrapper;
ret = inodeManager_->GetInode(ino, inodeWrapper);
if (ret != CURVEFS_ERROR::OK) {
LOG(ERROR) << "inodeManager get inode fail, ret = "
<< ret << ", inodeid = " << ino;
return;
}
uint64_t len = inodeWrapper->GetLength();
VLOG(9) << "ino is: " << ino << ", len is: " << len;
WarmUpFileContext_t warmUpFile{ino, len, true};
SetWarmUpFile(warmUpFile);
}
}
}

void FuseClient::FetchDentryEnqueue(std::string file) {
VLOG(9) << "FetchDentryEnqueue start: " << file;
auto task = [this, file]() {
LookPath(file);
};
taskFetchMetaPool_.Enqueue(task);
}

void FuseClient::LookPath(std::string file) {
VLOG(9) << "LookPath start: " << file;
std::vector<std::string> splitPath;
// remove enter, newline, blank
std::string blanks("\r\n ");
file.erase(0, file.find_first_not_of(blanks));
file.erase(file.find_last_not_of(blanks) + 1);
if (file.empty()) {
VLOG(9) << "empty path";
return;
}
bool isRoot = false;
if (file == "/") {
splitPath.push_back(file);
isRoot = true;
} else {
splitStr(file, "/", &splitPath);
}
VLOG(6) << "splitPath size is: " << splitPath.size();
if (splitPath.size() == 1 && isRoot) {
VLOG(9) << "i am root";
FetchChildDentryEnqueue(fsInfo_->rootinodeid());
return;
} else if (splitPath.size() == 1) {
VLOG(9) << "parent is root: " << fsInfo_->rootinodeid()
<< ", path is: " << splitPath[0];
this->FetchDentry(fsInfo_->rootinodeid(), splitPath[0]);
return;
} else if (splitPath.size() > 1) { // travel path
VLOG(9) << "traverse path start: " << splitPath.size();
std::string lastName = splitPath.back();
splitPath.pop_back();
fuse_ino_t ino = fsInfo_->rootinodeid();
for (auto iter : splitPath) {
VLOG(9) << "traverse path: " << iter
<< "ino is: " << ino;
Dentry dentry;
std::string pathName = iter;
CURVEFS_ERROR ret = dentryManager_->GetDentry(
ino, pathName, &dentry);
if (ret != CURVEFS_ERROR::OK) {
if (ret != CURVEFS_ERROR::NOTEXIST) {
LOG(WARNING) << "dentryManager_ get dentry fail, ret = "
<< ret << ", parent inodeid = " << ino
<< ", name = " << file;
}
VLOG(9) << "FetchDentry error: " << ret;
return;
}
ino = dentry.inodeid();
}
this->FetchDentry(ino, lastName);
VLOG(9) << "ino is: " << ino
<< "lastname is: " << lastName;
return;
} else {
VLOG(3) << "unknown path";
}
return;
}

void FuseClient::FetchChildDentryEnqueue(fuse_ino_t ino) {
auto task = [this, ino]() {
// reverse from root
this->FetchChildDentry(ino);
};
taskFetchMetaPool_.Enqueue(task);
}

void FuseClient::FetchChildDentry(fuse_ino_t ino) {
VLOG(9) << "FetchChildDentry start: " << ino;
std::list<Dentry> dentryList;
auto limit = option_.listDentryLimit;
CURVEFS_ERROR ret = dentryManager_->ListDentry(
ino, &dentryList, limit);
if (ret != CURVEFS_ERROR::OK) {
LOG(ERROR) << "dentryManager_ ListDentry fail, ret = " << ret
<< ", parent = " << ino;
return;
}
for (auto iter : dentryList) {
VLOG(9) << "FetchChildDentry: " << iter.name();
if (FsFileType::TYPE_S3 == iter.type()) {
std::unique_lock<std::mutex> lck(fetchMtx_);
readAheadFiles_.push_front(iter.inodeid());
VLOG(9) << "FetchChildDentry: " << iter.inodeid();;
} else if (FsFileType::TYPE_DIRECTORY == iter.type()) {
FetchChildDentryEnqueue(iter.inodeid());
VLOG(9) << "FetchChildDentry: " << iter.inodeid();
} else if (FsFileType::TYPE_SYM_LINK == iter.type()) { // need todo
} else {
VLOG(9) << "unknown type";
}
}
return;
}

void FuseClient::FetchDentry(fuse_ino_t ino, std::string file) {
VLOG(9) << "FetchDentry start: " << file
<< ", ino: " << ino;
Dentry dentry;
CURVEFS_ERROR ret = dentryManager_->GetDentry(ino, file, &dentry);
if (ret != CURVEFS_ERROR::OK) {
if (ret != CURVEFS_ERROR::NOTEXIST) {
LOG(WARNING) << "dentryManager_ get dentry fail, ret = " << ret
<< ", parent inodeid = " << ino
<< ", name = " << file;
}
VLOG(1) << "FetchDentry error: " << ret;
return;
if (warmupManager_ != nullptr) {
warmupManager_->Init(option);
warmupManager_->SetFsInfo(fsInfo_);
}
if (FsFileType::TYPE_S3 == dentry.type()) {
std::unique_lock<std::mutex> lck(fetchMtx_);
readAheadFiles_.push_front(dentry.inodeid());
return;
} else if (FsFileType::TYPE_DIRECTORY == dentry.type()) {
FetchChildDentryEnqueue(dentry.inodeid());
VLOG(9) << "FetchDentry: " << dentry.inodeid();
return;

} else if (FsFileType::TYPE_SYM_LINK == dentry.type()) {
} else {
VLOG(3) << "unkown, file: " << file
<< ", ino: " << ino;
}
VLOG(9) << "FetchDentry end: " << file
<< ", ino: " << ino;
return;
return ret3;
}

void FuseClient::UnInit() {
bgCmdStop_.store(true, std::memory_order_release);
WarmUpRun();
if (bgCmdTaskThread_.joinable()) {
bgCmdTaskThread_.join();
if (warmupManager_ != nullptr) {
warmupManager_->UnInit();
}
taskFetchMetaPool_.Stop();

delete mdsBase_;
mdsBase_ = nullptr;
}
Expand Down Expand Up @@ -404,7 +216,9 @@ CURVEFS_ERROR FuseClient::FuseOpInit(void *userdata,
}

init_ = true;
mounted_.store(true, std::memory_order_release);
if (warmupManager_ != nullptr) {
warmupManager_->SetMounted(true);
}
return CURVEFS_ERROR::OK;
}

Expand Down
Loading

0 comments on commit ff4e7b2

Please sign in to comment.