diff --git a/curvefs/src/client/BUILD b/curvefs/src/client/BUILD index ca0964ef63..9bc190a388 100644 --- a/curvefs/src/client/BUILD +++ b/curvefs/src/client/BUILD @@ -43,6 +43,8 @@ cc_library( "s3/*.h", "volume/*.cpp", "volume/*.h", + "warmup/*.h", + "warmup/*.cpp", ], exclude = ["main.cpp"], ), diff --git a/curvefs/src/client/common/common.cpp b/curvefs/src/client/common/common.cpp index e99f1bf9b9..7cf6bfeb21 100644 --- a/curvefs/src/client/common/common.cpp +++ b/curvefs/src/client/common/common.cpp @@ -79,6 +79,7 @@ std::ostream &operator<<(std::ostream &os, MetaServerOpType optype) { } const char kCurveFsWarmupOpAdd[] = "add"; +const char kCurveFsWarmupOpQuery[] = "query"; const char kCurveFsWarmupTypeList[] = "list"; const char kCurveFsWarmupTypeSingle[] = "single"; @@ -87,6 +88,9 @@ WarmupOpType GetWarmupOpType(const std::string& op) { if (op == kCurveFsWarmupOpAdd) { ret = WarmupOpType::kWarmupOpAdd; } + if (op == kCurveFsWarmupOpQuery) { + ret = WarmupOpType::kWarmupOpQuery; + } return ret; } diff --git a/curvefs/src/client/common/common.h b/curvefs/src/client/common/common.h index 836367a6eb..e89f50fd9a 100644 --- a/curvefs/src/client/common/common.h +++ b/curvefs/src/client/common/common.h @@ -72,6 +72,7 @@ const char kCurveFsWarmupXAttr[] = "curvefs.warmup.op"; enum class WarmupOpType { kWarmupOpUnknown = 0, kWarmupOpAdd = 1, + kWarmupOpQuery = 2, }; WarmupOpType GetWarmupOpType(const std::string& op); diff --git a/curvefs/src/client/curve_fuse_op.cpp b/curvefs/src/client/curve_fuse_op.cpp index a9e5afcefa..8480ce3faa 100644 --- a/curvefs/src/client/curve_fuse_op.cpp +++ b/curvefs/src/client/curve_fuse_op.cpp @@ -42,6 +42,7 @@ #include "curvefs/src/client/metric/client_metric.h" #include "curvefs/src/common/metric_utils.h" #include "curvefs/src/common/dynamic_vlog.h" +#include "curvefs/src/client/warmup/warmup_manager.h" using ::curve::common::Configuration; using ::curvefs::client::CURVEFS_ERROR; @@ -279,25 +280,42 @@ void FuseOpGetAttr(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi) { fuse_reply_attr(req, &attr, g_fuseClientOption->attrTimeOut); } -int AddWarmupTask(const std::string& type, const std::string& path) { +int AddWarmupTask(curvefs::client::common::WarmupType type, fuse_ino_t key) { int ret = 0; - switch (curvefs::client::common::GetWarmupType(type)) { - case curvefs::client::common::WarmupType::kWarmupTypeList: - g_ClientInstance->PutWarmTask(path); - break; - case curvefs::client::common::WarmupType::kWarmupTypeSingle: - g_ClientInstance->FetchDentryEnqueue(path); - break; - default: - // not support add warmup type (warmup single file/dir or filelist) - LOG(ERROR) << "not support warmup type, only support single/list"; - ret = ERANGE; + switch (type) { + case curvefs::client::common::WarmupType::kWarmupTypeList: + g_ClientInstance->PutWarmFilelistTask(key); + break; + case curvefs::client::common::WarmupType::kWarmupTypeSingle: + g_ClientInstance->PutWarmFileTask(key); + break; + default: + // not support add warmup type (warmup single file/dir or filelist) + LOG(ERROR) << "not support warmup type, only support single/list"; + ret = EOPNOTSUPP; } return ret; } -int Warmup(const std::string& name, const std::string& value) { +void QueryWarmupTask(fuse_ino_t key, std::string *data) { + curvefs::client::warmup::WarmupProgress progress; + bool ret = g_ClientInstance->GetWarmupProgress(key, &progress); + if (!ret) { + *data = "finished"; + } else { + *data = std::to_string(progress.GetFinished()) + "/" + + std::to_string(progress.GetTotal()); + } + VLOG(9) << "Warmup [" << key << "]" << *data; +} + +int Warmup(fuse_ino_t key, const std::string& name, const std::string& value) { // warmup + if (g_ClientInstance->GetFsInfo()->fstype() != FSType::TYPE_S3) { + LOG(ERROR) << "warmup only support s3"; + return EOPNOTSUPP; + } + std::vector opTypePath; curve::common::SplitString(value, "\n", &opTypePath); if (opTypePath.size() != 3) { @@ -307,9 +325,10 @@ int Warmup(const std::string& name, const std::string& value) { int ret = 0; switch (curvefs::client::common::GetWarmupOpType(opTypePath[0])) { case curvefs::client::common::WarmupOpType::kWarmupOpAdd: - ret = AddWarmupTask(opTypePath[1], opTypePath[2]); - if (ret != 0) { - LOG(ERROR) << name << " has invalid xattr value " << value; + ret = AddWarmupTask( + curvefs::client::common::GetWarmupType(opTypePath[1]), key); + if (ret != 0) { + LOG(ERROR) << name << " has invalid xattr value " << value; } break; default: @@ -327,7 +346,7 @@ void FuseOpSetXattr(fuse_req_t req, fuse_ino_t ino, const char* name, << " flags " << flags; if (strcmp(name, curvefs::client::common::kCurveFsWarmupXAttr) == 0) { // warmup - int code = Warmup(name, xattrValue); + int code = Warmup(ino, name, xattrValue); fuse_reply_err(req, code); } else { // set xattr @@ -340,7 +359,18 @@ void FuseOpSetXattr(fuse_req_t req, fuse_ino_t ino, const char* name, } void FuseOpGetXattr(fuse_req_t req, fuse_ino_t ino, const char *name, - size_t size) { + size_t size) { + if (strcmp(name, curvefs::client::common::kCurveFsWarmupXAttr) == 0) { + // warmup + std::string data; + QueryWarmupTask(ino, &data); + if (size == 0) { + fuse_reply_xattr(req, data.length()); + } else { + fuse_reply_buf(req, data.data(), data.length()); + } + return; + } InflightGuard guard(&g_clientOpMetric->opGetXattr.inflightOpNum); LatencyUpdater updater(&g_clientOpMetric->opGetXattr.latency); std::string buf; diff --git a/curvefs/src/client/fuse_client.cpp b/curvefs/src/client/fuse_client.cpp index fbcd17bc66..f6146ad62e 100644 --- a/curvefs/src/client/fuse_client.cpp +++ b/curvefs/src/client/fuse_client.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -38,6 +39,7 @@ #include "curvefs/src/client/fuse_common.h" #include "curvefs/src/client/client_operator.h" #include "curvefs/src/client/inode_wrapper.h" +#include "curvefs/src/client/warmup/warmup_manager.h" #include "curvefs/src/client/xattr_manager.h" #include "curvefs/src/common/define.h" #include "src/common/net_common.h" @@ -135,210 +137,20 @@ CURVEFS_ERROR FuseClient::Init(const FuseClientOption &option) { if (ret3 != CURVEFS_ERROR::OK) { return ret3; } - warmUpFile_.exist = false; - bgCmdStop_.store(false, std::memory_order_release); - bgCmdTaskThread_ = Thread(&FuseClient::WarmUpTask, this); - taskFetchMetaPool_.Start(option_.warmupThreadsNum); - return ret3; -} - -void FuseClient::WarmUpTask() { - // TODO(hzwuhongsong): Maybe we can start the warmup thread after mount - while (!mounted_.load(std::memory_order_acquire)) { - usleep(WARMUP_CHECKINTERVAL_US); - VLOG(6) << "wait mount success."; - continue; - } - while (!bgCmdStop_.load(std::memory_order_acquire)) { - std::list readAheadPaths; - WaitWarmUp(); - while (hasWarmTask()) { - std::string warmUpTask; - GetwarmTask(&warmUpTask); - VLOG(9) << "warmup task is: " << warmUpTask; - std::string pDelimiter = "/"; - char* pToken = nullptr; - char* pSave = nullptr; - pToken = strtok_r(const_cast(warmUpTask.c_str()), - const_cast(pDelimiter.c_str()), &pSave); - if (nullptr == pToken) { - VLOG(6) << "warmUpTask nullptr"; - continue; - } - Dentry dentry; - CURVEFS_ERROR ret = dentryManager_->GetDentry( - fsInfo_->rootinodeid(), pToken, &dentry); - if (ret != CURVEFS_ERROR::OK) { - LOG(WARNING) << "FetchDentry error: " << ret - << ", name: " << warmUpTask; - return; - } - if (FsFileType::TYPE_S3 != dentry.type()) { - LOG(WARNING) << "not a file: " << warmUpTask - << "type is: " << dentry.type(); - return; - } - fuse_ino_t ino = dentry.inodeid(); - std::shared_ptr inodeWrapper; - ret = inodeManager_->GetInode(ino, inodeWrapper); - if (ret != CURVEFS_ERROR::OK) { - LOG(ERROR) << "inodeManager get inode fail, ret = " - << ret << ", inodeid = " << ino; - return; - } - uint64_t len = inodeWrapper->GetLength(); - VLOG(9) << "ino is: " << ino << ", len is: " << len; - WarmUpFileContext_t warmUpFile{ino, len, true}; - SetWarmUpFile(warmUpFile); - } - } -} - -void FuseClient::FetchDentryEnqueue(std::string file) { - VLOG(9) << "FetchDentryEnqueue start: " << file; - auto task = [this, file]() { - LookPath(file); - }; - taskFetchMetaPool_.Enqueue(task); -} - -void FuseClient::LookPath(std::string file) { - VLOG(9) << "LookPath start: " << file; - std::vector splitPath; - // remove enter, newline, blank - std::string blanks("\r\n "); - file.erase(0, file.find_first_not_of(blanks)); - file.erase(file.find_last_not_of(blanks) + 1); - if (file.empty()) { - VLOG(9) << "empty path"; - return; - } - bool isRoot = false; - if (file == "/") { - splitPath.push_back(file); - isRoot = true; - } else { - splitStr(file, "/", &splitPath); - } - VLOG(6) << "splitPath size is: " << splitPath.size(); - if (splitPath.size() == 1 && isRoot) { - VLOG(9) << "i am root"; - FetchChildDentryEnqueue(fsInfo_->rootinodeid()); - return; - } else if (splitPath.size() == 1) { - VLOG(9) << "parent is root: " << fsInfo_->rootinodeid() - << ", path is: " << splitPath[0]; - this->FetchDentry(fsInfo_->rootinodeid(), splitPath[0]); - return; - } else if (splitPath.size() > 1) { // travel path - VLOG(9) << "traverse path start: " << splitPath.size(); - std::string lastName = splitPath.back(); - splitPath.pop_back(); - fuse_ino_t ino = fsInfo_->rootinodeid(); - for (auto iter : splitPath) { - VLOG(9) << "traverse path: " << iter - << "ino is: " << ino; - Dentry dentry; - std::string pathName = iter; - CURVEFS_ERROR ret = dentryManager_->GetDentry( - ino, pathName, &dentry); - if (ret != CURVEFS_ERROR::OK) { - if (ret != CURVEFS_ERROR::NOTEXIST) { - LOG(WARNING) << "dentryManager_ get dentry fail, ret = " - << ret << ", parent inodeid = " << ino - << ", name = " << file; - } - VLOG(9) << "FetchDentry error: " << ret; - return; - } - ino = dentry.inodeid(); - } - this->FetchDentry(ino, lastName); - VLOG(9) << "ino is: " << ino - << "lastname is: " << lastName; - return; - } else { - VLOG(3) << "unknown path"; - } - return; -} - -void FuseClient::FetchChildDentryEnqueue(fuse_ino_t ino) { - auto task = [this, ino]() { - // reverse from root - this->FetchChildDentry(ino); - }; - taskFetchMetaPool_.Enqueue(task); -} - -void FuseClient::FetchChildDentry(fuse_ino_t ino) { - VLOG(9) << "FetchChildDentry start: " << ino; - std::list dentryList; - auto limit = option_.listDentryLimit; - CURVEFS_ERROR ret = dentryManager_->ListDentry( - ino, &dentryList, limit); - if (ret != CURVEFS_ERROR::OK) { - LOG(ERROR) << "dentryManager_ ListDentry fail, ret = " << ret - << ", parent = " << ino; - return; - } - for (auto iter : dentryList) { - VLOG(9) << "FetchChildDentry: " << iter.name(); - if (FsFileType::TYPE_S3 == iter.type()) { - std::unique_lock lck(fetchMtx_); - readAheadFiles_.push_front(iter.inodeid()); - VLOG(9) << "FetchChildDentry: " << iter.inodeid();; - } else if (FsFileType::TYPE_DIRECTORY == iter.type()) { - FetchChildDentryEnqueue(iter.inodeid()); - VLOG(9) << "FetchChildDentry: " << iter.inodeid(); - } else if (FsFileType::TYPE_SYM_LINK == iter.type()) { // need todo - } else { - VLOG(9) << "unknown type"; - } - } - return; -} -void FuseClient::FetchDentry(fuse_ino_t ino, std::string file) { - VLOG(9) << "FetchDentry start: " << file - << ", ino: " << ino; - Dentry dentry; - CURVEFS_ERROR ret = dentryManager_->GetDentry(ino, file, &dentry); - if (ret != CURVEFS_ERROR::OK) { - if (ret != CURVEFS_ERROR::NOTEXIST) { - LOG(WARNING) << "dentryManager_ get dentry fail, ret = " << ret - << ", parent inodeid = " << ino - << ", name = " << file; - } - VLOG(1) << "FetchDentry error: " << ret; - return; + if (warmupManager_ != nullptr) { + warmupManager_->Init(option); + warmupManager_->SetFsInfo(fsInfo_); } - if (FsFileType::TYPE_S3 == dentry.type()) { - std::unique_lock lck(fetchMtx_); - readAheadFiles_.push_front(dentry.inodeid()); - return; - } else if (FsFileType::TYPE_DIRECTORY == dentry.type()) { - FetchChildDentryEnqueue(dentry.inodeid()); - VLOG(9) << "FetchDentry: " << dentry.inodeid(); - return; - } else if (FsFileType::TYPE_SYM_LINK == dentry.type()) { - } else { - VLOG(3) << "unkown, file: " << file - << ", ino: " << ino; - } - VLOG(9) << "FetchDentry end: " << file - << ", ino: " << ino; - return; + return ret3; } void FuseClient::UnInit() { - bgCmdStop_.store(true, std::memory_order_release); - WarmUpRun(); - if (bgCmdTaskThread_.joinable()) { - bgCmdTaskThread_.join(); + if (warmupManager_ != nullptr) { + warmupManager_->UnInit(); } - taskFetchMetaPool_.Stop(); + delete mdsBase_; mdsBase_ = nullptr; } @@ -404,7 +216,9 @@ CURVEFS_ERROR FuseClient::FuseOpInit(void *userdata, } init_ = true; - mounted_.store(true, std::memory_order_release); + if (warmupManager_ != nullptr) { + warmupManager_->SetMounted(true); + } return CURVEFS_ERROR::OK; } diff --git a/curvefs/src/client/fuse_client.h b/curvefs/src/client/fuse_client.h index 47775ff327..217f7d9ebb 100644 --- a/curvefs/src/client/fuse_client.h +++ b/curvefs/src/client/fuse_client.h @@ -52,9 +52,9 @@ #include "curvefs/src/client/client_operator.h" #include "curvefs/src/client/lease/lease_excutor.h" #include "curvefs/src/client/xattr_manager.h" +#include "curvefs/src/client/warmup/warmup_manager.h" #define DirectIOAlignment 512 -#define WARMUP_CHECKINTERVAL_US 1000*1000 using ::curve::common::Atomic; using ::curve::common::InterruptibleSleeper; @@ -67,6 +67,10 @@ using ::curvefs::client::metric::FSMetric; namespace curvefs { namespace client { +namespace warmup { +class WarmupManager; +} + using common::FuseClientOption; using rpcclient::MDSBaseClient; using rpcclient::MdsClient; @@ -79,11 +83,7 @@ using curvefs::common::is_aligned; const uint32_t kMaxHostNameLength = 255u; using mds::Mountpoint; -typedef struct WarmUpFileContext { - fuse_ino_t inode; - uint64_t fileLen; - bool exist; -} WarmUpFileContext_t; + class FuseClient { public: FuseClient() @@ -96,15 +96,16 @@ class FuseClient { mdsBase_(nullptr), isStop_(true), init_(false), - mounted_(false), - enableSumInDir_(false) {} + enableSumInDir_(false), + warmupManager_(nullptr) {} virtual ~FuseClient() {} FuseClient(const std::shared_ptr &mdsClient, const std::shared_ptr &metaClient, const std::shared_ptr &inodeManager, - const std::shared_ptr &dentryManager) + const std::shared_ptr &dentryManager, + const std::shared_ptr &warmupManager) : mdsClient_(mdsClient), metaClient_(metaClient), inodeManager_(inodeManager), @@ -114,8 +115,8 @@ class FuseClient { mdsBase_(nullptr), isStop_(true), init_(false), - mounted_(false), - enableSumInDir_(false) {} + enableSumInDir_(false), + warmupManager_(warmupManager) {} virtual CURVEFS_ERROR Init(const FuseClientOption &option); @@ -246,12 +247,9 @@ class FuseClient { } void SetMounted(bool mounted) { - mounted_ = mounted; - } - - TaskThreadPool & - GetTaskFetchPool() { - return taskFetchMetaPool_; + if (warmupManager_ != nullptr) { + warmupManager_->SetMounted(mounted); + } } std::shared_ptr GetFsInfo() { @@ -268,33 +266,24 @@ class FuseClient { void SetEnableSumInDir(bool enable) { enableSumInDir_ = enable; } - void GetReadAheadFiles(std::list* readAheadFiles) { - std::unique_lock lck(fetchMtx_); - *readAheadFiles = std::move(readAheadFiles_); - } - void GetWarmUpFile(WarmUpFileContext_t* warmUpFile) { - std::unique_lock lck(warmUpFileMtx_); - *warmUpFile = std::move(warmUpFile_); - warmUpFile_.exist = false; - return; - } - void SetWarmUpFile(WarmUpFileContext_t warmUpFile) { - std::unique_lock lck(warmUpFileMtx_); - warmUpFile_ = warmUpFile; - warmUpFile_.exist = true; - } - bool hasWarmUpTask() { - std::unique_lock lck(warmUpFileMtx_); - return warmUpFile_.exist; + void PutWarmFilelistTask(fuse_ino_t key) { + if (fsInfo_->fstype() == FSType::TYPE_S3) { + warmupManager_->AddWarmupFilelist(key); + } // only support s3 } - void FetchDentryEnqueue(std::string file); + void PutWarmFileTask(fuse_ino_t key) { + if (fsInfo_->fstype() == FSType::TYPE_S3) { + warmupManager_->AddWarmupFile(key); + } // only support s3 + } - void PutWarmTask(const std::string& warmUpTask) { - std::unique_lock lck(warmUpTaskMtx_); - warmUpTasks_.push_back(warmUpTask); - WarmUpRun(); + bool GetWarmupProgress(fuse_ino_t key, warmup::WarmupProgress *progress) { + if (fsInfo_->fstype() == FSType::TYPE_S3) { + return warmupManager_->QueryWarmupProgress(key, progress); + } + return false; } protected: @@ -339,30 +328,6 @@ class FuseClient { return 0; } - void splitStr(const std::string& srcStr, const std::string& delimiter, - std::vector* splitPath) { - char* pToken = nullptr; - char* pSave = nullptr; - pToken = strtok_r(const_cast(srcStr.c_str()), - const_cast(delimiter.c_str()), &pSave); - if (nullptr == pToken) { - VLOG(6) << "del lookpath end"; - return; - } - splitPath->push_back(pToken); - while (true) { - pToken = strtok_r(NULL, const_cast( - delimiter.c_str()), &pSave); - if (nullptr == pToken) { - VLOG(6) << "del lookpath end"; - break; - } - VLOG(9) << "del pToken is:" << pToken - << "pSave:" << pSave; - splitPath->push_back(pToken); - } - } - private: virtual CURVEFS_ERROR Truncate(InodeWrapper* inode, uint64_t length) = 0; @@ -371,19 +336,6 @@ class FuseClient { CURVEFS_ERROR UpdateParentMCTimeAndNlink( fuse_ino_t parent, FsFileType type, NlinkChange nlink); - void WarmUpTask(); - - void WarmUpRun() { - std::lock_guard lk(mtx_); - runned_ = true; - cond_.notify_one(); - } - void WaitWarmUp() { - std::unique_lock lk(mtx_); - cond_.wait(lk, [this]() { return runned_; }); - runned_ = false; - } - std::string GenerateNewRecycleName(fuse_ino_t ino, fuse_ino_t parent, const char* name) { std::string newName(name); @@ -396,10 +348,6 @@ class FuseClient { return newName; } - std::mutex mtx_; - std::condition_variable cond_; - bool runned_ = false; - protected: // mds client std::shared_ptr mdsClient_; @@ -429,8 +377,6 @@ class FuseClient { // init flags bool init_; - std::atomic mounted_; - // enable record summary info in dir inode xattr bool enableSumInDir_; @@ -438,46 +384,15 @@ class FuseClient { Mountpoint mountpoint_; + // warmup manager + std::shared_ptr warmupManager_; + private: MDSBaseClient* mdsBase_; Atomic isStop_; curve::common::Mutex renameMutex_; - - Thread bgCmdTaskThread_; - std::atomic bgCmdStop_; - std::mutex cmdMtx_; - - void FetchChildDentryEnqueue(fuse_ino_t ino); - void FetchChildDentry(fuse_ino_t ino); - void FetchDentry(fuse_ino_t ino, std::string file); - void LookPath(std::string file); - TaskThreadPool - taskFetchMetaPool_; - - // need warmup files - std::list readAheadFiles_; - std::mutex fetchMtx_; - - // one warmup file provided by the user - WarmUpFileContext_t warmUpFile_; - std::mutex warmUpFileMtx_; - - std::list warmUpTasks_; // todo: need size control ? - std::mutex warmUpTaskMtx_; - - void GetwarmTask(std::string *warmUpTask) { - std::unique_lock lck(warmUpTaskMtx_); - if (warmUpTasks_.empty()) - return; - *warmUpTask = std::move(warmUpTasks_.front()); - warmUpTasks_.pop_front(); - } - bool hasWarmTask() { - std::unique_lock lck(warmUpTaskMtx_); - return !warmUpTasks_.empty(); - } }; } // namespace client diff --git a/curvefs/src/client/fuse_s3_client.cpp b/curvefs/src/client/fuse_s3_client.cpp index 08ca03bbda..93d5ca81b8 100644 --- a/curvefs/src/client/fuse_s3_client.cpp +++ b/curvefs/src/client/fuse_s3_client.cpp @@ -48,13 +48,11 @@ using curvefs::mds::topology::MemcacheServerInfo; CURVEFS_ERROR FuseS3Client::Init(const FuseClientOption &option) { FuseClientOption opt(option); - initbgFetchThread_ = false; CURVEFS_ERROR ret = FuseClient::Init(opt); if (ret != CURVEFS_ERROR::OK) { return ret; } - downloadMaxRetryTimes_ = option.downloadMaxRetryTimes; // init kvcache if (FLAGS_supportKVcache && !InitKVCache(option.kvClientManagerOpt)) { @@ -92,11 +90,6 @@ CURVEFS_ERROR FuseS3Client::Init(const FuseClientOption &option) { inodeManager_, mdsClient_, fsCacheManager, nullptr, true); } - isWarmUping_.store(false); - bgFetchStop_.store(false, std::memory_order_release); - bgFetchThread_ = Thread(&FuseS3Client::BackGroundFetch, this); - initbgFetchThread_ = true; - GetTaskFetchPool(); return ret; } @@ -129,294 +122,7 @@ bool FuseS3Client::InitKVCache(const KVClientManagerOpt &opt) { return true; } -void FuseS3Client::GetWarmUpFileList(const WarmUpFileContext_t&warmUpFile, - std::vector& warmUpFilelist) { - struct fuse_file_info fi{}; - fi.flags &= ~O_DIRECT; - size_t rSize = 0; - std::unique_ptr data(new char[warmUpFile.fileLen+1]); - std::memset(data.get(), 0, warmUpFile.fileLen); - data[warmUpFile.fileLen] = '\n'; - FuseOpRead(nullptr, warmUpFile.inode, - warmUpFile.fileLen, 0, &fi, data.get(), &rSize); - std::string file = data.get(); - VLOG(9) << "file is: " << file; - // remove enter, newline, blank - std::string blanks("\r\n "); - file.erase(0, file.find_first_not_of(blanks)); - file.erase(file.find_last_not_of(blanks) + 1); - VLOG(9) << "after del file is: " << file; - splitStr(file, "\n", &warmUpFilelist); -} - -void FuseS3Client::BackGroundFetch() { - while (!bgFetchStop_.load(std::memory_order_acquire)) { - usleep(WARMUP_CHECKINTERVAL_US); - if (hasWarmUpTask()) { // new warmup task - WarmUpFileContext_t warmUpFile; - GetWarmUpFile(&warmUpFile); - VLOG(9) << " len is: " << warmUpFile.fileLen - << "ino is: " << warmUpFile.inode; - - std::vector warmUpFilelist; - GetWarmUpFileList(warmUpFile, warmUpFilelist); - for (auto filePath : warmUpFilelist) { - FetchDentryEnqueue(filePath); - } - } - { // file need warmup - std::list readAheadFiles; - GetReadAheadFiles(&readAheadFiles); - if (!readAheadFiles.empty()) { - LOG(INFO) << "num of files is need loaded is: " - << readAheadFiles.size(); - for (auto iter : readAheadFiles) { - VLOG(9) << "BackGroundFetch: " << iter; - fetchDataEnqueue(iter); - } - } - } - { // objs will be downloaded - { - std::unique_lock lck(warmupObjsMtx_); - if (needWarmupObjs_.empty()) { - continue; - } - } - if (isWarmUping_.exchange(true)) { - continue; - } - std::thread downloadThread = - std::thread(&FuseS3Client::WarmUpAllObjs, this); - downloadThread.detach(); - } - } - return; -} - -void FuseS3Client::fetchDataEnqueue(fuse_ino_t ino) { - VLOG(9) << "fetchDataEnqueue start: " << ino; - auto task = [this, ino]() { - std::shared_ptr inodeWrapper; - CURVEFS_ERROR ret = inodeManager_->GetInode(ino, inodeWrapper); - if (ret != CURVEFS_ERROR::OK) { - LOG(ERROR) << "inodeManager get inode fail, ret = " << ret - << ", inodeid = " << ino; - return; - } - google::protobuf::Map s3ChunkInfoMap; - { - ::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock(); - s3ChunkInfoMap = *inodeWrapper->GetChunkInfoMap(); - } - if (s3ChunkInfoMap.empty()) { - return; - } - travelChunks(ino, s3ChunkInfoMap); - }; - GetTaskFetchPool().Enqueue(task); -} - -// travel and download all objs belong to the chunk -void FuseS3Client::travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo) { - uint64_t blockSize = s3Adaptor_->GetBlockSize(); - uint64_t chunkSize = s3Adaptor_->GetChunkSize(); - uint64_t offset, len, chunkid, compaction; - for (size_t i = 0; i < chunkInfo.s3chunks_size(); i++) { - auto chunkinfo = chunkInfo.mutable_s3chunks(i); - auto fsId = fsInfo_->fsid(); - chunkid = chunkinfo->chunkid(); - compaction = chunkinfo->compaction(); - offset = chunkinfo->offset(); - len = chunkinfo->len(); - // the offset in the chunk - uint64_t chunkPos = offset % chunkSize; - // the offset in the block - uint64_t blockPos = chunkPos % blockSize; - // the first blockIndex - uint64_t blockIndexBegin = chunkPos / blockSize; - - if (len < blockSize) { // just one block - auto objectName = curvefs::common::s3util::GenObjName( - chunkid, blockIndexBegin, compaction, fsId, ino); - std::unique_lock lck(warmupObjsMtx_); - needWarmupObjs_.push_back(std::make_pair(objectName, len)); - } else { - // the offset in the block - uint64_t blockPos = chunkPos % blockSize; - - // firstly, let's get the size in the first block - // then, subtract the length in the first block - // to obtain the remaining length - // lastly, We need to judge the last block is full or not - uint64_t firstBlockSize = (blockPos != 0) ? - blockSize - blockPos : blockSize; - uint64_t leftSize = len - firstBlockSize; - uint32_t blockCounts = (leftSize % blockSize == 0) ? - (leftSize / blockSize + 1) : (leftSize / blockSize + 1 + 1); - // so we can get the last blockIndex - // because the bolck Index is cumulative - uint64_t blockIndexEnd = blockIndexBegin + blockCounts - 1; - - // the size of the last block - uint64_t lastBlockSize = leftSize % blockSize; - // whether the first block or the last block is full or not - bool firstBlockFull = (blockPos == 0) ? true : false; - bool lastBlockFull = (lastBlockSize == 0) ? true : false; - // the start and end block Index that need travel - uint64_t travelStartIndex, travelEndIndex; - // if the block is full, the size is needed download - // of the obj is blockSize. Otherwise, the value is special. - if (!firstBlockFull) { - travelStartIndex = blockIndexBegin + 1; - auto objectName = curvefs::common::s3util::GenObjName( - chunkid, blockIndexBegin, compaction, fsId, ino); - std::unique_lock lck(warmupObjsMtx_); - needWarmupObjs_.push_back(std::make_pair( - objectName, firstBlockSize)); - } else { - travelStartIndex = blockIndexBegin; - } - if (!lastBlockFull) { - // block index is greater than or equal to 0 - travelEndIndex = (blockIndexEnd == blockIndexBegin) ? - blockIndexEnd : blockIndexEnd - 1; - auto objectName = curvefs::common::s3util::GenObjName( - chunkid, blockIndexEnd, compaction, fsId, ino); - // there is no need to care about the order - // in which objects are downloaded - std::unique_lock lck(warmupObjsMtx_); - needWarmupObjs_.push_back( - std::make_pair(objectName, lastBlockSize)); - } else { - travelEndIndex = blockIndexEnd; - } - VLOG(9) << "travel obj, ino: " << ino - << ", chunkid: " << chunkid - << ", blockCounts: " << blockCounts - << ", compaction: " << compaction - << ", blockSize: " << blockSize - << ", chunkSize: " << chunkSize - << ", offset: " << offset - << ", blockIndexBegin: " << blockIndexBegin - << ", blockIndexEnd: " << blockIndexEnd - << ", len: " << len - << ", firstBlockSize: " << firstBlockSize - << ", lastBlockSize: " << lastBlockSize - << ", blockPos: " << blockPos - << ", chunkPos: " << chunkPos; - for (auto blockIndex = travelStartIndex; - blockIndex <= travelEndIndex ; blockIndex++) { - auto objectName = curvefs::common::s3util::GenObjName( - chunkid, blockIndex, compaction, fsId, ino); - { - std::unique_lock lck(warmupObjsMtx_); - needWarmupObjs_.push_back( - std::make_pair(objectName, blockSize)); - } - } - } - } -} - -// TODO(hzwuhongsong): These logics are very similar to other place, -// try to merge it -void FuseS3Client::WarmUpAllObjs() { - std::list> needWarmupObjs; - { - std::unique_lock lck(warmupObjsMtx_); - LOG(INFO) << "num of objs need loaded is: " << needWarmupObjs_.size(); - needWarmupObjs = std::move(needWarmupObjs_); - } - std::atomic pendingReq(0); - curve::common::CountDownEvent cond(1); - // callback function - GetObjectAsyncCallBack cb = - [&](const S3Adapter *adapter, - const std::shared_ptr &context) { - if (bgFetchStop_.load()) { - LOG(INFO) << "need stop warmup"; - cond.Signal(); - return; - } - if (context->retCode == 0) { - VLOG(9) << "Get Object success: " << context->key; - int ret = s3Adaptor_->GetDiskCacheManager()->WriteReadDirect( - context->key, context->buf, context->len); - if (ret < 0) { - LOG_EVERY_SECOND(INFO) << - "write read directly failed, key: " << context->key; - } - if (pendingReq.fetch_sub(1, std::memory_order_seq_cst) == 1) { - VLOG(6) << "pendingReq is over"; - cond.Signal(); - } - delete []context->buf; - return; - } - if (++context->retry >= downloadMaxRetryTimes_) { - if (pendingReq.fetch_sub(1, std::memory_order_seq_cst) == 1) { - VLOG(6) << "pendingReq is over"; - cond.Signal(); - } - LOG(WARNING) << "Up to max retry times, " - << "download object failed, key: " - << context->key; - delete []context->buf; - return; - } - - LOG(WARNING) << "Get Object failed, key: " << context->key - << ", offset: " << context->offset; - s3Adaptor_->GetS3Client()->DownloadAsync(context); - }; - - pendingReq.fetch_add(needWarmupObjs.size(), std::memory_order_seq_cst); - if (pendingReq.load()) { - VLOG(9) << "wait for pendingReq" << pendingReq.load(); - for (auto iter : needWarmupObjs) { - VLOG(9) << "download start: " << iter.first; - std::string name = iter.first; - uint64_t readLen = iter.second; - if (s3Adaptor_->GetDiskCacheManager()->IsCached(name)) { - pendingReq.fetch_sub(1); - continue; - } - char *cacheS3 = new char[readLen]; - memset(cacheS3, 0, readLen); - auto context = std::make_shared(); - context->key = name; - context->buf = cacheS3; - context->offset = 0; - context->len = readLen; - context->cb = cb; - context->retry = 0; - s3Adaptor_->GetS3Client()->DownloadAsync(context); - } - if (pendingReq.load()) - cond.Wait(); - } - isWarmUping_.exchange(false); - LOG(INFO) << "num of objs is loaded over "; -} - -void FuseS3Client::travelChunks( - fuse_ino_t ino, - const google::protobuf::Map& s3ChunkInfoMap) { - VLOG(9) << "travel chunk start: " << ino - << ", size: " << s3ChunkInfoMap.size(); - for (auto const& iter : s3ChunkInfoMap) { - VLOG(9) << "travel chunk: " << iter.first; - travelChunk(ino, iter.second); - } - VLOG(9) << "travel chunks end"; -} - void FuseS3Client::UnInit() { - bgFetchStop_.store(true, std::memory_order_release); - if (initbgFetchThread_) { - bgFetchThread_.join(); - } FuseClient::UnInit(); s3Adaptor_->Stop(); curve::common::S3Adapter::Shutdown(); diff --git a/curvefs/src/client/fuse_s3_client.h b/curvefs/src/client/fuse_s3_client.h index 66f2173e4e..1d1ec6062c 100644 --- a/curvefs/src/client/fuse_s3_client.h +++ b/curvefs/src/client/fuse_s3_client.h @@ -32,6 +32,7 @@ #include "curvefs/src/client/fuse_client.h" #include "curvefs/src/client/s3/client_s3_cache_manager.h" +#include "curvefs/src/client/warmup/warmup_manager.h" #include "src/common/s3_adapter.h" namespace curvefs { @@ -39,18 +40,34 @@ namespace client { using curve::common::GetObjectAsyncContext; using curve::common::GetObjectAsyncCallBack; +namespace warmup { +class WarmupManager; +class WarmupManagerS3Impl; +} // namespace warmup + class FuseS3Client : public FuseClient { public: FuseS3Client() - : FuseClient(), s3Adaptor_(std::make_shared()) {} + : FuseClient(), s3Adaptor_(std::make_shared()) { + auto readFunc = [this](fuse_req_t req, fuse_ino_t ino, size_t size, + off_t off, struct fuse_file_info *fi, + char *buffer, size_t *rSize) { + return FuseOpRead(req, ino, size, off, fi, buffer, rSize); + }; + warmupManager_ = std::make_shared( + metaClient_, inodeManager_, dentryManager_, fsInfo_, readFunc, + s3Adaptor_); + } FuseS3Client(const std::shared_ptr &mdsClient, const std::shared_ptr &metaClient, const std::shared_ptr &inodeManager, const std::shared_ptr &dentryManager, - const std::shared_ptr &s3Adaptor) - : FuseClient(mdsClient, metaClient, inodeManager, dentryManager), + const std::shared_ptr &s3Adaptor, + const std::shared_ptr &warmupManager) + : FuseClient(mdsClient, metaClient, inodeManager, dentryManager, + warmupManager), s3Adaptor_(s3Adaptor) {} CURVEFS_ERROR Init(const FuseClientOption &option) override; @@ -97,33 +114,10 @@ class FuseS3Client : public FuseClient { CURVEFS_ERROR Truncate(InodeWrapper *inode, uint64_t length) override; void FlushData() override; - // get the warmUp filelist - void GetWarmUpFileList(const WarmUpFileContext_t&, - std::vector&); - void BackGroundFetch(); - // put the file needed warmup to queue, - // then can downlaod the objs belong to it - void fetchDataEnqueue(fuse_ino_t ino); - // travel all chunks - void travelChunks( - fuse_ino_t ino, - const google::protobuf::Map& s3ChunkInfoMap); - // travel and download all objs belong to the chunk - void travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo); - // warmup all the prefetchObjs - void WarmUpAllObjs(); private: // s3 adaptor std::shared_ptr s3Adaptor_; - - bool initbgFetchThread_; - Thread bgFetchThread_; - std::atomic bgFetchStop_; - uint32_t downloadMaxRetryTimes_; - std::mutex warmupObjsMtx_; - std::atomic isWarmUping_; - std::list> needWarmupObjs_; }; diff --git a/curvefs/src/client/fuse_volume_client.h b/curvefs/src/client/fuse_volume_client.h index 3c01a6025e..f702c766b4 100644 --- a/curvefs/src/client/fuse_volume_client.h +++ b/curvefs/src/client/fuse_volume_client.h @@ -52,7 +52,8 @@ class FuseVolumeClient : public FuseClient { const std::shared_ptr &inodeManager, const std::shared_ptr &dentryManager, const std::shared_ptr &blockDeviceClient) - : FuseClient(mdsClient, metaClient, inodeManager, dentryManager), + : FuseClient(mdsClient, metaClient, inodeManager, dentryManager, + nullptr), blockDeviceClient_(blockDeviceClient) {} CURVEFS_ERROR Init(const FuseClientOption &option) override; diff --git a/curvefs/src/client/warmup/warmup_manager.cpp b/curvefs/src/client/warmup/warmup_manager.cpp new file mode 100644 index 0000000000..97db9a375f --- /dev/null +++ b/curvefs/src/client/warmup/warmup_manager.cpp @@ -0,0 +1,686 @@ +/* + * Copyright (c) 2023 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: curve + * Created Date: 2023-01-31 + * Author: chengyi01 + */ + +#include "curvefs/src/client/warmup/warmup_manager.h" + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "curvefs/src/client/inode_wrapper.h" +#include "curvefs/src/client/s3/client_s3_cache_manager.h" +#include "curvefs/src/common/s3util.h" +#include "src/common/concurrent/concurrent.h" +#include "src/common/string_util.h" + + +namespace curvefs { +namespace client { +namespace warmup { + +using curve::common::WriteLockGuard; + +#define WARMUP_CHECKINTERVAL_US (1000 * 1000) + +void WarmupManagerS3Impl::AddWarmupFilelist(fuse_ino_t key) { + if (!mounted_.load(std::memory_order_acquire)) { + LOG(ERROR) << "not mounted"; + } + // add warmup Progress + if (!AddWarmupProcess(key)) { + return; + } + WriteLockGuard lock(warmupFilelistDequeMutex_); + auto iter = LockedFindKeyWarmupFilelist(key); + if (iter == warmupFilelistDeque_.end()) { + std::shared_ptr inodeWrapper; + CURVEFS_ERROR ret = inodeManager_->GetInode(key, inodeWrapper); + if (ret != CURVEFS_ERROR::OK) { + LOG(ERROR) << "inodeManager get inode fail, ret = " << ret + << ", inodeid = " << key; + return; + } + uint64_t len = inodeWrapper->GetLength(); + VLOG(9) << "ino is: " << key << ", len is: " << len; + warmupFilelistDeque_.emplace_back(key, len); + } // Skip already added +} + +void WarmupManagerS3Impl::AddWarmupFile(fuse_ino_t key) { + if (!mounted_.load(std::memory_order_acquire)) { + LOG(ERROR) << "not mounted"; + } + // add warmup Progress + if (!AddWarmupProcess(key)) { + return; + } + FetchDataEnqueue(key, key); +} + +void WarmupManagerS3Impl::UnInit() { + bgFetchStop_.store(true, std::memory_order_release); + if (initbgFetchThread_) { + bgFetchThread_.join(); + } + + { + for (auto &task : inode2FetchDentryPool_) { + task.second->Stop(); + } + WriteLockGuard lock(inode2FetchDentryPoolMutex_); + inode2FetchDentryPool_.clear(); + } + + { + for (auto &task : inode2FetchS3ObjectsPool_) { + task.second->Stop(); + } + WriteLockGuard lock(inode2FetchS3ObjectsPoolMutex_); + inode2FetchS3ObjectsPool_.clear(); + } + + { + WriteLockGuard lock(warmupInodesDequeMutex_); + warmupInodesDeque_.clear(); + } + + { + WriteLockGuard lock(warmupInodesDequeMutex_); + warmupInodesDeque_.clear(); + } + + WarmupManager::UnInit(); +} + +void WarmupManagerS3Impl::Init(const FuseClientOption &option) { + listDentryLimit_ = option.listDentryLimit; + downloadMaxRetryTimes_ = option.downloadMaxRetryTimes; + warmupThreadsNum_ = option.warmupThreadsNum; + bgFetchStop_.store(false, std::memory_order_release); + bgFetchThread_ = Thread(&WarmupManagerS3Impl::BackGroundFetch, this); + initbgFetchThread_ = true; +} + +void WarmupManagerS3Impl::BackGroundFetch() { + while (!bgFetchStop_.load(std::memory_order_acquire)) { + usleep(WARMUP_CHECKINTERVAL_US); + ScanWarmupFilelist(); + ScanWarmupFiles(); + ScanCleanFetchS3ObjectsPool(); + ScanCleanFetchDentryPool(); + ScanCleanWarmupProgress(); + } +} + +void WarmupManagerS3Impl::GetWarmupList(const WarmupFilelist &filelist, + std::vector *list) { + struct fuse_file_info fi {}; + fi.flags &= ~O_DIRECT; + size_t rSize = 0; + std::unique_ptr data(new char[filelist.GetFileLen() + 1]); + std::memset(data.get(), 0, filelist.GetFileLen()); + data[filelist.GetFileLen()] = '\n'; + fuseOpRead_(nullptr, filelist.GetKey(), filelist.GetFileLen(), 0, &fi, + data.get(), &rSize); + std::string file = data.get(); + VLOG(9) << "file is: " << file; + // remove enter, newline, blank + std::string blanks("\r\n "); + file.erase(0, file.find_first_not_of(blanks)); + file.erase(file.find_last_not_of(blanks) + 1); + VLOG(9) << "after del file is: " << file; + curve::common::AddSplitStringToResult(file, "\n", list); +} + +void WarmupManagerS3Impl::FetchDentryEnqueue(fuse_ino_t key, + const std::string &file) { + VLOG(9) << "FetchDentryEnqueue start: " << key << " file: " << file; + auto task = [this, key, file]() { LookPath(key, file); }; + AddFetchDentryTask(key, task); + VLOG(9) << "FetchDentryEnqueue end: " << key << " file: " << file; +} + +void WarmupManagerS3Impl::LookPath(fuse_ino_t key, std::string file) { + VLOG(9) << "LookPath start key: " << key << " file: " << file; + std::vector splitPath; + // remove enter, newline, blank + std::string blanks("\r\n "); + file.erase(0, file.find_first_not_of(blanks)); + file.erase(file.find_last_not_of(blanks) + 1); + if (file.empty()) { + VLOG(9) << "empty path"; + return; + } + bool isRoot = false; + if (file == "/") { + splitPath.push_back(file); + isRoot = true; + } else { + curve::common::AddSplitStringToResult(file, "/", &splitPath); + } + VLOG(6) << "splitPath size is: " << splitPath.size(); + if (splitPath.size() == 1 && isRoot) { + VLOG(9) << "i am root"; + auto task = [this, key]() { + FetchChildDentry(key, fsInfo_->rootinodeid()); + }; + AddFetchDentryTask(key, task); + return; + } else if (splitPath.size() == 1) { + VLOG(9) << "parent is root: " << fsInfo_->rootinodeid() + << ", path is: " << splitPath[0]; + auto task = [this, key, splitPath]() { + FetchDentry(key, fsInfo_->rootinodeid(), splitPath[0]); + }; + AddFetchDentryTask(key, task); + return; + } else if (splitPath.size() > 1) { // travel path + VLOG(9) << "traverse path start: " << splitPath.size(); + std::string lastName = splitPath.back(); + splitPath.pop_back(); + fuse_ino_t ino = fsInfo_->rootinodeid(); + for (auto iter : splitPath) { + VLOG(9) << "traverse path: " << iter << "ino is: " << ino; + Dentry dentry; + std::string pathName = iter; + CURVEFS_ERROR ret = + dentryManager_->GetDentry(ino, pathName, &dentry); + if (ret != CURVEFS_ERROR::OK) { + if (ret != CURVEFS_ERROR::NOTEXIST) { + LOG(WARNING) + << "dentryManager_ get dentry fail, ret = " << ret + << ", parent inodeid = " << ino << ", name = " << file; + } + VLOG(9) << "FetchDentry error: " << ret; + return; + } + ino = dentry.inodeid(); + } + auto task = [this, key, ino, lastName]() { + FetchDentry(key, ino, lastName); + }; + AddFetchDentryTask(key, task); + VLOG(9) << "ino is: " << ino << " lastname is: " << lastName; + return; + } else { + VLOG(3) << "unknown path"; + } + VLOG(9) << "LookPath start end: " << key << " file: " << file; +} + +void WarmupManagerS3Impl::FetchDentry(fuse_ino_t key, fuse_ino_t ino, + const std::string &file) { + VLOG(9) << "FetchDentry start: " << file << ", ino: " << ino + << " key: " << key; + Dentry dentry; + CURVEFS_ERROR ret = dentryManager_->GetDentry(ino, file, &dentry); + if (ret != CURVEFS_ERROR::OK) { + if (ret != CURVEFS_ERROR::NOTEXIST) { + LOG(WARNING) << "dentryManager_ get dentry fail, ret = " << ret + << ", parent inodeid = " << ino << ", name = " << file; + } else { + LOG(ERROR) << "FetchDentry key: " << key << " file: " << file + << " errorCode: " << ret; + } + return; + } + if (FsFileType::TYPE_S3 == dentry.type()) { + WriteLockGuard lock(warmupInodesDequeMutex_); + auto iterDeque = LockedFindKeyWarmupInodes(key); + if (iterDeque == warmupInodesDeque_.end()) { + warmupInodesDeque_.emplace_back( + key, std::set{dentry.inodeid()}); + } else { + iterDeque->AddFileInode(dentry.inodeid()); + } + return; + } else if (FsFileType::TYPE_DIRECTORY == dentry.type()) { + auto task = [this, key, dentry]() { + FetchChildDentry(key, dentry.inodeid()); + }; + AddFetchDentryTask(key, task); + VLOG(9) << "FetchDentry: " << dentry.inodeid(); + return; + + } else if (FsFileType::TYPE_SYM_LINK == dentry.type()) { + // skip links + } else { + VLOG(3) << "unkown, file: " << file << ", ino: " << ino; + return; + } + VLOG(9) << "FetchDentry end: " << file << ", ino: " << ino; +} + +void WarmupManagerS3Impl::FetchChildDentry(fuse_ino_t key, fuse_ino_t ino) { + VLOG(9) << "FetchChildDentry start: key:" << key << " inode: " << ino; + std::list dentryList; + auto limit = listDentryLimit_; + CURVEFS_ERROR ret = dentryManager_->ListDentry(ino, &dentryList, limit); + if (ret != CURVEFS_ERROR::OK) { + LOG(ERROR) << "dentryManager_ ListDentry fail, ret = " << ret + << ", parent = " << ino; + return; + } + for (const auto &dentry : dentryList) { + VLOG(9) << "FetchChildDentry: key:" << key + << " dentry: " << dentry.name(); + if (FsFileType::TYPE_S3 == dentry.type()) { + WriteLockGuard lock(warmupInodesDequeMutex_); + auto iterDeque = LockedFindKeyWarmupInodes(key); + if (iterDeque == warmupInodesDeque_.end()) { + warmupInodesDeque_.emplace_back( + key, std::set{dentry.inodeid()}); + } else { + iterDeque->AddFileInode(dentry.inodeid()); + } + VLOG(9) << "FetchChildDentry: " << dentry.inodeid(); + } else if (FsFileType::TYPE_DIRECTORY == dentry.type()) { + auto task = [this, key, dentry]() { + FetchChildDentry(key, dentry.inodeid()); + }; + AddFetchDentryTask(key, task); + VLOG(9) << "FetchChildDentry: " << dentry.inodeid(); + } else if (FsFileType::TYPE_SYM_LINK == dentry.type()) { // need todo + } else { + VLOG(9) << "unknown type"; + } + } + VLOG(9) << "FetchChildDentry end: key:" << key << " inode: " << ino; +} + +void WarmupManagerS3Impl::FetchDataEnqueue(fuse_ino_t key, fuse_ino_t ino) { + VLOG(9) << "FetchDataEnqueue start: key:" << key << " inode: " << ino; + std::shared_ptr inodeWrapper; + CURVEFS_ERROR ret = inodeManager_->GetInode(ino, inodeWrapper); + if (ret != CURVEFS_ERROR::OK) { + LOG(ERROR) << "inodeManager get inode fail, ret = " << ret + << ", inodeid = " << ino; + return; + } + S3ChunkInfoMapType s3ChunkInfoMap; + { + ::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock(); + s3ChunkInfoMap = *inodeWrapper->GetChunkInfoMap(); + } + if (s3ChunkInfoMap.empty()) { + return; + } + TravelChunks(key, ino, s3ChunkInfoMap); + VLOG(9) << "FetchDataEnqueue end: key:" << key << " inode: " << ino; +} + +void WarmupManagerS3Impl::TravelChunks( + fuse_ino_t key, fuse_ino_t ino, const S3ChunkInfoMapType &s3ChunkInfoMap) { + VLOG(9) << "travel chunk start: " << ino + << ", size: " << s3ChunkInfoMap.size(); + for (auto const &infoIter : s3ChunkInfoMap) { + VLOG(9) << "travel chunk: " << infoIter.first; + std::list> prefetchObjs; + TravelChunk(ino, infoIter.second, &prefetchObjs); + { + WriteLockGuard lock(inode2ProgressMutex_); + auto iter = FindKeyWarmupProgress(key); + if (iter != inode2Progress_.end()) { + iter->second.AddTotal(prefetchObjs.size()); + } else { + LOG(ERROR) << "no such warmup progress: " << key; + } + } + auto task = [this, key, prefetchObjs]() { + WarmUpAllObjs(key, prefetchObjs); + }; + AddFetchS3objectsTask(key, task); + } + VLOG(9) << "travel chunks end"; +} + +void WarmupManagerS3Impl::TravelChunk(fuse_ino_t ino, + const S3ChunkInfoList &chunkInfo, + ObjectListType *prefetchObjs) { + uint64_t blockSize = s3Adaptor_->GetBlockSize(); + uint64_t chunkSize = s3Adaptor_->GetChunkSize(); + uint64_t offset, len, chunkid, compaction; + for (size_t i = 0; i < chunkInfo.s3chunks_size(); i++) { + auto const &chunkinfo = chunkInfo.s3chunks(i); + auto fsId = fsInfo_->fsid(); + chunkid = chunkinfo.chunkid(); + compaction = chunkinfo.compaction(); + offset = chunkinfo.offset(); + len = chunkinfo.len(); + // the offset in the chunk + uint64_t chunkPos = offset % chunkSize; + // the offset in the block + uint64_t blockPos = chunkPos % blockSize; + // the first blockIndex + uint64_t blockIndexBegin = chunkPos / blockSize; + + if (len < blockSize) { // just one block + auto objectName = curvefs::common::s3util::GenObjName( + chunkid, blockIndexBegin, compaction, fsId, ino); + prefetchObjs->push_back(std::make_pair(objectName, len)); + } else { + // the offset in the block + uint64_t blockPos = chunkPos % blockSize; + + // firstly, let's get the size in the first block + // then, subtract the length in the first block + // to obtain the remaining length + // lastly, We need to judge the last block is full or not + uint64_t firstBlockSize = + (blockPos != 0) ? blockSize - blockPos : blockSize; + uint64_t leftSize = len - firstBlockSize; + uint32_t blockCounts = (leftSize % blockSize == 0) + ? (leftSize / blockSize + 1) + : (leftSize / blockSize + 1 + 1); + // so we can get the last blockIndex + // because the bolck Index is cumulative + uint64_t blockIndexEnd = blockIndexBegin + blockCounts - 1; + + // the size of the last block + uint64_t lastBlockSize = leftSize % blockSize; + // whether the first block or the last block is full or not + bool firstBlockFull = (blockPos == 0); + bool lastBlockFull = (lastBlockSize == 0); + // the start and end block Index that need travel + uint64_t travelStartIndex, travelEndIndex; + // if the block is full, the size is needed download + // of the obj is blockSize. Otherwise, the value is special. + if (!firstBlockFull) { + travelStartIndex = blockIndexBegin + 1; + auto objectName = curvefs::common::s3util::GenObjName( + chunkid, blockIndexBegin, compaction, fsId, ino); + prefetchObjs->push_back( + std::make_pair(objectName, firstBlockSize)); + } else { + travelStartIndex = blockIndexBegin; + } + if (!lastBlockFull) { + // block index is greater than or equal to 0 + travelEndIndex = (blockIndexEnd == blockIndexBegin) + ? blockIndexEnd + : blockIndexEnd - 1; + auto objectName = curvefs::common::s3util::GenObjName( + chunkid, blockIndexEnd, compaction, fsId, ino); + // there is no need to care about the order + // in which objects are downloaded + prefetchObjs->push_back( + std::make_pair(objectName, lastBlockSize)); + } else { + travelEndIndex = blockIndexEnd; + } + VLOG(9) << "travel obj, ino: " << ino << ", chunkid: " << chunkid + << ", blockCounts: " << blockCounts + << ", compaction: " << compaction + << ", blockSize: " << blockSize + << ", chunkSize: " << chunkSize << ", offset: " << offset + << ", blockIndexBegin: " << blockIndexBegin + << ", blockIndexEnd: " << blockIndexEnd << ", len: " << len + << ", firstBlockSize: " << firstBlockSize + << ", lastBlockSize: " << lastBlockSize + << ", blockPos: " << blockPos << ", chunkPos: " << chunkPos; + for (auto blockIndex = travelStartIndex; + blockIndex <= travelEndIndex; blockIndex++) { + auto objectName = curvefs::common::s3util::GenObjName( + chunkid, blockIndex, compaction, fsId, ino); + prefetchObjs->push_back(std::make_pair(objectName, blockSize)); + } + } + } +} + +// TODO(hzwuhongsong): These logics are very similar to other place, +// try to merge it +void WarmupManagerS3Impl::WarmUpAllObjs( + fuse_ino_t key, + const std::list> &prefetchObjs) { + std::atomic pendingReq(0); + curve::common::CountDownEvent cond(1); + // callback function + GetObjectAsyncCallBack cb = + [&](const S3Adapter *adapter, + const std::shared_ptr &context) { + if (bgFetchStop_.load()) { + VLOG(9) << "need stop warmup"; + cond.Signal(); + return; + } + { + // update progress + WriteLockGuard lock(inode2ProgressMutex_); + auto iter = FindKeyWarmupProgress(key); + if (iter != inode2Progress_.end()) { + iter->second.FinishedPlusOne(); + } else { + VLOG(9) << "no such warmup progress: " << key; + } + } + if (context->retCode == 0) { + VLOG(9) << "Get Object success: " << context->key; + int ret = s3Adaptor_->GetDiskCacheManager()->WriteReadDirect( + context->key, context->buf, context->len); + if (ret < 0) { + LOG_EVERY_SECOND(INFO) + << "write read directly failed, key: " << context->key; + } + if (pendingReq.fetch_sub(1, std::memory_order_seq_cst) == 1) { + VLOG(6) << "pendingReq is over"; + cond.Signal(); + } + delete[] context->buf; + return; + } + if (++context->retry >= downloadMaxRetryTimes_) { + if (pendingReq.fetch_sub(1, std::memory_order_seq_cst) == 1) { + VLOG(6) << "pendingReq is over"; + cond.Signal(); + } + VLOG(9) << "Up to max retry times, " + << "download object failed, key: " << context->key; + delete[] context->buf; + return; + } + + LOG(WARNING) << "Get Object failed, key: " << context->key + << ", offset: " << context->offset; + s3Adaptor_->GetS3Client()->DownloadAsync(context); + }; + + pendingReq.fetch_add(prefetchObjs.size(), std::memory_order_seq_cst); + if (pendingReq.load(std::memory_order_seq_cst)) { + VLOG(9) << "wait for pendingReq"; + for (auto iter : prefetchObjs) { + VLOG(9) << "download start: " << iter.first; + std::string name = iter.first; + uint64_t readLen = iter.second; + if (s3Adaptor_->GetDiskCacheManager()->IsCached(name)) { + pendingReq.fetch_sub(1); + continue; + } + char *cacheS3 = new char[readLen]; + memset(cacheS3, 0, readLen); + auto context = std::make_shared(); + context->key = name; + context->buf = cacheS3; + context->offset = 0; + context->len = readLen; + context->cb = cb; + context->retry = 0; + s3Adaptor_->GetS3Client()->DownloadAsync(context); + } + if (pendingReq.load()) + cond.Wait(); + } +} + +bool WarmupManagerS3Impl::ProgressDone(fuse_ino_t key) { + bool ret; + { + ReadLockGuard lockList(warmupFilelistDequeMutex_); + ret = LockedFindKeyWarmupFilelist(key) == warmupFilelistDeque_.end(); + } + + inode2FetchDentryPoolMutex_.RDLock(); + ret = ret && + (LockedFindKeyFetchDentryPool(key) == inode2FetchDentryPool_.end()); + inode2FetchDentryPoolMutex_.Unlock(); + + { + ReadLockGuard lockInodes(warmupInodesDequeMutex_); + ret = + ret && (LockedFindKeyWarmupInodes(key) == warmupInodesDeque_.end()); + } + + + inode2FetchS3ObjectsPoolMutex_.RDLock(); + ret = ret && (LockedFindKeyFetchS3ObjectsPool(key) == + inode2FetchS3ObjectsPool_.end()); + inode2FetchS3ObjectsPoolMutex_.Unlock(); + return ret; +} + +void WarmupManagerS3Impl::ScanCleanFetchDentryPool() { + // clean inode2FetchDentryPool_ + inode2FetchDentryPoolMutex_.RDLock(); + for (auto iter = inode2FetchDentryPool_.begin(); + iter != inode2FetchDentryPool_.end();) { + std::deque::iterator iterInode; + if (iter->second->QueueSize() == 0) { + VLOG(9) << "remove FetchDentry task: " << iter->first; + inode2FetchDentryPoolMutex_.Unlock(); + iter->second->Stop(); + inode2FetchDentryPoolMutex_.WRLock(); + iter = inode2FetchDentryPool_.erase(iter); + inode2FetchDentryPoolMutex_.Unlock(); + inode2FetchDentryPoolMutex_.RDLock(); + } else { + ++iter; + } + } + inode2FetchDentryPoolMutex_.Unlock(); +} + +void WarmupManagerS3Impl::ScanCleanFetchS3ObjectsPool() { + // clean inode2FetchS3ObjectsPool_ + inode2FetchS3ObjectsPoolMutex_.RDLock(); + for (auto iter = inode2FetchS3ObjectsPool_.begin(); + iter != inode2FetchS3ObjectsPool_.end();) { + if (iter->second->QueueSize() == 0) { + VLOG(9) << "remove FetchS3object task: " << iter->first; + inode2FetchS3ObjectsPoolMutex_.Unlock(); + iter->second->Stop(); + inode2FetchS3ObjectsPoolMutex_.WRLock(); + iter = inode2FetchS3ObjectsPool_.erase(iter); + inode2FetchS3ObjectsPoolMutex_.Unlock(); + inode2FetchS3ObjectsPoolMutex_.RDLock(); + } else { + ++iter; + } + } + inode2FetchS3ObjectsPoolMutex_.Unlock(); +} + +void WarmupManagerS3Impl::ScanCleanWarmupProgress() { + // clean done warmupProgress + WriteLockGuard lockWarmupProgress(inode2ProgressMutex_); + auto iter = inode2Progress_.begin(); + for (; iter != inode2Progress_.end();) { + if (ProgressDone(iter->first)) { + VLOG(9) << "warmup key: " << iter->first << " done!"; + iter = inode2Progress_.erase(iter); + } else { + ++iter; + } + } +} + +void WarmupManagerS3Impl::ScanWarmupFiles() { + // file need warmup + WriteLockGuard lock(warmupInodesDequeMutex_); + if (!warmupInodesDeque_.empty()) { + WarmupInodes inodes = warmupInodesDeque_.front(); + for (auto const &iter : inodes.GetReadAheadFiles()) { + VLOG(9) << "BackGroundFetch: key: " << inodes.GetKey() + << " inode:" << iter; + FetchDataEnqueue(inodes.GetKey(), iter); + warmupInodesDeque_.pop_front(); + } + } +} + +void WarmupManagerS3Impl::ScanWarmupFilelist() { + // Use a write lock to ensure that all parsing tasks are added. + WriteLockGuard lock(warmupFilelistDequeMutex_); + if (!warmupFilelistDeque_.empty()) { + WarmupFilelist warmupFilelist = warmupFilelistDeque_.front(); + VLOG(9) << "warmup ino: " << warmupFilelist.GetKey() + << " len is: " << warmupFilelist.GetFileLen(); + + std::vector warmuplist; + GetWarmupList(warmupFilelist, &warmuplist); + for (auto filePath : warmuplist) { + FetchDentryEnqueue(warmupFilelist.GetKey(), filePath); + } + warmupFilelistDeque_.pop_front(); + } +} + +void WarmupManagerS3Impl::AddFetchDentryTask(fuse_ino_t key, + std::function task) { + VLOG(9) << "add fetchDentry task: " << key; + std::unique_ptr tp = absl::make_unique(); + tp->Start(warmupThreadsNum_); + inode2FetchDentryPoolMutex_.WRLock(); + auto iter = inode2FetchDentryPool_.emplace(key, std::move(tp)); + inode2FetchDentryPoolMutex_.Unlock(); + if (!iter.first->second->Enqueue(task)) { + LOG(ERROR) << "key:" << key + << " fetch dentry thread pool has been stoped!"; + } + + VLOG(9) << "add fetchDentry task: " << key << " finished"; +} + +void WarmupManagerS3Impl::AddFetchS3objectsTask(fuse_ino_t key, + std::function task) { + VLOG(9) << "add fetchDentry task: " << key; + std::unique_ptr tp = absl::make_unique(); + tp->Start(warmupThreadsNum_); + inode2FetchS3ObjectsPoolMutex_.WRLock(); + auto iter = inode2FetchS3ObjectsPool_.emplace(key, std::move(tp)); + inode2FetchS3ObjectsPoolMutex_.Unlock(); + if (!iter.first->second->Enqueue(task)) { + LOG(ERROR) << "key:" << key + << " fetch s3 objects thread pool has been stoped!"; + } + + VLOG(9) << "add fetchS3Objects task: " << key << " finished"; +} + +} // namespace warmup +} // namespace client +} // namespace curvefs diff --git a/curvefs/src/client/warmup/warmup_manager.h b/curvefs/src/client/warmup/warmup_manager.h new file mode 100644 index 0000000000..e7db48d34d --- /dev/null +++ b/curvefs/src/client/warmup/warmup_manager.h @@ -0,0 +1,391 @@ +/* + * Copyright (c) 2023 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: curve + * Created Date: 2023-01-31 + * Author: chengyi01 + */ + +#ifndef CURVEFS_SRC_CLIENT_WARMUP_WARMUP_MANAGER_H_ +#define CURVEFS_SRC_CLIENT_WARMUP_WARMUP_MANAGER_H_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "curvefs/src/client/dentry_cache_manager.h" +#include "curvefs/src/client/fuse_common.h" +#include "curvefs/src/client/inode_cache_manager.h" +#include "curvefs/src/client/rpcclient/metaserver_client.h" +#include "curvefs/src/client/s3/client_s3_adaptor.h" +#include "curvefs/src/client/s3/client_s3_cache_manager.h" +#include "src/common/concurrent/concurrent.h" +#include "src/common/concurrent/rw_lock.h" +#include "curvefs/src/common/task_thread_pool.h" + +namespace curvefs { +namespace client { +namespace warmup { + +using common::FuseClientOption; + +using ThreadPool = curvefs::common::TaskThreadPool2; +using curve::common::BthreadRWLock; + +class WarmupFile { + public: + explicit WarmupFile(fuse_ino_t key = 0, uint64_t fileLen = 0) + : key_(key), fileLen_(fileLen) {} + + fuse_ino_t GetKey() const { return key_; } + uint64_t GetFileLen() const { return fileLen_; } + bool operator==(const WarmupFile &other) const { + return key_ == other.key_; + } + + private: + fuse_ino_t key_; + uint64_t fileLen_; +}; + +using WarmupFilelist = WarmupFile; + +class WarmupInodes { + public: + explicit WarmupInodes(fuse_ino_t key = 0, + std::set list = std::set()) + : key_(key), readAheadFiles_(std::move(list)) {} + + fuse_ino_t GetKey() const { return key_; } + const std::set &GetReadAheadFiles() const { + return readAheadFiles_; + } + + void AddFileInode(fuse_ino_t file) { readAheadFiles_.emplace(file); } + + private: + fuse_ino_t key_; + std::set readAheadFiles_; +}; + + +using FuseOpReadFunctionType = + std::function; + +class WarmupProgress { + public: + explicit WarmupProgress(uint64_t total = 0, uint64_t finished = 0) + : total_(total), finished_(finished) {} + + void AddTotal(uint64_t add) { total_ += add; } + + void FinishedPlusOne() { ++finished_; } + + uint64_t GetTotal() const { return total_; } + + uint64_t GetFinished() const { return finished_; } + + std::string ToString() const { + return "total:" + std::to_string(total_) + + ",finished:" + std::to_string(finished_); + } + + private: + uint64_t total_; + uint64_t finished_; +}; + +class WarmupManager { + public: + WarmupManager() + : mounted_(false), + metaClient_(std::make_shared()), + inodeManager_(std::make_shared(metaClient_)), + dentryManager_( + std::make_shared(metaClient_)) {} + + explicit WarmupManager(std::shared_ptr metaClient, + std::shared_ptr inodeManager, + std::shared_ptr dentryManager, + std::shared_ptr fsInfo, + FuseOpReadFunctionType readFunc) + : mounted_(false), metaClient_(std::move(metaClient)), + inodeManager_(std::move(inodeManager)), + dentryManager_(std::move(dentryManager)), fsInfo_(std::move(fsInfo)), + fuseOpRead_(std::move(readFunc)) {} + + virtual void Init(const FuseClientOption &option) = 0; + virtual void UnInit() { ClearWarmupProcess(); } + + virtual void AddWarmupFilelist(fuse_ino_t key) = 0; + virtual void AddWarmupFile(fuse_ino_t key) = 0; + + void SetMounted(bool mounted) { + mounted_.store(mounted, std::memory_order_release); + } + + void SetFsInfo(const std::shared_ptr &fsinfo) { + fsInfo_ = fsinfo; + } + + void SetFuseOpRead(const FuseOpReadFunctionType &read) { + fuseOpRead_ = read; + } + + /** + * @brief + * + * @param key + * @param progress + * @return true + * @return false no this warmup task or finished + */ + bool QueryWarmupProgress(fuse_ino_t key, WarmupProgress *progress) { + bool ret = true; + auto iter = FindKeyWarmupProgress(key); + if (iter != inode2Progress_.end()) { + *progress = iter->second; + } else { + ret = false; + } + return ret; + } + + protected: + /** + * @brief Add warmupProcess + * + * @return true + * @return false warmupProcess has been added + */ + virtual bool AddWarmupProcess(fuse_ino_t key) { + WriteLockGuard lock(inode2ProgressMutex_); + auto ret = inode2Progress_.emplace(key, WarmupProgress()); + return ret.second; + } + + /** + * @brief + * Please use it with the lock inode2ProgressMutex_ + * @param key + * @return std::unordered_map::iterator + */ + std::unordered_map::iterator + FindKeyWarmupProgress(fuse_ino_t key) { + return inode2Progress_.find(key); + } + + virtual void ClearWarmupProcess() { + WriteLockGuard lock(inode2ProgressMutex_); + inode2Progress_.clear(); + } + + protected: + std::atomic mounted_; + + // metaserver client + std::shared_ptr metaClient_; + + // inode cache manager + std::shared_ptr inodeManager_; + + // dentry cache manager + std::shared_ptr dentryManager_; + + // filesystem info + std::shared_ptr fsInfo_; + + // FuseOpRead + FuseOpReadFunctionType fuseOpRead_; + + // warmup progress + std::unordered_map inode2Progress_; + mutable RWLock inode2ProgressMutex_; +}; + +class WarmupManagerS3Impl : public WarmupManager { + public: + explicit WarmupManagerS3Impl( + std::shared_ptr metaClient, + std::shared_ptr inodeManager, + std::shared_ptr dentryManager, + std::shared_ptr fsInfo, FuseOpReadFunctionType readFunc, + std::shared_ptr s3Adaptor) + : WarmupManager(std::move(metaClient), std::move(inodeManager), + std::move(dentryManager), std::move(fsInfo), + std::move(readFunc)), + s3Adaptor_(std::move(s3Adaptor)) {} + + void AddWarmupFilelist(fuse_ino_t key) override; + void AddWarmupFile(fuse_ino_t key) override; + + void Init(const FuseClientOption &option) override; + void UnInit() override; + + private: + void BackGroundFetch(); + + void GetWarmupList(const WarmupFilelist &filelist, + std::vector *list); + + void FetchDentryEnqueue(fuse_ino_t key, const std::string &file); + + void LookPath(fuse_ino_t key, std::string file); + + void FetchDentry(fuse_ino_t key, fuse_ino_t ino, const std::string &file); + + void FetchChildDentry(fuse_ino_t key, fuse_ino_t ino); + + /** + * @brief + * Please use it with the lock warmupInodesDequeMutex_ + * @param key + * @return std::deque::iterator + */ + std::deque::iterator + LockedFindKeyWarmupInodes(fuse_ino_t key) { + return std::find_if(warmupInodesDeque_.begin(), + warmupInodesDeque_.end(), + [key](const WarmupInodes &inodes) { + return key == inodes.GetKey(); + }); + } + + /** + * @brief + * Please use it with the lock warmupFilelistDequeMutex_ + * @param key + * @return std::deque::iterator + */ + std::deque::iterator + LockedFindKeyWarmupFilelist(fuse_ino_t key) { + return std::find_if(warmupFilelistDeque_.begin(), + warmupFilelistDeque_.end(), + [key](const WarmupFilelist &filelist_) { + return key == filelist_.GetKey(); + }); + } + + /** + * @brief + * Please use it with the lock inode2FetchDentryPoolMutex_ + * @param key + * @return std::map>::iterator + */ + std::map>::iterator + LockedFindKeyFetchDentryPool(fuse_ino_t key) { + return inode2FetchDentryPool_.find(key); + } + + /** + * @brief + * Please use it with the lock inode2FetchS3ObjectsPoolMutex_ + * @param key + * @return std::map>::iterator + */ + std::map>::iterator + LockedFindKeyFetchS3ObjectsPool(fuse_ino_t key) { + return inode2FetchS3ObjectsPool_.find(key); + } + + void FetchDataEnqueue(fuse_ino_t key, fuse_ino_t ino); + + using S3ChunkInfoMapType = google::protobuf::Map; + + // travel all chunks + void TravelChunks(fuse_ino_t key, fuse_ino_t ino, + const S3ChunkInfoMapType &s3ChunkInfoMap); + + using ObjectListType = std::list>; + // travel and download all objs belong to the chunk + void TravelChunk(fuse_ino_t ino, const S3ChunkInfoList &chunkInfo, + ObjectListType *prefetchObjs); + + // warmup all the prefetchObjs + void WarmUpAllObjs( + fuse_ino_t key, + const std::list> &prefetchObjs); + + /** + * @brief Whether the warmup task[key] is completed (or terminated) + * + * @return true + * @return false + */ + bool ProgressDone(fuse_ino_t key); + + void ScanCleanFetchDentryPool(); + + void ScanCleanFetchS3ObjectsPool(); + + void ScanCleanWarmupProgress(); + + void ScanWarmupFiles(); + + void ScanWarmupFilelist(); + + void AddFetchDentryTask(fuse_ino_t key, std::function task); + + void AddFetchS3objectsTask(fuse_ino_t key, std::function task); + + protected: + std::deque warmupFilelistDeque_; + mutable RWLock warmupFilelistDequeMutex_; + + bool initbgFetchThread_ = false; + Thread bgFetchThread_; + std::atomic bgFetchStop_; + + std::map> + inode2FetchDentryPool_; + BthreadRWLock inode2FetchDentryPoolMutex_; + + std::deque warmupInodesDeque_; + mutable RWLock warmupInodesDequeMutex_; + + // s3 adaptor + std::shared_ptr s3Adaptor_; + + std::map> + inode2FetchS3ObjectsPool_; + BthreadRWLock inode2FetchS3ObjectsPoolMutex_; + + uint32_t listDentryLimit_; + uint32_t downloadMaxRetryTimes_; + uint32_t warmupThreadsNum_; +}; + +} // namespace warmup +} // namespace client +} // namespace curvefs + +#endif // CURVEFS_SRC_CLIENT_WARMUP_WARMUP_MANAGER_H_ diff --git a/curvefs/src/common/task_thread_pool.h b/curvefs/src/common/task_thread_pool.h new file mode 100644 index 0000000000..981deabbac --- /dev/null +++ b/curvefs/src/common/task_thread_pool.h @@ -0,0 +1,61 @@ +/* +* Copyright (c) 2023 NetEase Inc. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +/* + * Project: curve + * Created Date: 2023-02-14 + * Author: chengyi01 + */ + +#ifndef CURVEFS_SRC_COMMON_TASK_THREAD_POOL_H_ +#define CURVEFS_SRC_COMMON_TASK_THREAD_POOL_H_ + +#include "src/common/concurrent/task_thread_pool.h" + +#include +#include +#include + +namespace curvefs { +namespace common { + +template +class TaskThreadPool2 : public curve::common::TaskThreadPool { + using Base = curve::common::TaskThreadPool; + + public: + template bool Enqueue(F &&f, Args &&...args) { + std::unique_lock guard(Base::mutex_); + + if (!Base::running_.load(std::memory_order_acquire)) { + // When stopped, running_ false recovery + return false; + } + + while (Base::IsFullUnlock()) { + Base::notFull_.wait(guard); + } + auto task = std::bind(std::forward(f), std::forward(args)...); + Base::queue_.push_back(std::move(task)); + Base::notEmpty_.notify_one(); + return true; + } +}; + +} // namespace common +} // namespace curvefs + +#endif // CURVEFS_SRC_COMMON_TASK_THREAD_POOL_H_ diff --git a/curvefs/test/client/mock_client_s3_adaptor.h b/curvefs/test/client/mock_client_s3_adaptor.h index e31d9152d2..10a20668e4 100644 --- a/curvefs/test/client/mock_client_s3_adaptor.h +++ b/curvefs/test/client/mock_client_s3_adaptor.h @@ -23,6 +23,8 @@ #ifndef CURVEFS_TEST_CLIENT_MOCK_CLIENT_S3_ADAPTOR_H_ #define CURVEFS_TEST_CLIENT_MOCK_CLIENT_S3_ADAPTOR_H_ +#include + #include #include diff --git a/curvefs/test/client/mock_disk_cache_manager.h b/curvefs/test/client/mock_disk_cache_manager.h index 8ee516f2b1..c78a7ff5a1 100644 --- a/curvefs/test/client/mock_disk_cache_manager.h +++ b/curvefs/test/client/mock_disk_cache_manager.h @@ -26,6 +26,7 @@ #include #include +#include #include #include #include @@ -68,15 +69,21 @@ class MockDiskCacheManager2 : public DiskCacheManager { MOCK_METHOD0(IsDiskCacheFull, bool()); MOCK_METHOD3(WriteReadDirect, int(const std::string fileName, const char *buf, uint64_t length)); + MOCK_METHOD1(IsCached, bool(const std::string)); }; class MockDiskCacheManagerImpl : public DiskCacheManagerImpl { public: MockDiskCacheManagerImpl() : DiskCacheManagerImpl() {} + MockDiskCacheManagerImpl(std::shared_ptr diskCacheManager, + std::shared_ptr client) + : DiskCacheManagerImpl(std::move(diskCacheManager), std::move(client)) { + } ~MockDiskCacheManagerImpl() {} MOCK_METHOD1(UploadWriteCacheByInode, int(const std::string &inode)); MOCK_METHOD1(ClearReadCache, int(const std::list &files)); + MOCK_METHOD1(IsCached, bool(const std::string)); }; } // namespace client diff --git a/curvefs/test/client/test_fuse_s3_client.cpp b/curvefs/test/client/test_fuse_s3_client.cpp index 9f87bf3f35..e6e9c6c367 100644 --- a/curvefs/test/client/test_fuse_s3_client.cpp +++ b/curvefs/test/client/test_fuse_s3_client.cpp @@ -20,19 +20,30 @@ * Author: xuchaojie */ + +#include +#include #include #include -#include "curvefs/src/client/common/common.h" +#include +#include + #include "curvefs/proto/metaserver.pb.h" +#include "curvefs/src/client/common/common.h" #include "curvefs/src/client/error_code.h" #include "curvefs/src/client/fuse_s3_client.h" +#include "curvefs/src/client/rpcclient/metaserver_client.h" +#include "curvefs/src/client/s3/disk_cache_manager_impl.h" +#include "curvefs/src/client/warmup/warmup_manager.h" #include "curvefs/src/common/define.h" +#include "curvefs/test/client/mock_client_s3.h" #include "curvefs/test/client/mock_client_s3_adaptor.h" #include "curvefs/test/client/mock_dentry_cache_mamager.h" +#include "curvefs/test/client/mock_disk_cache_manager.h" #include "curvefs/test/client/mock_inode_cache_manager.h" -#include "curvefs/test/client/rpcclient/mock_mds_client.h" #include "curvefs/test/client/mock_metaserver_client.h" +#include "curvefs/test/client/rpcclient/mock_mds_client.h" struct fuse_req { struct fuse_ctx *ctx; @@ -54,18 +65,18 @@ namespace client { using ::curve::common::Configuration; using ::curvefs::mds::topology::PartitionTxId; using ::testing::_; +using ::testing::AtLeast; using ::testing::Contains; using ::testing::Invoke; using ::testing::Return; using ::testing::SetArgPointee; using ::testing::SetArgReferee; -using ::testing::AtLeast; using ::testing::SetArrayArgument; +using curvefs::client::common::FileHandle; +using rpcclient::MetaServerClientDone; using rpcclient::MockMdsClient; using rpcclient::MockMetaServerClient; -using rpcclient::MetaServerClientDone; -using curvefs::client::common::FileHandle; #define EQUAL(a) (lhs.a() == rhs.a()) @@ -86,9 +97,20 @@ class TestFuseS3Client : public ::testing::Test { s3ClientAdaptor_ = std::make_shared(); inodeManager_ = std::make_shared(); dentryManager_ = std::make_shared(); - client_ = std::make_shared(mdsClient_, metaClient_, - inodeManager_, dentryManager_, - s3ClientAdaptor_); + warmupManager_ = std::make_shared( + metaClient_, inodeManager_, dentryManager_, nullptr, nullptr, + s3ClientAdaptor_); + client_ = std::make_shared( + mdsClient_, metaClient_, inodeManager_, dentryManager_, + s3ClientAdaptor_, warmupManager_); + + auto readFunc = [this](fuse_req_t req, fuse_ino_t ino, size_t size, + off_t off, struct fuse_file_info *fi, + char *buffer, size_t *rSize) { + return client_->FuseOpRead(req, ino, size, off, fi, buffer, rSize); + }; + warmupManager_->SetFuseOpRead(readFunc); + InitOptionBasic(&fuseClientOption_); InitFSInfo(client_); fuseClientOption_.s3Opt.s3AdaptrOpt.asyncThreadNum = 1; @@ -119,6 +141,8 @@ class TestFuseS3Client : public ::testing::Test { client_->SetFsInfo(fsInfo); client_->SetMounted(true); + + warmupManager_->SetFsInfo(fsInfo); } void InitOptionBasic(FuseClientOption *opt) { @@ -146,19 +170,20 @@ class TestFuseS3Client : public ::testing::Test { std::shared_ptr client_; FuseClientOption fuseClientOption_; Aws::SDKOptions awsOptions_; + std::shared_ptr warmupManager_; }; TEST_F(TestFuseS3Client, test_Init_with_KVCache) { curvefs::client::common::FLAGS_supportKVcache = true; curvefs::mds::topology::MemcacheClusterInfo memcacheCluster; memcacheCluster.set_clusterid(1); - auto testclient = - std::make_shared(mdsClient_, metaClient_, inodeManager_, - dentryManager_, s3ClientAdaptor_); + auto testclient = std::make_shared( + mdsClient_, metaClient_, inodeManager_, dentryManager_, + s3ClientAdaptor_, nullptr); FuseClientOption opt; InitOptionBasic(&opt); InitFSInfo(testclient); - testclient->SetMounted(true); + // testclient->SetMounted(true); // test init kvcache success { @@ -181,68 +206,13 @@ TEST_F(TestFuseS3Client, test_Init_with_KVCache) { curvefs::client::common::FLAGS_supportKVcache = false; } -// GetDentry failed; dentry not exist -TEST_F(TestFuseS3Client, warmUp_dentryNotexist) { - // wait init - sleep(1); - std::string warmUpPath = "/test"; - fuse_req_t req; - fuse_ino_t parent = 1; - std::string name = "test"; - fuse_ino_t inodeid = 2; - - Dentry dentry; - dentry.set_fsid(fsId); - dentry.set_name(name); - dentry.set_parentinodeid(parent); - dentry.set_inodeid(inodeid); - dentry.set_type(FsFileType::TYPE_S3); - EXPECT_CALL(*dentryManager_, GetDentry(_, _, _)) - .WillOnce(DoAll(SetArgPointee<2>(dentry), - Return(CURVEFS_ERROR::NOTEXIST))); - client_->PutWarmTask(warmUpPath); - sleep(5); -} - -// GetDentry failed; bad fd -TEST_F(TestFuseS3Client, warmUp_dentryBadFd) { - sleep(1); - std::string warmUpPath = "/test"; - fuse_req_t req; - fuse_ino_t parent = 1; - std::string name = "test"; - fuse_ino_t inodeid = 2; - - Dentry dentry; - dentry.set_fsid(fsId); - dentry.set_name(name); - dentry.set_parentinodeid(parent); - dentry.set_inodeid(inodeid); - dentry.set_type(FsFileType::TYPE_S3); - - EXPECT_CALL(*dentryManager_, GetDentry(_, _, _)) - .WillOnce(DoAll(SetArgPointee<2>(dentry), - Return(CURVEFS_ERROR::BAD_FD))); - client_->PutWarmTask(warmUpPath); - sleep(5); -} - // GetInode failed; bad fd TEST_F(TestFuseS3Client, warmUp_inodeBadFd) { sleep(1); - std::string warmUpPath = "/test"; - fuse_req_t req; fuse_ino_t parent = 1; std::string name = "test"; fuse_ino_t inodeid = 2; - Dentry dentry; - dentry.set_fsid(fsId); - dentry.set_name(name); - dentry.set_parentinodeid(parent); - dentry.set_inodeid(inodeid); - dentry.set_type(FsFileType::TYPE_S3); - Inode inode; inode.set_fsid(fsId); inode.set_inodeid(inodeid); @@ -250,21 +220,26 @@ TEST_F(TestFuseS3Client, warmUp_inodeBadFd) { inode.set_type(FsFileType::TYPE_S3); auto inodeWrapper = std::make_shared(inode, metaClient_); - EXPECT_CALL(*dentryManager_, GetDentry(_, _, _)) - .WillOnce(DoAll(SetArgPointee<2>(dentry), Return(CURVEFS_ERROR::OK))); EXPECT_CALL(*inodeManager_, GetInode(_, _)) - .WillOnce( - DoAll(SetArgReferee<1>(inodeWrapper), - Return(CURVEFS_ERROR::BAD_FD))); - client_->PutWarmTask(warmUpPath); + .WillOnce(DoAll(SetArgReferee<1>(inodeWrapper), + Return(CURVEFS_ERROR::BAD_FD))); + auto old = client_->GetFsInfo()->fstype(); + client_->GetFsInfo()->set_fstype(FSType::TYPE_S3); + client_->PutWarmFilelistTask(inodeid); + warmup::WarmupProgress progress; + bool ret = client_->GetWarmupProgress(inodeid, &progress); + LOG(INFO) << "ret:" << ret << " Warmup progress: " << progress.ToString(); sleep(5); + client_->GetFsInfo()->set_fstype(old); + ret = client_->GetWarmupProgress(inodeid, &progress); + LOG(INFO) << "ret:" << ret << " Warmup progress: " << progress.ToString(); + // After sleeping for 5s, the scan should be completed + ASSERT_FALSE(ret); } // single file (parent is root) TEST_F(TestFuseS3Client, warmUp_Warmfile_error_GetDentry01) { sleep(1); - std::string warmUpPath = "/test"; - fuse_req_t req; fuse_ino_t parent = 1; std::string name = "test"; fuse_ino_t inodeid = 2; @@ -283,16 +258,13 @@ TEST_F(TestFuseS3Client, warmUp_Warmfile_error_GetDentry01) { inode.set_type(FsFileType::TYPE_S3); auto inodeWrapper = std::make_shared(inode, metaClient_); EXPECT_CALL(*dentryManager_, GetDentry(_, _, _)) - .WillOnce(DoAll(SetArgPointee<2>(dentry), Return(CURVEFS_ERROR::OK))) - .WillOnce(DoAll(SetArgPointee<2>(dentry), - Return(CURVEFS_ERROR::BAD_FD))); - EXPECT_CALL(*inodeManager_, GetInode(_, _)) .WillOnce( - DoAll(SetArgReferee<1>(inodeWrapper), - Return(CURVEFS_ERROR::OK))) + DoAll(SetArgPointee<2>(dentry), Return(CURVEFS_ERROR::BAD_FD))); + EXPECT_CALL(*inodeManager_, GetInode(_, _)) .WillOnce( - DoAll(SetArgReferee<1>(inodeWrapper), - Return(CURVEFS_ERROR::OK))); + DoAll(SetArgReferee<1>(inodeWrapper), Return(CURVEFS_ERROR::OK))) + .WillOnce( + DoAll(SetArgReferee<1>(inodeWrapper), Return(CURVEFS_ERROR::OK))); size_t len = 20; char *tmpbuf = new char[len]; @@ -303,17 +275,28 @@ TEST_F(TestFuseS3Client, warmUp_Warmfile_error_GetDentry01) { tmpbuf[3] = '\n'; EXPECT_CALL(*s3ClientAdaptor_, Read(_, _, _, _)) - .WillOnce(DoAll(SetArrayArgument<3>(tmpbuf, tmpbuf + len), - Return(len))); - client_->PutWarmTask(warmUpPath); + .WillOnce( + DoAll(SetArrayArgument<3>(tmpbuf, tmpbuf + len), Return(len))); + + auto old = client_->GetFsInfo()->fstype(); + client_->GetFsInfo()->set_fstype(FSType::TYPE_S3); + client_->PutWarmFilelistTask(inodeid); + + warmup::WarmupProgress progress; + bool ret = client_->GetWarmupProgress(inodeid, &progress); + LOG(INFO) << "ret:" << ret << " Warmup progress: " << progress.ToString(); + ASSERT_TRUE(ret); sleep(5); + client_->GetFsInfo()->set_fstype(old); + ret = client_->GetWarmupProgress(inodeid, &progress); + LOG(INFO) << "ret:" << ret << " Warmup progress: " << progress.ToString(); + // After sleeping for 5s, the scan should be completed + ASSERT_FALSE(ret); } // warmup failed because of GetDentry failed TEST_F(TestFuseS3Client, warmUp_Warmfile_error_GetDentry02) { sleep(1); - std::string warmUpPath = "/test"; - fuse_req_t req; fuse_ino_t parent = 1; std::string name = "test"; fuse_ino_t inodeid = 2; @@ -332,16 +315,13 @@ TEST_F(TestFuseS3Client, warmUp_Warmfile_error_GetDentry02) { inode.set_type(FsFileType::TYPE_S3); auto inodeWrapper = std::make_shared(inode, metaClient_); EXPECT_CALL(*dentryManager_, GetDentry(_, _, _)) - .WillOnce(DoAll(SetArgPointee<2>(dentry), Return(CURVEFS_ERROR::OK))) - .WillOnce(DoAll(SetArgPointee<2>(dentry), - Return(CURVEFS_ERROR::NOTEXIST))); - EXPECT_CALL(*inodeManager_, GetInode(_, _)) .WillOnce( - DoAll(SetArgReferee<1>(inodeWrapper), - Return(CURVEFS_ERROR::OK))) + DoAll(SetArgPointee<2>(dentry), Return(CURVEFS_ERROR::NOTEXIST))); + EXPECT_CALL(*inodeManager_, GetInode(_, _)) .WillOnce( - DoAll(SetArgReferee<1>(inodeWrapper), - Return(CURVEFS_ERROR::OK))); + DoAll(SetArgReferee<1>(inodeWrapper), Return(CURVEFS_ERROR::OK))) + .WillOnce( + DoAll(SetArgReferee<1>(inodeWrapper), Return(CURVEFS_ERROR::OK))); size_t len = 20; char *tmpbuf = new char[len]; @@ -352,17 +332,27 @@ TEST_F(TestFuseS3Client, warmUp_Warmfile_error_GetDentry02) { tmpbuf[3] = '\n'; EXPECT_CALL(*s3ClientAdaptor_, Read(_, _, _, _)) - .WillOnce(DoAll(SetArrayArgument<3>(tmpbuf, tmpbuf + len), - Return(len))); - client_->PutWarmTask(warmUpPath); + .WillOnce( + DoAll(SetArrayArgument<3>(tmpbuf, tmpbuf + len), Return(len))); + auto old = client_->GetFsInfo()->fstype(); + client_->GetFsInfo()->set_fstype(FSType::TYPE_S3); + client_->PutWarmFilelistTask(inodeid); + + warmup::WarmupProgress progress; + bool ret = client_->GetWarmupProgress(inodeid, &progress); + LOG(INFO) << "ret:" << ret << " Warmup progress: " << progress.ToString(); + ASSERT_TRUE(ret); sleep(5); + ret = client_->GetWarmupProgress(inodeid, &progress); + LOG(INFO) << "ret:" << ret << " Warmup progress: " << progress.ToString(); + // After sleeping for 5s, the scan should be completed + ASSERT_FALSE(ret); + client_->GetFsInfo()->set_fstype(old); } // warmup failed because of Getinode failed TEST_F(TestFuseS3Client, warmUp_fetchDataEnqueue__error_getinode) { sleep(1); - std::string warmUpPath = "/test"; - fuse_req_t req; fuse_ino_t parent = 1; std::string name = "test"; fuse_ino_t inodeid = 2; @@ -381,18 +371,14 @@ TEST_F(TestFuseS3Client, warmUp_fetchDataEnqueue__error_getinode) { inode.set_type(FsFileType::TYPE_S3); auto inodeWrapper = std::make_shared(inode, metaClient_); EXPECT_CALL(*dentryManager_, GetDentry(_, _, _)) - .WillOnce(DoAll(SetArgPointee<2>(dentry), Return(CURVEFS_ERROR::OK))) .WillOnce(DoAll(SetArgPointee<2>(dentry), Return(CURVEFS_ERROR::OK))); - EXPECT_CALL(*inodeManager_, GetInode(_, _)) - .WillOnce( - DoAll(SetArgReferee<1>(inodeWrapper), - Return(CURVEFS_ERROR::OK))) + EXPECT_CALL(*inodeManager_, GetInode(_, _)) .WillOnce( - DoAll(SetArgReferee<1>(inodeWrapper), - Return(CURVEFS_ERROR::OK))) + DoAll(SetArgReferee<1>(inodeWrapper), Return(CURVEFS_ERROR::OK))) .WillOnce( - DoAll(SetArgReferee<1>(inodeWrapper), - Return(CURVEFS_ERROR::NOTEXIST))); + DoAll(SetArgReferee<1>(inodeWrapper), Return(CURVEFS_ERROR::OK))) + .WillOnce(DoAll(SetArgReferee<1>(inodeWrapper), + Return(CURVEFS_ERROR::NOTEXIST))); size_t len = 20; char *tmpbuf = new char[len]; @@ -402,17 +388,27 @@ TEST_F(TestFuseS3Client, warmUp_fetchDataEnqueue__error_getinode) { tmpbuf[2] = 'e'; tmpbuf[3] = '\n'; EXPECT_CALL(*s3ClientAdaptor_, Read(_, _, _, _)) - .WillOnce(DoAll(SetArrayArgument<3>(tmpbuf, tmpbuf + len), - Return(len))); - client_->PutWarmTask(warmUpPath); + .WillOnce( + DoAll(SetArrayArgument<3>(tmpbuf, tmpbuf + len), Return(len))); + auto old = client_->GetFsInfo()->fstype(); + client_->GetFsInfo()->set_fstype(FSType::TYPE_S3); + client_->PutWarmFilelistTask(inodeid); + + warmup::WarmupProgress progress; + bool ret = client_->GetWarmupProgress(inodeid, &progress); + LOG(INFO) << "ret:" << ret << " Warmup progress: " << progress.ToString(); + ASSERT_TRUE(ret); + client_->GetFsInfo()->set_fstype(old); sleep(5); + ret = client_->GetWarmupProgress(inodeid, &progress); + LOG(INFO) << "ret:" << ret << " Warmup progress: " << progress.ToString(); + // After sleeping for 5s, the scan should be completed + ASSERT_FALSE(ret); } // chunk is empty TEST_F(TestFuseS3Client, warmUp_fetchDataEnqueue_chunkempty) { sleep(1); - std::string warmUpPath = "/test"; - fuse_req_t req; fuse_ino_t parent = 1; std::string name = "test"; fuse_ino_t inodeid = 2; @@ -431,18 +427,14 @@ TEST_F(TestFuseS3Client, warmUp_fetchDataEnqueue_chunkempty) { inode.set_type(FsFileType::TYPE_S3); auto inodeWrapper = std::make_shared(inode, metaClient_); EXPECT_CALL(*dentryManager_, GetDentry(_, _, _)) - .WillOnce(DoAll(SetArgPointee<2>(dentry), Return(CURVEFS_ERROR::OK))) .WillOnce(DoAll(SetArgPointee<2>(dentry), Return(CURVEFS_ERROR::OK))); - EXPECT_CALL(*inodeManager_, GetInode(_, _)) + EXPECT_CALL(*inodeManager_, GetInode(_, _)) .WillOnce( - DoAll(SetArgReferee<1>(inodeWrapper), - Return(CURVEFS_ERROR::OK))) + DoAll(SetArgReferee<1>(inodeWrapper), Return(CURVEFS_ERROR::OK))) .WillOnce( - DoAll(SetArgReferee<1>(inodeWrapper), - Return(CURVEFS_ERROR::OK))) + DoAll(SetArgReferee<1>(inodeWrapper), Return(CURVEFS_ERROR::OK))) .WillOnce( - DoAll(SetArgReferee<1>(inodeWrapper), - Return(CURVEFS_ERROR::OK))); + DoAll(SetArgReferee<1>(inodeWrapper), Return(CURVEFS_ERROR::OK))); size_t len = 20; char *tmpbuf = new char[len]; @@ -452,17 +444,27 @@ TEST_F(TestFuseS3Client, warmUp_fetchDataEnqueue_chunkempty) { tmpbuf[2] = 'e'; tmpbuf[3] = '\n'; EXPECT_CALL(*s3ClientAdaptor_, Read(_, _, _, _)) - .WillOnce(DoAll(SetArrayArgument<3>(tmpbuf, tmpbuf + len), - Return(len))); - client_->PutWarmTask(warmUpPath); + .WillOnce( + DoAll(SetArrayArgument<3>(tmpbuf, tmpbuf + len), Return(len))); + auto old = client_->GetFsInfo()->fstype(); + client_->GetFsInfo()->set_fstype(FSType::TYPE_S3); + client_->PutWarmFilelistTask(inodeid); + + warmup::WarmupProgress progress; + bool ret = client_->GetWarmupProgress(inodeid, &progress); + LOG(INFO) << "ret:" << ret << " Warmup progress: " << progress.ToString(); + ASSERT_TRUE(ret); + client_->GetFsInfo()->set_fstype(old); sleep(5); + ret = client_->GetWarmupProgress(inodeid, &progress); + LOG(INFO) << "ret:" << ret << " Warmup progress: " << progress.ToString(); + // After sleeping for 5s, the scan should be completed + ASSERT_FALSE(ret); } // single file (parent is root); FetchDentry TEST_F(TestFuseS3Client, warmUp_FetchDentry_TYPE_SYM_LINK) { sleep(1); - std::string warmUpPath = "/test"; - fuse_req_t req; fuse_ino_t parent = 1; std::string name = "test"; fuse_ino_t inodeid = 2; @@ -488,15 +490,13 @@ TEST_F(TestFuseS3Client, warmUp_FetchDentry_TYPE_SYM_LINK) { auto inodeWrapper = std::make_shared(inode, metaClient_); EXPECT_CALL(*dentryManager_, GetDentry(_, _, _)) - .WillOnce(DoAll(SetArgPointee<2>(dentry), Return(CURVEFS_ERROR::OK))) .WillOnce(DoAll(SetArgPointee<2>(dentry1), Return(CURVEFS_ERROR::OK))); - EXPECT_CALL(*inodeManager_, GetInode(_, _)) + EXPECT_CALL(*inodeManager_, GetInode(_, _)) .WillOnce( - DoAll(SetArgReferee<1>(inodeWrapper), - Return(CURVEFS_ERROR::OK))) + DoAll(SetArgReferee<1>(inodeWrapper), Return(CURVEFS_ERROR::OK))) .WillOnce( - DoAll(SetArgReferee<1>(inodeWrapper), - Return(CURVEFS_ERROR::OK))); + DoAll(SetArgReferee<1>(inodeWrapper), Return(CURVEFS_ERROR::OK))); + size_t len = 20; char *tmpbuf = new char[len]; memset(tmpbuf, '\n', len); @@ -505,17 +505,28 @@ TEST_F(TestFuseS3Client, warmUp_FetchDentry_TYPE_SYM_LINK) { tmpbuf[2] = 'e'; tmpbuf[3] = '\n'; EXPECT_CALL(*s3ClientAdaptor_, Read(_, _, _, _)) - .WillOnce(DoAll(SetArrayArgument<3>(tmpbuf, tmpbuf + len), - Return(len))); - client_->PutWarmTask(warmUpPath); + .WillOnce( + DoAll(SetArrayArgument<3>(tmpbuf, tmpbuf + len), Return(len))); + auto old = client_->GetFsInfo()->fstype(); + client_->GetFsInfo()->set_fstype(FSType::TYPE_S3); + client_->PutWarmFilelistTask(inodeid); + + warmup::WarmupProgress progress; + bool ret = client_->GetWarmupProgress(inodeid, &progress); + LOG(INFO) << "ret:" << ret << " Warmup progress: " << progress.ToString(); + ASSERT_TRUE(ret); + client_->GetFsInfo()->set_fstype(old); sleep(5); + ret = client_->GetWarmupProgress(inodeid, &progress); + LOG(INFO) << "ret:" << ret << " Warmup progress: " << progress.ToString(); + // After sleeping for 5s, the scan should be completed + ASSERT_FALSE(ret); + client_->GetFsInfo()->set_fstype(old); } // fetch dentry failed TEST_F(TestFuseS3Client, warmUp_FetchDentry_error_TYPE_DIRECTORY) { sleep(1); - std::string warmUpPath = "/test"; - fuse_req_t req; fuse_ino_t parent = 1; std::string name = "test"; fuse_ino_t inodeid = 2; @@ -541,19 +552,14 @@ TEST_F(TestFuseS3Client, warmUp_FetchDentry_error_TYPE_DIRECTORY) { auto inodeWrapper = std::make_shared(inode, metaClient_); EXPECT_CALL(*dentryManager_, GetDentry(_, _, _)) - .WillOnce(DoAll(SetArgPointee<2>(dentry), Return(CURVEFS_ERROR::OK))) .WillOnce(DoAll(SetArgPointee<2>(dentry1), Return(CURVEFS_ERROR::OK))); - EXPECT_CALL(*inodeManager_, GetInode(_, _)) + EXPECT_CALL(*inodeManager_, GetInode(_, _)) .WillOnce( - DoAll(SetArgReferee<1>(inodeWrapper), - Return(CURVEFS_ERROR::OK))) + DoAll(SetArgReferee<1>(inodeWrapper), Return(CURVEFS_ERROR::OK))) .WillOnce( - DoAll(SetArgReferee<1>(inodeWrapper), - Return(CURVEFS_ERROR::OK))); + DoAll(SetArgReferee<1>(inodeWrapper), Return(CURVEFS_ERROR::OK))); + std::list dlist; - EXPECT_CALL(*dentryManager_, ListDentry(_, _, _, _, _)) - .WillOnce( - DoAll(SetArgPointee<1>(dlist), Return(CURVEFS_ERROR::NOTEXIST))); size_t len = 20; char *tmpbuf = new char[len]; memset(tmpbuf, '\n', len); @@ -562,17 +568,27 @@ TEST_F(TestFuseS3Client, warmUp_FetchDentry_error_TYPE_DIRECTORY) { tmpbuf[2] = 'e'; tmpbuf[3] = '\n'; EXPECT_CALL(*s3ClientAdaptor_, Read(_, _, _, _)) - .WillOnce(DoAll(SetArrayArgument<3>(tmpbuf, tmpbuf + len), - Return(len))); - client_->PutWarmTask(warmUpPath); + .WillOnce( + DoAll(SetArrayArgument<3>(tmpbuf, tmpbuf + len), Return(len))); + auto old = client_->GetFsInfo()->fstype(); + client_->GetFsInfo()->set_fstype(FSType::TYPE_S3); + client_->PutWarmFilelistTask(inodeid); + + warmup::WarmupProgress progress; + bool ret = client_->GetWarmupProgress(inodeid, &progress); + LOG(INFO) << "ret:" << ret << " Warmup progress: " << progress.ToString(); + ASSERT_TRUE(ret); + client_->GetFsInfo()->set_fstype(old); sleep(5); + ret = client_->GetWarmupProgress(inodeid, &progress); + LOG(INFO) << "ret:" << ret << " Warmup progress: " << progress.ToString(); + // After sleeping for 5s, the scan should be completed + ASSERT_FALSE(ret); } // lookpath TEST_F(TestFuseS3Client, warmUp_lookpath_multilevel) { sleep(1); - std::string warmUpPath = "/test"; - fuse_req_t req; fuse_ino_t parent = 1; std::string name = "test"; fuse_ino_t inodeid = 2; @@ -591,21 +607,17 @@ TEST_F(TestFuseS3Client, warmUp_lookpath_multilevel) { inode.set_type(FsFileType::TYPE_S3); auto inodeWrapper = std::make_shared(inode, metaClient_); EXPECT_CALL(*dentryManager_, GetDentry(_, _, _)) - .WillOnce(DoAll(SetArgPointee<2>(dentry), Return(CURVEFS_ERROR::OK))) .WillOnce(DoAll(SetArgPointee<2>(dentry), Return(CURVEFS_ERROR::OK))) .WillOnce(DoAll(SetArgPointee<2>(dentry), Return(CURVEFS_ERROR::OK))) .WillOnce(DoAll(SetArgPointee<2>(dentry), Return(CURVEFS_ERROR::OK))); EXPECT_CALL(*inodeManager_, GetInode(_, _)) .WillOnce( - DoAll(SetArgReferee<1>(inodeWrapper), - Return(CURVEFS_ERROR::OK))) + DoAll(SetArgReferee<1>(inodeWrapper), Return(CURVEFS_ERROR::OK))) .WillOnce( - DoAll(SetArgReferee<1>(inodeWrapper), - Return(CURVEFS_ERROR::OK))) + DoAll(SetArgReferee<1>(inodeWrapper), Return(CURVEFS_ERROR::OK))) .WillOnce( - DoAll(SetArgReferee<1>(inodeWrapper), - Return(CURVEFS_ERROR::OK))); + DoAll(SetArgReferee<1>(inodeWrapper), Return(CURVEFS_ERROR::OK))); size_t len = 20; char *tmpbuf = new char[len]; @@ -618,17 +630,27 @@ TEST_F(TestFuseS3Client, warmUp_lookpath_multilevel) { tmpbuf[5] = 'c'; tmpbuf[6] = '\n'; EXPECT_CALL(*s3ClientAdaptor_, Read(_, _, _, _)) - .WillOnce(DoAll(SetArrayArgument<3>(tmpbuf, tmpbuf + len), - Return(len))); - client_->PutWarmTask(warmUpPath); + .WillOnce( + DoAll(SetArrayArgument<3>(tmpbuf, tmpbuf + len), Return(len))); + auto old = client_->GetFsInfo()->fstype(); + client_->GetFsInfo()->set_fstype(FSType::TYPE_S3); + client_->PutWarmFilelistTask(inodeid); + + warmup::WarmupProgress progress; + bool ret = client_->GetWarmupProgress(inodeid, &progress); + LOG(INFO) << "ret:" << ret << " Warmup progress: " << progress.ToString(); + ASSERT_TRUE(ret); + client_->GetFsInfo()->set_fstype(old); sleep(5); + ret = client_->GetWarmupProgress(inodeid, &progress); + LOG(INFO) << "ret:" << ret << " Warmup progress: " << progress.ToString(); + // After sleeping for 5s, the scan should be completed + ASSERT_FALSE(ret); } // lookpath failed; unknown path TEST_F(TestFuseS3Client, warmUp_lookpath_unkown) { sleep(1); - std::string warmUpPath = "/test"; - fuse_req_t req; fuse_ino_t parent = 1; std::string name = "test"; fuse_ino_t inodeid = 2; @@ -646,32 +668,38 @@ TEST_F(TestFuseS3Client, warmUp_lookpath_unkown) { inode.set_length(4096); inode.set_type(FsFileType::TYPE_S3); auto inodeWrapper = std::make_shared(inode, metaClient_); - EXPECT_CALL(*dentryManager_, GetDentry(_, _, _)) - .WillOnce(DoAll(SetArgPointee<2>(dentry), Return(CURVEFS_ERROR::OK))); EXPECT_CALL(*inodeManager_, GetInode(_, _)) .WillOnce( - DoAll(SetArgReferee<1>(inodeWrapper), - Return(CURVEFS_ERROR::OK))) + DoAll(SetArgReferee<1>(inodeWrapper), Return(CURVEFS_ERROR::OK))) .WillOnce( - DoAll(SetArgReferee<1>(inodeWrapper), - Return(CURVEFS_ERROR::OK))); + DoAll(SetArgReferee<1>(inodeWrapper), Return(CURVEFS_ERROR::OK))); size_t len = 20; char *tmpbuf = new char[len]; memset(tmpbuf, '\n', len); EXPECT_CALL(*s3ClientAdaptor_, Read(_, _, _, _)) - .WillOnce(DoAll(SetArrayArgument<3>(tmpbuf, tmpbuf + len), - Return(len))); - client_->PutWarmTask(warmUpPath); + .WillOnce( + DoAll(SetArrayArgument<3>(tmpbuf, tmpbuf + len), Return(len))); + auto old = client_->GetFsInfo()->fstype(); + client_->GetFsInfo()->set_fstype(FSType::TYPE_S3); + client_->PutWarmFilelistTask(inodeid); + + warmup::WarmupProgress progress; + bool ret = client_->GetWarmupProgress(inodeid, &progress); + LOG(INFO) << "ret:" << ret << " Warmup progress: " << progress.ToString(); + ASSERT_TRUE(ret); + client_->GetFsInfo()->set_fstype(old); sleep(5); + ret = client_->GetWarmupProgress(inodeid, &progress); + LOG(INFO) << "ret:" << ret << " Warmup progress: " << progress.ToString(); + // After sleeping for 5s, the scan should be completed + ASSERT_FALSE(ret); } // i am root TEST_F(TestFuseS3Client, warmUp_FetchChildDentry_error_ListDentry) { sleep(1); - std::string warmUpPath = "/test"; - fuse_req_t req; fuse_ino_t parent = 1; std::string name = "test"; fuse_ino_t inodeid = 2; @@ -689,8 +717,6 @@ TEST_F(TestFuseS3Client, warmUp_FetchChildDentry_error_ListDentry) { inode.set_length(4096); inode.set_type(FsFileType::TYPE_S3); auto inodeWrapper = std::make_shared(inode, metaClient_); - EXPECT_CALL(*dentryManager_, GetDentry(_, _, _)) - .WillOnce(DoAll(SetArgPointee<2>(dentry), Return(CURVEFS_ERROR::OK))); std::list dlist; EXPECT_CALL(*dentryManager_, ListDentry(_, _, _, _, _)) @@ -699,28 +725,36 @@ TEST_F(TestFuseS3Client, warmUp_FetchChildDentry_error_ListDentry) { EXPECT_CALL(*inodeManager_, GetInode(_, _)) .WillOnce( - DoAll(SetArgReferee<1>(inodeWrapper), - Return(CURVEFS_ERROR::OK))) + DoAll(SetArgReferee<1>(inodeWrapper), Return(CURVEFS_ERROR::OK))) .WillOnce( - DoAll(SetArgReferee<1>(inodeWrapper), - Return(CURVEFS_ERROR::OK))); + DoAll(SetArgReferee<1>(inodeWrapper), Return(CURVEFS_ERROR::OK))); size_t len = 20; char *tmpbuf = new char[len]; memset(tmpbuf, '\n', len); tmpbuf[0] = '/'; tmpbuf[1] = '\n'; EXPECT_CALL(*s3ClientAdaptor_, Read(_, _, _, _)) - .WillOnce(DoAll(SetArrayArgument<3>(tmpbuf, tmpbuf + len), - Return(len))); - client_->PutWarmTask(warmUpPath); + .WillOnce( + DoAll(SetArrayArgument<3>(tmpbuf, tmpbuf + len), Return(len))); + auto old = client_->GetFsInfo()->fstype(); + client_->GetFsInfo()->set_fstype(FSType::TYPE_S3); + client_->PutWarmFilelistTask(inodeid); + + warmup::WarmupProgress progress; + bool ret = client_->GetWarmupProgress(inodeid, &progress); + LOG(INFO) << "ret:" << ret << " Warmup progress: " << progress.ToString(); + ASSERT_TRUE(ret); + client_->GetFsInfo()->set_fstype(old); sleep(5); + ret = client_->GetWarmupProgress(inodeid, &progress); + LOG(INFO) << "ret:" << ret << " Warmup progress: " << progress.ToString(); + // After sleeping for 5s, the scan should be completed + ASSERT_FALSE(ret); } // success TEST_F(TestFuseS3Client, warmUp_FetchChildDentry_suc_ListDentry) { sleep(1); - std::string warmUpPath = "/test"; - fuse_req_t req; fuse_ino_t parent = 1; std::string name = "test"; fuse_ino_t inodeid = 5; @@ -738,11 +772,8 @@ TEST_F(TestFuseS3Client, warmUp_FetchChildDentry_suc_ListDentry) { inode.set_length(4096); inode.set_type(FsFileType::TYPE_S3); auto inodeWrapper = std::make_shared(inode, metaClient_); - EXPECT_CALL(*dentryManager_, GetDentry(_, _, _)) - .WillOnce(DoAll(SetArgPointee<2>(dentry), Return(CURVEFS_ERROR::OK))); std::list dlist; - std::list dlist1; Dentry dentry1; dentry1.set_fsid(fsId); dentry1.set_inodeid(inodeid); @@ -755,35 +786,33 @@ TEST_F(TestFuseS3Client, warmUp_FetchChildDentry_suc_ListDentry) { dentry.set_name("2"); dentry.set_type(FsFileType::TYPE_S3); dlist.emplace_back(dentry); + Dentry dentry2; dentry2.set_inodeid(3); dentry2.set_parentinodeid(parent); dentry2.set_name("3"); dentry2.set_type(FsFileType::TYPE_SYM_LINK); dlist.emplace_back(dentry2); + Dentry dentry3; dentry3.set_inodeid(4); dentry3.set_parentinodeid(parent); dentry3.set_name("4"); - dentry3.set_type(FsFileType::TYPE_FILE); + dentry3.set_type(FsFileType::TYPE_FILE); // unknown type dlist.emplace_back(dentry3); EXPECT_CALL(*dentryManager_, ListDentry(_, _, _, _, _)) - .WillOnce( - DoAll(SetArgPointee<1>(dlist), Return(CURVEFS_ERROR::OK))) + .WillOnce(DoAll(SetArgPointee<1>(dlist), Return(CURVEFS_ERROR::OK))) .WillOnce( DoAll(SetArgPointee<1>(dlist), Return(CURVEFS_ERROR::NOTEXIST))); EXPECT_CALL(*inodeManager_, GetInode(_, _)) .WillOnce( - DoAll(SetArgReferee<1>(inodeWrapper), - Return(CURVEFS_ERROR::OK))) + DoAll(SetArgReferee<1>(inodeWrapper), Return(CURVEFS_ERROR::OK))) .WillOnce( - DoAll(SetArgReferee<1>(inodeWrapper), - Return(CURVEFS_ERROR::OK))) + DoAll(SetArgReferee<1>(inodeWrapper), Return(CURVEFS_ERROR::OK))) .WillOnce( - DoAll(SetArgReferee<1>(inodeWrapper), - Return(CURVEFS_ERROR::OK))); + DoAll(SetArgReferee<1>(inodeWrapper), Return(CURVEFS_ERROR::OK))); size_t len = 20; char *tmpbuf = new char[len]; @@ -791,10 +820,22 @@ TEST_F(TestFuseS3Client, warmUp_FetchChildDentry_suc_ListDentry) { tmpbuf[0] = '/'; tmpbuf[1] = '\n'; EXPECT_CALL(*s3ClientAdaptor_, Read(_, _, _, _)) - .WillOnce(DoAll(SetArrayArgument<3>(tmpbuf, tmpbuf + len), - Return(len))); - client_->PutWarmTask(warmUpPath); + .WillOnce( + DoAll(SetArrayArgument<3>(tmpbuf, tmpbuf + len), Return(len))); + auto old = client_->GetFsInfo()->fstype(); + client_->GetFsInfo()->set_fstype(FSType::TYPE_S3); + client_->PutWarmFilelistTask(inodeid); + + warmup::WarmupProgress progress; + bool ret = client_->GetWarmupProgress(inodeid, &progress); + LOG(INFO) << "ret:" << ret << " Warmup progress: " << progress.ToString(); + ASSERT_TRUE(ret); + client_->GetFsInfo()->set_fstype(old); sleep(5); + ret = client_->GetWarmupProgress(inodeid, &progress); + LOG(INFO) << "ret:" << ret << " Warmup progress: " << progress.ToString(); + // After sleeping for 5s, the scan should be completed + ASSERT_FALSE(ret); } TEST_F(TestFuseS3Client, FuseOpInit_when_fs_exist) { diff --git a/curvefs/test/common/BUILD b/curvefs/test/common/BUILD new file mode 100644 index 0000000000..89f03b7e58 --- /dev/null +++ b/curvefs/test/common/BUILD @@ -0,0 +1,32 @@ +# +# Copyright (c) 2023 NetEase Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +load("//:copts.bzl", "CURVE_TEST_COPTS") + +cc_test( + name = "curvefs-common-test", + srcs = glob([ + "*.cpp", + ]), + deps = [ + "//external:bthread", + "//curvefs/src/common:curvefs_common", + "@com_google_googletest//:gtest_main", + "@com_google_absl//absl/memory", + "@com_google_absl//absl/utility", + ], + copts = CURVE_TEST_COPTS, +) diff --git a/curvefs/test/common/task_thread_pool_test.cpp b/curvefs/test/common/task_thread_pool_test.cpp new file mode 100644 index 0000000000..0882624325 --- /dev/null +++ b/curvefs/test/common/task_thread_pool_test.cpp @@ -0,0 +1,247 @@ +/* + * Copyright (c) 2023 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Project: curve + * Created Date: 2023-02-15 + * Author: chengyi01 + */ + +#include "curvefs/src/common/task_thread_pool.h" + +#include +#include +#include +#include + +#include +#include +#include + +#include "src/common/concurrent/concurrent.h" +#include "src/common/concurrent/rw_lock.h" +#include "absl/memory/memory.h" + +namespace curvefs { +namespace common { + +using curve::common::BthreadRWLock; +using curve::common::ReadLockGuard; +using curve::common::Thread; +using curve::common::WriteLockGuard; +using ThreadPool = TaskThreadPool2; + +const uint64_t TIME = 1000 * 1000; +const uint64_t THREAD = 10; + +class SumTask { + public: + SumTask(uint64_t start, uint64_t end, uint64_t sum = 0) + : start_(start), end_(end), sum_(sum) {} + uint64_t start_ = 0; + uint64_t end_ = 0; + uint64_t sum_ = 0; +}; + +class SumManager { + public: + void AddSumTask(uint64_t key, std::function task) { + std::unique_ptr tp = absl::make_unique(); + tp->Start(THREAD); + sumThreadPoolMutex_.WRLock(); + auto iter = sumThreadPool_.emplace(key, std::move(tp)); + sumThreadPoolMutex_.Unlock(); + if (!iter.first->second->Enqueue(task)) { + LOG(ERROR) << "add task " << key << " fail!"; + } + } + + void AddSum(uint64_t key, uint64_t start, uint64_t end) { + WriteLockGuard lock(sumMutex_); + auto iter = sum_.emplace(key, SumTask(start, end)); + iter.first->second.sum_ += start; + + auto leftTask = [key, start, end, this]() { + uint64_t left = Left(start); + if (left <= end) { + WriteLockGuard lock(sumMutex_); + sum_.emplace(key, SumTask(start, end)); + auto task = [this, key, left, end]() { + AddSum(key, left, end); + }; + AddSumTask(key, task); + } + }; + AddSumTask(key, leftTask); + + auto rightTask = [key, start, end, this]() { + uint64_t right = Right(start); + if (right <= end) { + WriteLockGuard lock(sumMutex_); + sum_.emplace(key, SumTask(start, end)); + auto task = [this, key, right, end]() { + AddSum(key, right, end); + }; + AddSumTask(key, task); + } + }; + AddSumTask(key, rightTask); + + if (iter.second) { + LOG(INFO) << "add task:" << key << " start:" << start + << " success!"; + } + } + + void Init() { + bgFetchStop_.store(false, std::memory_order_release); + bgFetchThread_ = Thread(&SumManager::backGroundFetch, this); + initbgFetchThread_ = true; + } + void Unit() { + bgFetchStop_.store(true, std::memory_order_release); + if (initbgFetchThread_) { + bgFetchThread_.join(); + } + + { + for (auto &iter : sumThreadPool_) { + LOG(INFO) << "Stop task:" << iter.first; + iter.second->Stop(); + } + WriteLockGuard lock(sumThreadPoolMutex_); + sumThreadPool_.clear(); + } + + { + WriteLockGuard lock(sumMutex_); + for (auto iter : sum_) { + uint64_t result = Sum(iter.second.start_, iter.second.end_); + LOG(INFO) << "Sum Done:" << iter.first + << " sum:" << iter.second.sum_ + << " result:" << result; + ASSERT_EQ(iter.second.sum_, result); + } + } + } + + void WaitAllDone() { + bool working = true; + while (working) { + working = false; + usleep(TIME); + ReadLockGuard lock(sumMutex_); + for (auto const &iter : sum_) { + if (!SumDone(iter.first)) { + working = true; + } + } + } + LOG(INFO) << "all done"; + } + + private: + bool SumDone(uint64_t key) { + ReadLockGuard lock(sumThreadPoolMutex_); + return sumThreadPool_.find(key) == sumThreadPool_.end(); + } + + uint64_t Sum(uint64_t start, uint64_t end) { + return (start + end) * (end - start + 1) / 2; + } + void backGroundFetch() { + while (!bgFetchStop_.load(std::memory_order_acquire)) { + usleep(TIME); + { + sumThreadPoolMutex_.RDLock(); + for (auto iter = sumThreadPool_.begin(); + iter != sumThreadPool_.end();) { + if (iter->second->QueueSize() == 0) { + LOG(INFO) << "Stop task:" << iter->first; + sumThreadPoolMutex_.Unlock(); + iter->second->Stop(); + sumThreadPoolMutex_.WRLock(); + iter = sumThreadPool_.erase(iter); + sumThreadPoolMutex_.Unlock(); + sumThreadPoolMutex_.RDLock(); + } else { + ++iter; + } + } + sumThreadPoolMutex_.Unlock(); + } + { + WriteLockGuard lock(sumMutex_); + for (auto iter = sum_.begin(); iter != sum_.end();) { + if (SumDone(iter->first)) { + uint64_t result = + Sum(iter->second.start_, iter->second.end_); + LOG(INFO) << "rm Sum Done:" << iter->first + << " sum:" << iter->second.sum_ + << " result:" << result; + ASSERT_EQ(iter->second.sum_, result); + iter = sum_.erase(iter); + } else { + ++iter; + } + } + } + } + } + + + static uint64_t Left(uint64_t n) { return 2 * n; } + + static uint64_t Right(uint64_t n) { return 2 * n + 1; } + + protected: + std::map> sumThreadPool_; + BthreadRWLock sumThreadPoolMutex_; + + std::map sum_; + BthreadRWLock sumMutex_; + + Thread bgFetchThread_; + std::atomic bgFetchStop_; + bool initbgFetchThread_; +}; + +class TaskThreadPool2Test : public testing::Test { + protected: + void SetUp() override { + sumManager_ = std::make_shared(); + sumManager_->Init(); + } + void TearDown() override { sumManager_->Unit(); } + + std::shared_ptr sumManager_; +}; + +TEST_F(TaskThreadPool2Test, test) { + sumManager_->AddSum(1, 1, 10); + sumManager_->AddSum(2, 1, 100); + sumManager_->AddSum(3, 1, 1000); + sumManager_->AddSum(4, 1, 10000); + sumManager_->AddSum(5, 1, 100000); + sumManager_->WaitAllDone(); +} + +TEST_F(TaskThreadPool2Test, test2) { + sumManager_->AddSum(4, 1, 10000); + sumManager_->WaitAllDone(); +} + +} // namespace common +} // namespace curvefs diff --git a/src/common/concurrent/task_thread_pool.h b/src/common/concurrent/task_thread_pool.h index 176cf5ef99..b9b23eebe3 100644 --- a/src/common/concurrent/task_thread_pool.h +++ b/src/common/concurrent/task_thread_pool.h @@ -40,13 +40,14 @@ namespace curve { namespace common { + +using Task = std::function; + // 异步运行回调的线程池 template class TaskThreadPool : public Uncopyable { public: - using Task = std::function; - TaskThreadPool() : mutex_(), notEmpty_(), notFull_(), capacity_(-1), running_(false) {} diff --git a/src/common/string_util.h b/src/common/string_util.h index 9fce326cd0..71a43919b1 100644 --- a/src/common/string_util.h +++ b/src/common/string_util.h @@ -40,10 +40,9 @@ namespace curve { namespace common { -inline void SplitString(const std::string& full, - const std::string& delim, - std::vector* result) { - result->clear(); +inline void AddSplitStringToResult(const std::string &full, + const std::string &delim, + std::vector *result) { if (full.empty()) { return; } @@ -69,6 +68,13 @@ inline void SplitString(const std::string& full, } } +inline void SplitString(const std::string& full, + const std::string& delim, + std::vector* result) { + result->clear(); + AddSplitStringToResult(full, delim, result); +} + inline bool StringToUl(const std::string &value, uint32_t *out) noexcept { try { *out = std::stoul(value); diff --git a/tools-v2/go.mod b/tools-v2/go.mod index 96bdcb9f40..e24a1908a0 100644 --- a/tools-v2/go.mod +++ b/tools-v2/go.mod @@ -17,11 +17,16 @@ require ( github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.13.0 golang.org/x/exp v0.0.0-20220909182711-5c715a9e8561 - golang.org/x/sys v0.0.0-20220913175220-63ea55921009 + golang.org/x/sys v0.5.0 google.golang.org/grpc v1.49.0 google.golang.org/protobuf v1.28.1 ) +require ( + github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect + github.com/schollz/progressbar/v3 v3.13.0 // indirect +) + require ( github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect github.com/Microsoft/go-winio v0.5.2 // indirect @@ -50,7 +55,7 @@ require ( github.com/inconshreveable/mousetrap v1.0.1 // indirect github.com/jtolds/gls v4.20.0+incompatible // indirect github.com/magiconair/properties v1.8.6 // indirect - github.com/mattn/go-runewidth v0.0.13 // indirect + github.com/mattn/go-runewidth v0.0.14 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect github.com/miekg/pkcs11 v1.1.1 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect @@ -65,11 +70,12 @@ require ( github.com/pelletier/go-toml v1.9.5 // indirect github.com/pelletier/go-toml/v2 v2.0.5 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/pkg/xattr v0.4.9 github.com/prometheus/client_golang v1.13.0 // indirect github.com/prometheus/client_model v0.2.1-0.20210607210712-147c58e9608a // indirect github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect - github.com/rivo/uniseg v0.4.2 // indirect + github.com/rivo/uniseg v0.4.3 // indirect github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 // indirect github.com/sirupsen/logrus v1.9.0 // indirect github.com/smartystreets/assertions v1.13.0 // indirect @@ -84,7 +90,7 @@ require ( golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect golang.org/x/net v0.0.0-20220909164309-bea034e7d591 // indirect golang.org/x/sync v0.0.0-20220907140024-f12130a52804 // indirect - golang.org/x/term v0.0.0-20220722155259-a9ba230a4035 // indirect + golang.org/x/term v0.5.0 // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/tools v0.1.12 // indirect google.golang.org/genproto v0.0.0-20220913154956-18f8339a66a5 // indirect diff --git a/tools-v2/go.sum b/tools-v2/go.sum index 8d713648b0..9b6e09ceb7 100644 --- a/tools-v2/go.sum +++ b/tools-v2/go.sum @@ -532,6 +532,7 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV github.com/juju/loggo v0.0.0-20190526231331-6e530bcce5d8/go.mod h1:vgyd7OREkbtVEN/8IXZe5Ooef3LQePvuBm9UWj6ZL8U= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= +github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= @@ -565,11 +566,14 @@ github.com/mailru/easyjson v0.7.0/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7 github.com/marstr/guid v1.1.0/go.mod h1:74gB1z2wpxxInTG6yaqA7KrtM0NZ+RbrcqDvYHefzho= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= +github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU= github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU= +github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mattn/go-shellwords v1.0.3/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vqg+NOMyg4B2o= github.com/mattn/go-shellwords v1.0.6/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vqg+NOMyg4B2o= github.com/mattn/go-sqlite3 v1.6.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= @@ -583,6 +587,8 @@ github.com/miekg/pkcs11 v1.0.3/go.mod h1:XsNlhZGX73bx86s2hdc/FuaLm2CPZJemRLMA+WT github.com/miekg/pkcs11 v1.1.1 h1:Ugu9pdy6vAYku5DEpVWVFPYnzV+bxB+iRdbuFSu7TvU= github.com/miekg/pkcs11 v1.1.1/go.mod h1:XsNlhZGX73bx86s2hdc/FuaLm2CPZJemRLMA+WTFxgs= github.com/mistifyio/go-zfs v2.1.2-0.20190413222219-f784269be439+incompatible/go.mod h1:8AuVvqP/mXw1px98n46wfvcGfQ4ci2FwoAjKYxuo3Z4= +github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2EmQ4l5rM/4FEfDWcRD+abF5XlKShorW5LRoQ= +github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v0.0.0-20150613213606-2caf8efc9366/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= @@ -683,6 +689,8 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg= +github.com/pkg/xattr v0.4.9 h1:5883YPCtkSd8LFbs13nXplj9g9tlrwoJRjgpgMu1/fE= +github.com/pkg/xattr v0.4.9/go.mod h1:di8WF84zAKk8jzR1UBTEWh9AUlIZZ7M/JNt8e9B6ktU= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pquerna/cachecontrol v0.0.0-20171018203845-0dec1b30a021/go.mod h1:prYjPmNq4d1NPVmpShWobRqXY3q7Vp+80DqgxxUrUIA= @@ -736,6 +744,8 @@ github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40T github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.2 h1:YwD0ulJSJytLpiaWua0sBDusfsCZohxjxzVTYjwxfV8= github.com/rivo/uniseg v0.4.2/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/rivo/uniseg v0.4.3 h1:utMvzDsuh3suAEnhH0RdHmoPbU648o6CvXxTx4SBMOw= +github.com/rivo/uniseg v0.4.3/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -744,6 +754,8 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/safchain/ethtool v0.0.0-20190326074333-42ed695e3de8/go.mod h1:Z0q5wiBQGYcxhMZ6gUqHn6pYNLypFAvaL3UvgZLR0U4= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +github.com/schollz/progressbar/v3 v3.13.0 h1:9TeeWRcjW2qd05I8Kf9knPkW4vLM/hYoa6z9ABvxje8= +github.com/schollz/progressbar/v3 v3.13.0/go.mod h1:ZBYnSuLAX2LU8P8UiKN/KgF2DY58AJC8yfVYLPC8Ly4= github.com/sclevine/spec v1.2.0/go.mod h1:W4J29eT/Kzv7/b9IWLB055Z+qvVC9vt0Arko24q7p+U= github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo= github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 h1:bUGsEnyNbVPw06Bs80sCeARAlK8lhwqGyi6UT8ymuGk= @@ -1097,19 +1109,27 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220408201424-a24fb2fb8a0f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220907062415-87db552b00fd h1:AZeIEzg+8RCELJYq8w+ODLVxFgLMMigSwO/ffKPEd9U= golang.org/x/sys v0.0.0-20220907062415-87db552b00fd/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220913175220-63ea55921009 h1:PuvuRMeLWqsf/ZdT1UUZz0syhioyv1mzuFZsXs4fvhw= golang.org/x/sys v0.0.0-20220913175220-63ea55921009/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.0.0-20220722155259-a9ba230a4035 h1:Q5284mrmYTpACcm+eAKjKJH48BBwSyfJqmmGDTtT8Vc= golang.org/x/term v0.0.0-20220722155259-a9ba230a4035/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= +golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/tools-v2/pkg/cli/command/curvefs/warmup/add/add.go b/tools-v2/pkg/cli/command/curvefs/warmup/add/add.go index f06685e1e7..5c645f2dfd 100644 --- a/tools-v2/pkg/cli/command/curvefs/warmup/add/add.go +++ b/tools-v2/pkg/cli/command/curvefs/warmup/add/add.go @@ -62,7 +62,7 @@ type AddCommand struct { var _ basecmd.FinalCurveCmdFunc = (*AddCommand)(nil) // check interface -func NewAddCommandCommand() *cobra.Command { +func NewAddCommand() *cobra.Command { aCmd := &AddCommand{ FinalCurveCmd: basecmd.FinalCurveCmd{ Use: "add", diff --git a/tools-v2/pkg/cli/command/curvefs/warmup/query/query.go b/tools-v2/pkg/cli/command/curvefs/warmup/query/query.go new file mode 100644 index 0000000000..2806b58291 --- /dev/null +++ b/tools-v2/pkg/cli/command/curvefs/warmup/query/query.go @@ -0,0 +1,136 @@ +/* + * Copyright (c) 2022 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: CurveCli + * Created Date: 2022-08-10 + * Author: chengyi (Cyber-SiKu) + */ + +package query + +import ( + "errors" + "fmt" + "os" + "strconv" + "strings" + "time" + + basecmd "github.com/opencurve/curve/tools-v2/pkg/cli/command" + "github.com/opencurve/curve/tools-v2/pkg/config" + "github.com/opencurve/curve/tools-v2/pkg/output" + "github.com/pkg/xattr" + "github.com/schollz/progressbar/v3" + "github.com/spf13/cobra" +) + +const ( + queryExample = `$ curve fs warmup query /mnt/warmup ` + + CURVEFS_WARMUP_OP_XATTR = "curvefs.warmup.op" +) + +type QueryCommand struct { + basecmd.FinalCurveCmd + path string + interval time.Duration +} + +var _ basecmd.FinalCurveCmdFunc = (*QueryCommand)(nil) // check interface + +func NewQueryCommand() *cobra.Command { + qCmd := &QueryCommand{ + FinalCurveCmd: basecmd.FinalCurveCmd{ + Use: "query", + Short: "query the warmup progress", + Example: queryExample, + }, + } + basecmd.NewFinalCurveCli(&qCmd.FinalCurveCmd, qCmd) + return qCmd.Cmd +} + +func (qCmd *QueryCommand) AddFlags() { + config.AaddIntervalOptionFlag(qCmd.Cmd) +} + +func (qCmd *QueryCommand) Init(cmd *cobra.Command, args []string) error { + if len(args) < 1 { + return errors.New("need a path") + } + qCmd.path = args[0] + qCmd.interval = config.GetIntervalOptionFlag(qCmd.Cmd) + return nil +} + +func (qCmd *QueryCommand) Print(cmd *cobra.Command, args []string) error { + return output.FinalCmdOutput(&qCmd.FinalCurveCmd, qCmd) +} + +func (qCmd *QueryCommand) RunCommand(cmd *cobra.Command, args []string) error { + filename := strings.Split(qCmd.path, "/") + bar := progressbar.NewOptions64(1, + progressbar.OptionSetDescription("[cyan]Warmup[reset] "+filename[len(filename)-1] + "..."), + progressbar.OptionShowCount(), + progressbar.OptionShowIts(), + progressbar.OptionSpinnerType(14), + progressbar.OptionFullWidth(), + progressbar.OptionThrottle(65*time.Millisecond), + progressbar.OptionSetRenderBlankState(true), + progressbar.OptionOnCompletion(func() { + fmt.Fprint(os.Stderr, "\n") + }), + progressbar.OptionEnableColorCodes(true), + progressbar.OptionSetTheme(progressbar.Theme{ + Saucer: "[green]=[reset]", + SaucerHead: "[green]>[reset]", + SaucerPadding: " ", + BarStart: "[", + BarEnd: "]", + })) + for { + result, err := xattr.Get(qCmd.path, CURVEFS_WARMUP_OP_XATTR) + if err != nil { + return err + } + resultStr := string(result) + if resultStr == "finished" { + break + } + strs := strings.Split(resultStr, "/") + if len(strs) != 2 { + break + } + finished, err := strconv.ParseUint(strs[0], 10, 64) + if err != nil { + break + } + total, err := strconv.ParseUint(strs[1], 10, 64) + if err != nil { + break + } + bar.ChangeMax64(int64(total)) + bar.Set64(int64(finished)) + time.Sleep(qCmd.interval) + } + bar.Finish() + return nil +} + +func (qCmd *QueryCommand) ResultPlainOutput() error { + return output.FinalCmdOutputPlain(&qCmd.FinalCurveCmd) +} diff --git a/tools-v2/pkg/cli/command/curvefs/warmup/warmup.go b/tools-v2/pkg/cli/command/curvefs/warmup/warmup.go index 5e5d52d34c..bfbf61e32e 100644 --- a/tools-v2/pkg/cli/command/curvefs/warmup/warmup.go +++ b/tools-v2/pkg/cli/command/curvefs/warmup/warmup.go @@ -25,6 +25,7 @@ package warmup import ( basecmd "github.com/opencurve/curve/tools-v2/pkg/cli/command" "github.com/opencurve/curve/tools-v2/pkg/cli/command/curvefs/warmup/add" + "github.com/opencurve/curve/tools-v2/pkg/cli/command/curvefs/warmup/query" "github.com/spf13/cobra" ) @@ -36,7 +37,8 @@ var _ basecmd.MidCurveCmdFunc = (*WarmupCommand)(nil) // check interface func (warmupCmd *WarmupCommand) AddSubCommands() { warmupCmd.Cmd.AddCommand( - add.NewAddCommandCommand(), + add.NewAddCommand(), + query.NewQueryCommand(), ) } diff --git a/tools-v2/pkg/config/fs.go b/tools-v2/pkg/config/fs.go index e8b8c23973..3e6404834c 100644 --- a/tools-v2/pkg/config/fs.go +++ b/tools-v2/pkg/config/fs.go @@ -87,6 +87,12 @@ const ( CURVEFS_SERVERS = "servers" VIPER_CURVEFS_SERVERS = "curvefs.servers" CURVEFS_DEFAULT_SERVERS = "127.0.0.1:7001,127.0.0.1:7002" + CURVEFS_INTERVAL = "interval" + VIPER_CURVEFS_INTERVAL = "curvefs.interval" + CURVEFS_DEFAULT_INTERVAL = 1 * time.Second + CURVEFS_DAEMON = "daemon" + VIPER_CURVEFS_DAEMON = "curvefs.daemon" + CURVEFS_DEFAULT_DAEMON = false // S3 CURVEFS_S3_AK = "s3.ak" @@ -161,6 +167,8 @@ var ( CURVEFS_MARGIN: VIPER_CURVEFS_MARGIN, CURVEFS_SERVERS: VIPER_CURVEFS_SERVERS, CURVEFS_FILELIST: VIPER_CURVEFS_FILELIST, + CURVEFS_INTERVAL: VIPER_CURVEFS_INTERVAL, + CURVEFS_DAEMON: VIPER_CURVEFS_DAEMON, // S3 CURVEFS_S3_AK: VIPER_CURVEFS_S3_AK, @@ -188,6 +196,8 @@ var ( CURVEFS_CLUSTERMAP: CURVEFS_DEFAULT_CLUSTERMAP, CURVEFS_MARGIN: CURVEFS_DEFAULT_MARGIN, CURVEFS_SERVERS: CURVEFS_DEFAULT_SERVERS, + CURVEFS_INTERVAL: CURVEFS_DEFAULT_INTERVAL, + CURVEFS_DAEMON: CURVEFS_DEFAULT_DAEMON, // S3 CURVEFS_S3_AK: CURVEFS_DEFAULT_S3_AK, @@ -281,6 +291,18 @@ func AddBoolOptionFlag(cmd *cobra.Command, name string, usage string) { } } +func AddBoolOptionPFlag(cmd *cobra.Command, name string, short string, usage string) { + defaultValue := FLAG2DEFAULT[name] + if defaultValue == nil { + defaultValue = false + } + cmd.Flags().BoolP(name, short, defaultValue.(bool), usage) + err := viper.BindPFlag(FLAG2VIPER[name], cmd.Flags().Lookup(name)) + if err != nil { + cobra.CheckErr(err) + } +} + func AddDurationOptionFlag(cmd *cobra.Command, name string, usage string) { defaultValue := FLAG2DEFAULT[name] if defaultValue == nil { @@ -714,6 +736,28 @@ func GetFileListOptionFlag(cmd *cobra.Command) string { return GetFlagString(cmd, CURVEFS_FILELIST) } +// interval [option] +func AddIntervalOptionFlag(cmd *cobra.Command) { + AddDurationOptionFlag(cmd, CURVEFS_INTERVAL, "interval time") +} + +func GetIntervalFlag(cmd *cobra.Command) time.Duration { + return GetFlagDuration(cmd, CURVEFS_INTERVAL) +} + +// daemon [option] +func AddDaemonOptionFlag(cmd *cobra.Command) { + AddBoolOptionFlag(cmd, CURVEFS_DAEMON, "run in daemon mode") +} + +func AddDaemonOptionPFlag(cmd *cobra.Command) { + AddBoolOptionPFlag(cmd, CURVEFS_DAEMON, "d", "run in daemon mode") +} + +func GetDaemonFlag(cmd *cobra.Command) bool { + return GetFlagBool(cmd, CURVEFS_DAEMON) +} + /* required */ // copysetid [required] diff --git a/tools-v2/pkg/config/template.yaml b/tools-v2/pkg/config/template.yaml index b4895f9e8a..b183c1c48b 100644 --- a/tools-v2/pkg/config/template.yaml +++ b/tools-v2/pkg/config/template.yaml @@ -9,6 +9,7 @@ curvefs: mdsAddr: 127.0.0.1:6700,127.0.0.1:6701,127.0.0.1:6702 mdsDummyAddr: 127.0.0.1:7700,127.0.0.1:7701,127.0.0.1:7702 etcdAddr: 127.0.0.1:23790,127.0.0.1:23791, 127.0.0.1:23792 + interval: 1s s3: ak: ak sk: sk