Skip to content

Commit

Permalink
curvefs/client: optimize warmup performance
Browse files Browse the repository at this point in the history
  • Loading branch information
wuhongsong authored and ilixiaocui committed Oct 13, 2022
1 parent 4cdac84 commit e936d97
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 36 deletions.
1 change: 1 addition & 0 deletions curvefs/conf/client.conf
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ fuseClient.disableXattr=false
fuseClient.maxDataSize=1024
# default refresh data interval 30s
fuseClient.refreshDataIntervalSec=30
fuseClient.warmupThreadsNum=10

#### volume
volume.bigFileSize=1048576
Expand Down
2 changes: 2 additions & 0 deletions curvefs/src/client/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,8 @@ void InitFuseClientOption(Configuration *conf, FuseClientOption *clientOption) {
conf->GetValueFatalIfFail("fuseClient.disableXattr",
&clientOption->disableXattr);
conf->GetValueFatalIfFail("fuseClient.cto", &FLAGS_enableCto);
conf->GetValueFatalIfFail("fuseClient.warmupThreadsNum",
&clientOption->warmupThreadsNum);

LOG_IF(WARNING, conf->GetBoolValue("fuseClient.enableSplice",
&clientOption->enableFuseSplice))
Expand Down
1 change: 1 addition & 0 deletions curvefs/src/client/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ struct FuseClientOption {
bool enableMultiMountPointRename = false;
bool enableFuseSplice = false;
bool disableXattr = false;
uint32_t warmupThreadsNum;
};

void InitFuseClientOption(Configuration *conf, FuseClientOption *clientOption);
Expand Down
1 change: 0 additions & 1 deletion curvefs/src/client/dentry_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ CURVEFS_ERROR DentryCacheManagerImpl::GetDentry(uint64_t parent,
<< ", parent = " << parent << ", name = " << name;
return MetaStatusCodeToCurvefsErrCode(ret);
}

if (!curvefs::client::common::FLAGS_enableCto) {
dCache_->Put(key, *out);
}
Expand Down
2 changes: 1 addition & 1 deletion curvefs/src/client/fuse_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ CURVEFS_ERROR FuseClient::Init(const FuseClientOption &option) {
warmUpFile_.exist = false;
bgCmdStop_.store(false, std::memory_order_release);
bgCmdTaskThread_ = Thread(&FuseClient::WarmUpTask, this);
taskFetchMetaPool_.Start(WARMUP_THREADS);
taskFetchMetaPool_.Start(option_.warmupThreadsNum);
return ret3;
}

Expand Down
5 changes: 2 additions & 3 deletions curvefs/src/client/fuse_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@

#define DirectIOAlignment 512
#define WARMUP_CHECKINTERVAL_US 1000*1000
#define WARMUP_THREADS 10

using ::curve::common::Atomic;
using ::curve::common::InterruptibleSleeper;
Expand Down Expand Up @@ -264,9 +263,9 @@ class FuseClient {
void SetEnableSumInDir(bool enable) {
enableSumInDir_ = enable;
}
std::list<fuse_ino_t>& GetReadAheadFiles() {
void GetReadAheadFiles(std::list<fuse_ino_t>* readAheadFiles) {
std::unique_lock<std::mutex> lck(fetchMtx_);
return readAheadFiles_;
*readAheadFiles = std::move(readAheadFiles_);
}

void GetWarmUpFile(WarmUpFileContext_t* warmUpFile) {
Expand Down
82 changes: 56 additions & 26 deletions curvefs/src/client/fuse_s3_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ CURVEFS_ERROR FuseS3Client::Init(const FuseClientOption &option) {
inodeManager_, mdsClient_, fsCacheManager,
nullptr, true);
}

isWarmUping_.store(false);
bgFetchStop_.store(false, std::memory_order_release);
bgFetchThread_ = Thread(&FuseS3Client::BackGroundFetch, this);
GetTaskFetchPool();
Expand Down Expand Up @@ -102,8 +102,7 @@ void FuseS3Client::GetWarmUpFileList(const WarmUpFileContext_t&warmUpFile,

void FuseS3Client::BackGroundFetch() {
while (!bgFetchStop_.load(std::memory_order_acquire)) {
LOG_EVERY_N(WARNING, 100)
<< "fetch thread start.";
usleep(WARMUP_CHECKINTERVAL_US);
if (hasWarmUpTask()) { // new warmup task
WarmUpFileContext_t warmUpFile;
GetWarmUpFile(&warmUpFile);
Expand All @@ -118,15 +117,30 @@ void FuseS3Client::BackGroundFetch() {
}
{ // file need warmup
std::list<fuse_ino_t> readAheadFiles;
readAheadFiles.swap(GetReadAheadFiles());
for (auto iter : readAheadFiles) {
VLOG(9) << "BackGroundFetch: " << iter;
fetchDataEnqueue(iter);
GetReadAheadFiles(&readAheadFiles);
if (!readAheadFiles.empty()) {
LOG(INFO) << "num of files is need loaded is: "
<< readAheadFiles.size();
for (auto iter : readAheadFiles) {
VLOG(9) << "BackGroundFetch: " << iter;
fetchDataEnqueue(iter);
}
}
}
LOG_EVERY_N(WARNING, 100)
<< "fetch thread end.";
usleep(WARMUP_CHECKINTERVAL_US);
{ // objs will be downloaded
{
std::unique_lock<std::mutex> lck(warmupObjsMtx_);
if (needWarmupObjs_.empty()) {
continue;
}
}
if (isWarmUping_.exchange(true)) {
continue;
}
std::thread downloadThread =
std::thread(&FuseS3Client::WarmUpAllObjs, this);
downloadThread.detach();
}
}
return;
}
Expand All @@ -148,7 +162,7 @@ void FuseS3Client::fetchDataEnqueue(fuse_ino_t ino) {
s3ChunkInfoMap = inodeWrapper->GetChunkInfoMap();
}
if (nullptr == s3ChunkInfoMap ||
s3ChunkInfoMap->empty()) {
s3ChunkInfoMap->empty()) {
return;
}
travelChunks(ino, s3ChunkInfoMap);
Expand All @@ -157,8 +171,7 @@ void FuseS3Client::fetchDataEnqueue(fuse_ino_t ino) {
}

// travel and download all objs belonging to the chunk
void FuseS3Client::travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo,
std::list<std::pair<std::string, uint64_t>>* prefetchObjs) {
void FuseS3Client::travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo) {
uint64_t blockSize = s3Adaptor_->GetBlockSize();
uint64_t chunkSize = s3Adaptor_->GetChunkSize();
uint64_t offset, len, chunkid, compaction;
Expand All @@ -179,7 +192,8 @@ void FuseS3Client::travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo,
if (len < blockSize) { // just one block
auto objectName = curvefs::common::s3util::GenObjName(
chunkid, blockIndexBegin, compaction, fsId, ino);
prefetchObjs->push_back(std::make_pair(objectName, len));
std::unique_lock<std::mutex> lck(warmupObjsMtx_);
needWarmupObjs_.push_back(std::make_pair(objectName, len));
} else {
// the offset in the block
uint64_t blockPos = chunkPos % blockSize;
Expand Down Expand Up @@ -210,7 +224,8 @@ void FuseS3Client::travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo,
travelStartIndex = blockIndexBegin + 1;
auto objectName = curvefs::common::s3util::GenObjName(
chunkid, blockIndexBegin, compaction, fsId, ino);
prefetchObjs->push_back(std::make_pair(
std::unique_lock<std::mutex> lck(warmupObjsMtx_);
needWarmupObjs_.push_back(std::make_pair(
objectName, firstBlockSize));
} else {
travelStartIndex = blockIndexBegin;
Expand All @@ -223,7 +238,8 @@ void FuseS3Client::travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo,
chunkid, blockIndexEnd, compaction, fsId, ino);
// there is no need to care about the order
// in which objects are downloaded
prefetchObjs->push_back(
std::unique_lock<std::mutex> lck(warmupObjsMtx_);
needWarmupObjs_.push_back(
std::make_pair(objectName, lastBlockSize));
} else {
travelEndIndex = blockIndexEnd;
Expand All @@ -246,22 +262,36 @@ void FuseS3Client::travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo,
blockIndex <= travelEndIndex ; blockIndex++) {
auto objectName = curvefs::common::s3util::GenObjName(
chunkid, blockIndex, compaction, fsId, ino);
prefetchObjs->push_back(std::make_pair(objectName, blockSize));
{
std::unique_lock<std::mutex> lck(warmupObjsMtx_);
needWarmupObjs_.push_back(
std::make_pair(objectName, blockSize));
}
}
}
}
}

// TODO(hzwuhongsong): This logic is very similar to code in other places;
// try to merge them
void FuseS3Client::WarmUpAllObjs(
const std::list<std::pair<std::string, uint64_t>> &prefetchObjs) {
void FuseS3Client::WarmUpAllObjs() {
std::list<std::pair<std::string, uint64_t>> needWarmupObjs;
{
std::unique_lock<std::mutex> lck(warmupObjsMtx_);
LOG(INFO) << "num of objs need loaded is: " << needWarmupObjs_.size();
needWarmupObjs = std::move(needWarmupObjs_);
}
std::atomic<uint64_t> pendingReq(0);
curve::common::CountDownEvent cond(1);
// callback function
GetObjectAsyncCallBack cb =
[&](const S3Adapter *adapter,
const std::shared_ptr<GetObjectAsyncContext> &context) {
if (bgFetchStop_.load()) {
LOG(INFO) << "need stop warmup";
cond.Signal();
return;
}
if (context->retCode == 0) {
VLOG(9) << "Get Object success: " << context->key;
int ret = s3Adaptor_->GetDiskCacheManager()->WriteReadDirect(
Expand All @@ -283,10 +313,10 @@ void FuseS3Client::WarmUpAllObjs(
s3Adaptor_->GetS3Client()->DownloadAsync(context);
};

pendingReq.fetch_add(prefetchObjs.size(), std::memory_order_seq_cst);
if (pendingReq.load(std::memory_order_seq_cst)) {
VLOG(9) << "wait for pendingReq";
for (auto iter : prefetchObjs) {
pendingReq.fetch_add(needWarmupObjs.size(), std::memory_order_seq_cst);
if (pendingReq.load()) {
VLOG(9) << "wait for pendingReq" << pendingReq.load();
for (auto iter : needWarmupObjs) {
VLOG(9) << "download start: " << iter.first;
std::string name = iter.first;
uint64_t readLen = iter.second;
Expand All @@ -307,18 +337,18 @@ void FuseS3Client::WarmUpAllObjs(
if (pendingReq.load())
cond.Wait();
}
isWarmUping_.exchange(false);
LOG(INFO) << "num of objs is loaded over ";
}

void FuseS3Client::travelChunks(fuse_ino_t ino, google::protobuf::Map<uint64_t,
S3ChunkInfoList> *s3ChunkInfoMap) {
VLOG(9) << "travel chunk start: " << ino
<< ", size: " << s3ChunkInfoMap->size();
std::list<std::pair<std::string, uint64_t>> prefetchObjs;
for (auto &iter : *s3ChunkInfoMap) {
VLOG(9) << "travel chunk: " << iter.first;
travelChunk(ino, iter.second, &prefetchObjs);
travelChunk(ino, iter.second);
}
WarmUpAllObjs(prefetchObjs);
VLOG(9) << "travel chunks end";
return;
}
Expand Down
11 changes: 6 additions & 5 deletions curvefs/src/client/fuse_s3_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,19 +108,20 @@ class FuseS3Client : public FuseClient {
void travelChunks(fuse_ino_t ino, google::protobuf::Map<uint64_t,
S3ChunkInfoList> *s3ChunkInfoMap);
// travel and download all objs belonging to the chunk
void travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo,
std::list<std::pair<std::string, uint64_t>>* prefetchObjs);
void travelChunk(fuse_ino_t ino, S3ChunkInfoList chunkInfo);
// warmup all the prefetchObjs
void WarmUpAllObjs(const std::list<
std::pair<std::string, uint64_t>> &prefetchObjs);
void WarmUpAllObjs();

private:
// s3 adaptor
std::shared_ptr<S3ClientAdaptor> s3Adaptor_;

Thread bgFetchThread_;
std::atomic<bool> bgFetchStop_;
std::mutex fetchMtx_;

std::mutex warmupObjsMtx_;
std::atomic<bool> isWarmUping_;
std::list<std::pair<std::string, uint64_t>> needWarmupObjs_;
};


Expand Down
1 change: 1 addition & 0 deletions curvefs/test/client/test_fuse_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1762,6 +1762,7 @@ class TestFuseS3Client : public ::testing::Test {
fuseClientOption_.dummyServerStartPort = 5000;
fuseClientOption_.maxNameLength = 20u;
fuseClientOption_.listDentryThreads = 2;
fuseClientOption_.warmupThreadsNum = 10;
auto fsInfo = std::make_shared<FsInfo>();
fsInfo->set_fsid(fsId);
fsInfo->set_fsname("s3fs");
Expand Down

0 comments on commit e936d97

Please sign in to comment.