diff --git a/curvefs/conf/client.conf b/curvefs/conf/client.conf index ff83857c47..da4bebb4b8 100644 --- a/curvefs/conf/client.conf +++ b/curvefs/conf/client.conf @@ -103,6 +103,7 @@ fuseClient.disableXattr=false fuseClient.maxDataSize=1024 # default refresh data interval 30s fuseClient.refreshDataIntervalSec=30 +fuseClient.warmupThreadsNum=10 #### volume volume.bigFileSize=1048576 diff --git a/curvefs/src/client/common/config.cpp b/curvefs/src/client/common/config.cpp index 217bbcabde..650af766d9 100644 --- a/curvefs/src/client/common/config.cpp +++ b/curvefs/src/client/common/config.cpp @@ -279,6 +279,8 @@ void InitFuseClientOption(Configuration *conf, FuseClientOption *clientOption) { conf->GetValueFatalIfFail("fuseClient.disableXattr", &clientOption->disableXattr); conf->GetValueFatalIfFail("fuseClient.cto", &FLAGS_enableCto); + conf->GetValueFatalIfFail("fuseClient.warmupThreadsNum", + &clientOption->warmupThreadsNum); LOG_IF(WARNING, conf->GetBoolValue("fuseClient.enableSplice", &clientOption->enableFuseSplice)) diff --git a/curvefs/src/client/common/config.h b/curvefs/src/client/common/config.h index 8a41931b73..fda77fdea3 100644 --- a/curvefs/src/client/common/config.h +++ b/curvefs/src/client/common/config.h @@ -197,6 +197,7 @@ struct FuseClientOption { bool enableMultiMountPointRename = false; bool enableFuseSplice = false; bool disableXattr = false; + uint32_t warmupThreadsNum; }; void InitFuseClientOption(Configuration *conf, FuseClientOption *clientOption); diff --git a/curvefs/src/client/dentry_cache_manager.cpp b/curvefs/src/client/dentry_cache_manager.cpp index 452b71d6b3..5bd4e68896 100644 --- a/curvefs/src/client/dentry_cache_manager.cpp +++ b/curvefs/src/client/dentry_cache_manager.cpp @@ -78,7 +78,6 @@ CURVEFS_ERROR DentryCacheManagerImpl::GetDentry(uint64_t parent, << ", parent = " << parent << ", name = " << name; return MetaStatusCodeToCurvefsErrCode(ret); } - if (!curvefs::client::common::FLAGS_enableCto) { dCache_->Put(key, *out); } diff --git a/curvefs/src/client/fuse_client.cpp b/curvefs/src/client/fuse_client.cpp index c427d57cdb..4f288b90a1 100644 --- a/curvefs/src/client/fuse_client.cpp +++ b/curvefs/src/client/fuse_client.cpp @@ -135,7 +135,7 @@ CURVEFS_ERROR FuseClient::Init(const FuseClientOption &option) { warmUpFile_.exist = false; bgCmdStop_.store(false, std::memory_order_release); bgCmdTaskThread_ = Thread(&FuseClient::WarmUpTask, this); - taskFetchMetaPool_.Start(WARMUP_THREADS); + taskFetchMetaPool_.Start(option_.warmupThreadsNum); return ret3; } diff --git a/curvefs/src/client/fuse_client.h b/curvefs/src/client/fuse_client.h index 28bb31295c..7b8228e7db 100644 --- a/curvefs/src/client/fuse_client.h +++ b/curvefs/src/client/fuse_client.h @@ -55,7 +55,6 @@ #define DirectIOAlignment 512 #define WARMUP_CHECKINTERVAL_US 1000*1000 -#define WARMUP_THREADS 10 using ::curve::common::Atomic; using ::curve::common::InterruptibleSleeper; @@ -264,9 +263,9 @@ class FuseClient { void SetEnableSumInDir(bool enable) { enableSumInDir_ = enable; } - std::list& GetReadAheadFiles() { + void GetReadAheadFiles(std::list* readAheadFiles) { std::unique_lock lck(fetchMtx_); - return readAheadFiles_; + *readAheadFiles = std::move(readAheadFiles_); } void GetWarmUpFile(WarmUpFileContext_t* warmUpFile) { diff --git a/curvefs/src/client/fuse_s3_client.cpp b/curvefs/src/client/fuse_s3_client.cpp index 0862fe297f..f0bee13c16 100644 --- a/curvefs/src/client/fuse_s3_client.cpp +++ b/curvefs/src/client/fuse_s3_client.cpp @@ -73,7 +73,7 @@ CURVEFS_ERROR FuseS3Client::Init(const FuseClientOption &option) { inodeManager_, mdsClient_, fsCacheManager, nullptr, true); } - + isWarmUping_.store(false); bgFetchStop_.store(false, std::memory_order_release); bgFetchThread_ = Thread(&FuseS3Client::BackGroundFetch, this); GetTaskFetchPool(); @@ -102,8 +102,7 @@ void FuseS3Client::GetWarmUpFileList(const WarmUpFileContext_t&warmUpFile, void FuseS3Client::BackGroundFetch() { while (!bgFetchStop_.load(std::memory_order_acquire)) { - LOG_EVERY_N(WARNING, 100) - << "fetch thread start."; + usleep(WARMUP_CHECKINTERVAL_US); if (hasWarmUpTask()) { // new warmup task WarmUpFileContext_t warmUpFile; GetWarmUpFile(&warmUpFile); @@ -118,15 +117,30 @@ void FuseS3Client::BackGroundFetch() { } { // file need warmup std::list readAheadFiles; - readAheadFiles.swap(GetReadAheadFiles()); - for (auto iter : readAheadFiles) { - VLOG(9) << "BackGroundFetch: " << iter; - fetchDataEnqueue(iter); + GetReadAheadFiles(&readAheadFiles); + if (!readAheadFiles.empty()) { + LOG(INFO) << "num of files is need loaded is: " + << readAheadFiles.size(); + for (auto iter : readAheadFiles) { + VLOG(9) << "BackGroundFetch: " << iter; + fetchDataEnqueue(iter); + } } } - LOG_EVERY_N(WARNING, 100) - << "fetch thread end."; - usleep(WARMUP_CHECKINTERVAL_US); + { // objs will be downloaded + { + std::unique_lock lck(warmupObjsMtx_); + if (needWarmupObjs_.empty()) { + continue; + } + } + if (isWarmUping_.exchange(true)) { + continue; + } + std::thread downloadThread = + std::thread(&FuseS3Client::WarmUpAllObjs, this); + downloadThread.detach(); + } } return; } @@ -148,7 +162,7 @@ void FuseS3Client::fetchDataEnqueue(fuse_ino_t ino) { s3ChunkInfoMap = inodeWrapper->GetChunkInfoMap(); } if (nullptr == s3ChunkInfoMap || - s3ChunkInfoMap->empty()) { + s3ChunkInfoMap->empty()) { return; } travelChunks(ino, s3ChunkInfoMap); @@ -157,8 +171,7 @@ void FuseS3Client::fetchDataEnqueue(fuse_ino_t ino) { } // travel and download all objs belong to the chunk -void FuseS3Client::travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo, - std::list>* prefetchObjs) { +void FuseS3Client::travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo) { uint64_t blockSize = s3Adaptor_->GetBlockSize(); uint64_t chunkSize = s3Adaptor_->GetChunkSize(); uint64_t offset, len, chunkid, compaction; @@ -179,7 +192,8 @@ void FuseS3Client::travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo, if (len < blockSize) { // just one block auto objectName = curvefs::common::s3util::GenObjName( chunkid, blockIndexBegin, compaction, fsId, ino); - prefetchObjs->push_back(std::make_pair(objectName, len)); + std::unique_lock lck(warmupObjsMtx_); + needWarmupObjs_.push_back(std::make_pair(objectName, len)); } else { // the offset in the block uint64_t blockPos = chunkPos % blockSize; @@ -210,7 +224,8 @@ void FuseS3Client::travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo, travelStartIndex = blockIndexBegin + 1; auto objectName = curvefs::common::s3util::GenObjName( chunkid, blockIndexBegin, compaction, fsId, ino); - prefetchObjs->push_back(std::make_pair( + std::unique_lock lck(warmupObjsMtx_); + needWarmupObjs_.push_back(std::make_pair( objectName, firstBlockSize)); } else { travelStartIndex = blockIndexBegin; @@ -223,7 +238,8 @@ void FuseS3Client::travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo, chunkid, blockIndexEnd, compaction, fsId, ino); // there is no need to care about the order // in which objects are downloaded - prefetchObjs->push_back( + std::unique_lock lck(warmupObjsMtx_); + needWarmupObjs_.push_back( std::make_pair(objectName, lastBlockSize)); } else { travelEndIndex = blockIndexEnd; @@ -246,7 +262,11 @@ void FuseS3Client::travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo, blockIndex <= travelEndIndex ; blockIndex++) { auto objectName = curvefs::common::s3util::GenObjName( chunkid, blockIndex, compaction, fsId, ino); - prefetchObjs->push_back(std::make_pair(objectName, blockSize)); + { + std::unique_lock lck(warmupObjsMtx_); + needWarmupObjs_.push_back( + std::make_pair(objectName, blockSize)); + } } } } @@ -254,14 +274,24 @@ void FuseS3Client::travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo, // TODO(hzwuhongsong): These logics are very similar to other place, // try to merge it -void FuseS3Client::WarmUpAllObjs( - const std::list> &prefetchObjs) { +void FuseS3Client::WarmUpAllObjs() { + std::list> needWarmupObjs; + { + std::unique_lock lck(warmupObjsMtx_); + LOG(INFO) << "num of objs need loaded is: " << needWarmupObjs_.size(); + needWarmupObjs = std::move(needWarmupObjs_); + } std::atomic pendingReq(0); curve::common::CountDownEvent cond(1); // callback function GetObjectAsyncCallBack cb = [&](const S3Adapter *adapter, const std::shared_ptr &context) { + if (bgFetchStop_.load()) { + LOG(INFO) << "need stop warmup"; + cond.Signal(); + return; + } if (context->retCode == 0) { VLOG(9) << "Get Object success: " << context->key; int ret = s3Adaptor_->GetDiskCacheManager()->WriteReadDirect( @@ -283,10 +313,10 @@ void FuseS3Client::WarmUpAllObjs( s3Adaptor_->GetS3Client()->DownloadAsync(context); }; - pendingReq.fetch_add(prefetchObjs.size(), std::memory_order_seq_cst); - if (pendingReq.load(std::memory_order_seq_cst)) { - VLOG(9) << "wait for pendingReq"; - for (auto iter : prefetchObjs) { + pendingReq.fetch_add(needWarmupObjs.size(), std::memory_order_seq_cst); + if (pendingReq.load()) { + VLOG(9) << "wait for pendingReq" << pendingReq.load(); + for (auto iter : needWarmupObjs) { VLOG(9) << "download start: " << iter.first; std::string name = iter.first; uint64_t readLen = iter.second; @@ -307,18 +337,18 @@ void FuseS3Client::WarmUpAllObjs( if (pendingReq.load()) cond.Wait(); } + isWarmUping_.exchange(false); + LOG(INFO) << "num of objs is loaded over "; } void FuseS3Client::travelChunks(fuse_ino_t ino, google::protobuf::Map *s3ChunkInfoMap) { VLOG(9) << "travel chunk start: " << ino << ", size: " << s3ChunkInfoMap->size(); - std::list> prefetchObjs; for (auto &iter : *s3ChunkInfoMap) { VLOG(9) << "travel chunk: " << iter.first; - travelChunk(ino, iter.second, &prefetchObjs); + travelChunk(ino, iter.second); } - WarmUpAllObjs(prefetchObjs); VLOG(9) << "travel chunks end"; return; } diff --git a/curvefs/src/client/fuse_s3_client.h b/curvefs/src/client/fuse_s3_client.h index ec2ce3b055..5b581af7aa 100644 --- a/curvefs/src/client/fuse_s3_client.h +++ b/curvefs/src/client/fuse_s3_client.h @@ -108,11 +108,9 @@ class FuseS3Client : public FuseClient { void travelChunks(fuse_ino_t ino, google::protobuf::Map *s3ChunkInfoMap); // travel and download all objs belong to the chunk - void travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo, - std::list>* prefetchObjs); + void travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo); // warmup all the prefetchObjs - void WarmUpAllObjs(const std::list< - std::pair> &prefetchObjs); + void WarmUpAllObjs(); private: // s3 adaptor @@ -120,7 +118,10 @@ class FuseS3Client : public FuseClient { Thread bgFetchThread_; std::atomic bgFetchStop_; - std::mutex fetchMtx_; + + std::mutex warmupObjsMtx_; + std::atomic isWarmUping_; + std::list> needWarmupObjs_; }; diff --git a/curvefs/test/client/test_fuse_client.cpp b/curvefs/test/client/test_fuse_client.cpp index 3823dcccb4..81907ba4f1 100644 --- a/curvefs/test/client/test_fuse_client.cpp +++ b/curvefs/test/client/test_fuse_client.cpp @@ -1762,6 +1762,7 @@ class TestFuseS3Client : public ::testing::Test { fuseClientOption_.dummyServerStartPort = 5000; fuseClientOption_.maxNameLength = 20u; fuseClientOption_.listDentryThreads = 2; + fuseClientOption_.warmupThreadsNum = 10; auto fsInfo = std::make_shared(); fsInfo->set_fsid(fsId); fsInfo->set_fsname("s3fs");