diff --git a/conf/client.conf b/conf/client.conf index 9b5fe31a8e..f93450bc43 100644 --- a/conf/client.conf +++ b/conf/client.conf @@ -127,6 +127,14 @@ global.logPath=/data/log/curve/ # 单元测试情况下 # logpath=./runlog/ +# +################# 读源卷相关配置 ############### +# +# 读取源卷时打开的fd超时关闭时间300s +closefd.timeout=300 +# 读取源卷时打开的fd后台线程每600s扫描一遍fdMap,关闭超时fd +closefd.timeInterval=600 + # ############### metric 配置信息 ############# # diff --git a/src/client/client_common.h b/src/client/client_common.h index 4c312355db..7cf6d181b9 100644 --- a/src/client/client_common.h +++ b/src/client/client_common.h @@ -76,6 +76,8 @@ typedef struct ChunkIDInfo { CopysetID cpid_ = 0; LogicPoolID lpid_ = 0; + bool chunkExist = true; + ChunkIDInfo() = default; ChunkIDInfo(ChunkID cid, LogicPoolID lpid, CopysetID cpid) diff --git a/src/client/client_config.h b/src/client/client_config.h index 2721a80f27..904134e4a7 100644 --- a/src/client/client_config.h +++ b/src/client/client_config.h @@ -37,6 +37,13 @@ class ClientConfig { return fileServiceOption_; } + /** + * test use, set the fileServiceOption_ + */ + void SetFileServiceOption(FileServiceOption opt) { + fileServiceOption_ = opt; + } + uint16_t GetDummyserverStartPort(); private: diff --git a/src/client/config_info.h b/src/client/config_info.h index 302078292b..17f457ba11 100644 --- a/src/client/config_info.h +++ b/src/client/config_info.h @@ -213,6 +213,16 @@ struct TaskThreadOption { uint32_t isolationTaskThreadPoolSize = 1; }; +/** + * timed close fd thread in SourceReader config + * @fdTimeout: sourcereader fd timeout + * @fdCloseTimeInterval: close sourcereader fd time interval + */ +struct CloseFdThreadOption { + uint32_t fdTimeout = 300; + uint32_t fdCloseTimeInterval = 600; +}; + /** * IOOption存储了当前io 操作所需要的所有配置信息 */ @@ -222,6 +232,7 @@ struct IOOption { MetaCacheOption metaCacheOpt; TaskThreadOption taskThreadOpt; RequestScheduleOption reqSchdulerOpt; + CloseFdThreadOption closeFdThreadOption; }; /** diff --git a/src/client/io_tracker.cpp b/src/client/io_tracker.cpp index 0214017540..9d64298e8c 100644 --- a/src/client/io_tracker.cpp +++ b/src/client/io_tracker.cpp @@ -30,6 +30,8 @@ #include "src/client/request_scheduler.h" #include "src/client/request_closure.h" #include "src/common/timeutility.h" +#include "src/client/libcurve_file.h" +#include "src/client/source_reader.h" namespace curve { namespace client { @@ -91,15 +93,34 @@ void IOTracker::DoRead(MDSClient* mdsclient, const FInfo_t* fileInfo) { if (ret == 0) { PrepareReadIOBuffers(reqlist_.size()); uint32_t subIoIndex = 0; + std::vector originReadVec; - reqcount_.store(reqlist_.size(), std::memory_order_release); std::for_each(reqlist_.begin(), reqlist_.end(), [&](RequestContext* r) { + // fake subrequest + if (!r->idinfo_.chunkExist) { + // the clone source is empty + if (r->sourceInfo_.cloneFileSource.empty()) { + // add zero data + r->readData_.resize(r->rawlength_, 0); + r->done_->SetFailed(LIBCURVE_ERROR::OK); + } else { + // read from original volume + originReadVec.emplace_back(r); + } + } + r->done_->SetFileMetric(fileMetric_); r->done_->SetIOManager(iomanager_); r->subIoIndex_ = subIoIndex++; }); - ret = scheduler_->ScheduleRequest(reqlist_); + reqcount_.store(reqlist_.size(), std::memory_order_release); + if (scheduler_->ScheduleRequest(reqlist_) == 0 && + ReadFromSource(originReadVec, fileInfo->userinfo) == 0) { + ret = 0; + } else { + ret = -1; + } } else { LOG(ERROR) << "splitor read io failed, " << "offset = " << offset_ << ", length = " << length_; @@ -111,6 +132,12 @@ void IOTracker::DoRead(MDSClient* mdsclient, const FInfo_t* fileInfo) { } } +int IOTracker::ReadFromSource(std::vector reqCtxVec, + const UserInfo_t& userInfo) { + SourceReader &sourceReader = SourceReader::GetInstance(); + return sourceReader.Read(reqCtxVec, userInfo); +} + void IOTracker::StartWrite(const void* buf, off_t offset, size_t length, MDSClient* mdsclient, const FInfo_t* fileInfo) { data_ = const_cast(buf); diff --git a/src/client/io_tracker.h b/src/client/io_tracker.h index e88b468062..7b84be39c3 100644 --- a/src/client/io_tracker.h +++ b/src/client/io_tracker.h @@ -246,6 +246,15 @@ class CURVE_CACHELINE_ALIGNMENT IOTracker { // perform read operation void DoRead(MDSClient* mdsclient, const FInfo_t* fileInfo); + /** + * read from the origin + * @param: reqCtxVec the read request context vector + * @param: the user info + * @return 0 success; -1 fail + */ + int ReadFromSource(std::vector reqCtxVec, + const UserInfo_t& userInfo); + // perform write operation void DoWrite(MDSClient* mdsclient, const FInfo_t* fileInfo); diff --git a/src/client/libcurve_client.cpp b/src/client/libcurve_client.cpp index 3c0f6cf234..324630ce57 100644 --- a/src/client/libcurve_client.cpp +++ b/src/client/libcurve_client.cpp @@ -22,6 +22,7 @@ #include "include/client/libcurve.h" #include "src/client/libcurve_file.h" +#include "src/client/source_reader.h" namespace curve { namespace client { @@ -34,7 +35,13 @@ CurveClient::~CurveClient() { } int CurveClient::Init(const std::string& configPath) { - return fileClient_->Init(configPath); + if(0 == fileClient_->Init(configPath) && + 0 == SourceReader::GetInstance().Init(configPath)) { + SourceReader::GetInstance().Run(); + return LIBCURVE_ERROR::OK; + } else { + return -LIBCURVE_ERROR::FAILED; + } } void CurveClient::UnInit() { @@ -124,4 +131,4 @@ void CurveClient::SetFileClient(FileClient* client) { } } // namespace client -} // namespace curve +} // namespace curve \ No newline at end of file diff --git a/src/client/libcurve_file.h b/src/client/libcurve_file.h index 6637d9ff9d..c98575785d 100644 --- a/src/client/libcurve_file.h +++ b/src/client/libcurve_file.h @@ -269,6 +269,31 @@ class FileClient { return openedFileNum_.get_value(); } + /** + * test use, set the mdsclient_ + */ + void SetMdsClient(MDSClient* client) { + mdsClient_ = client; + } + + /** + * test use, set the clientconfig_ + */ + void SetClientConfig(ClientConfig cfg) { + clientconfig_ = cfg; + } + + const ClientConfig& GetClientConfig() { + return clientconfig_; + } + + /** + * test use, get the fileserviceMap_ + */ + std::unordered_map& GetFileServiceMap() { + return fileserviceMap_; + } + private: bool StartDummyServer(); diff --git a/src/client/request_scheduler.cpp b/src/client/request_scheduler.cpp index b5ec1dc161..ee60923d9b 100644 --- a/src/client/request_scheduler.cpp +++ b/src/client/request_scheduler.cpp @@ -91,6 +91,14 @@ int RequestScheduler::ScheduleRequest( if (running_.load(std::memory_order_acquire)) { /* TODO(wudemiao): 后期考虑 qos */ for (auto it : requests) { + // skip the fake request + if (!it->idinfo_.chunkExist) { + if (it->sourceInfo_.cloneFileSource.empty()) { + it->done_->Run(); + } + continue; + } + BBQItem req(it); queue_.PutBack(req); } diff --git a/src/client/source_reader.cpp b/src/client/source_reader.cpp new file mode 100644 index 0000000000..a85c344bb6 --- /dev/null +++ b/src/client/source_reader.cpp @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2020 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Project: curve + * File Created: 2020-12-23 + * Author: wanghai01 + */ + +#include "src/client/source_reader.h" + +namespace curve { +namespace client { + +struct CurveAioCombineContext { + RequestClosure* done; + CurveAioContext curveCtx; +}; + +void CurveAioCallback(struct CurveAioContext* context) { + auto curveCombineCtx = reinterpret_cast( + reinterpret_cast(context) - + offsetof(CurveAioCombineContext, curveCtx)); + RequestClosure* done = curveCombineCtx->done; + if (context->ret < 0) { + done->SetFailed(LIBCURVE_ERROR::FAILED); + } else { + done->SetFailed(LIBCURVE_ERROR::OK); + } + + delete curveCombineCtx; + brpc::ClosureGuard doneGuard(done); +} + +SourceReader::SourceReader() : fileClient_(new FileClient()) {} + +SourceReader::~SourceReader() { + Stop(); + Uinit(); +} + +void SourceReader::Run() { + if (!running_) { + running_ = true; + fdCloseThread_ = std::thread(&SourceReader::Closefd, this); + LOG(INFO) << "SourceReader fdCloseThread run successfully"; + } else { + LOG(WARNING) << "SourceReader fdCloseThread is running!"; + } +} + +void SourceReader::Stop() { + if (running_) { + running_ = false; + sleeper_.interrupt(); + fdCloseThread_.join(); + LOG(INFO) << "SourceReader fdCloseThread stoped successfully"; + } +} + +int SourceReader::Init(const std::string& configPath) { + if (inited_) { + LOG(WARNING) << "SourceReader already inited!"; + return 0; + } + + if (LIBCURVE_ERROR::OK == fileClient_->Init(configPath)) { + inited_ = true; + return LIBCURVE_ERROR::OK; + } + return -LIBCURVE_ERROR::FAILED; +} + +void SourceReader::Uinit() { + if (!inited_) { + LOG(WARNING) << "SourceReader not inited!"; + return; + } + if (fileClient_ != nullptr) { + for (auto &pair : fdMap_) { + fileClient_->Close(pair.second.first); + } + fdMap_.clear(); + fileClient_->UnInit(); + delete fileClient_; + fileClient_ = nullptr; + inited_ = false; + } +} + +SourceReader& SourceReader::GetInstance() { + static SourceReader originReader; + return originReader; +} + +int SourceReader::Read(std::vector reqCtxVec, + const UserInfo_t& userInfo) { + if (reqCtxVec.size() <= 0) { + return 0; + } + + for (auto reqCtx : reqCtxVec) { + brpc::ClosureGuard doneGuard(reqCtx->done_); + std::string fileName = reqCtx->sourceInfo_.cloneFileSource; + int fd = 0; + { + curve::common::WriteLockGuard writeLockGuard(rwLock_); + auto iter = fdMap_.find(fileName); + if (iter != fdMap_.end()) { + fd = iter->second.first; + } else { + fd = fileClient_->Open4ReadOnly(fileName, userInfo); + if (fd < 0) { + LOG(ERROR) << "Open curve file failed." + << "file name: " << fileName + << " ,return code: " << fd; + return -1; + } + } + fdMap_[fileName] = std::make_pair(fd, time(0)); + } + CurveAioCombineContext *curveCombineCtx = new CurveAioCombineContext(); + curveCombineCtx->done = reqCtx->done_; + curveCombineCtx->curveCtx.offset = reqCtx->sourceInfo_.cloneFileOffset + + reqCtx->offset_; + curveCombineCtx->curveCtx.length = reqCtx->rawlength_; + curveCombineCtx->curveCtx.buf = &reqCtx->readData_; + curveCombineCtx->curveCtx.op = LIBCURVE_OP::LIBCURVE_OP_READ; + curveCombineCtx->curveCtx.cb = CurveAioCallback; + + int ret = fileClient_->AioRead(fd, &curveCombineCtx->curveCtx, + UserDataType::IOBuffer); + if (ret != LIBCURVE_ERROR::OK) { + LOG(ERROR) << "Read curve file failed." + << "file name: " << fileName + << " ,error code: " << ret; + delete curveCombineCtx; + return -1; + } else { + doneGuard.release(); + } + } + return 0; +} + +int SourceReader::Closefd() { + while (running_) { + { + curve::common::WriteLockGuard writeLockGuard(rwLock_); + for (auto iter = fdMap_.begin(); iter != fdMap_.end();) { + int fd = iter->second.first; + time_t timestamp = iter->second.second; + if (time(0) - timestamp > fileClient_->GetClientConfig().GetFileServiceOption(). + ioOpt.closeFdThreadOption.fdTimeout) { + fileClient_->Close(fd); + iter = fdMap_.erase(iter); + } else { + iter++; + } + } + } + + sleeper_.wait_for(std::chrono::seconds(fileClient_->GetClientConfig(). + GetFileServiceOption().ioOpt.closeFdThreadOption.fdCloseTimeInterval)); + } +} + +} // namespace client +} // namespace curve \ No newline at end of file diff --git a/src/client/source_reader.h b/src/client/source_reader.h new file mode 100644 index 0000000000..f3bceb706e --- /dev/null +++ b/src/client/source_reader.h @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2020 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Project: curve + * File Created: 2020-12-23 + * Author: wanghai01 + */ + +#ifndef SRC_CLIENT_SOURCE_READER_H_ +#define SRC_CLIENT_SOURCE_READER_H_ + +#include +#include +#include +#include +#include +#include "include/client/libcurve.h" +#include "src/client/libcurve_file.h" +#include "src/client/request_closure.h" +#include "src/common/interruptible_sleeper.h" + +using curve::client::ClusterContext; +using curve::client::FileClient; +using curve::client::RequestContext; +using curve::client::UserInfo_t; + +namespace curve { +namespace client { + +class SourceReader { + public: + static SourceReader& GetInstance(); + + void SetFileClient(FileClient *client) { + fileClient_ = client; + } + + FileClient* GetFileClient() { + return fileClient_; + } + + std::unordered_map>& GetFdMap() { + return fdMap_; + } + + int Init(const std::string& configPath); + + void Uinit(); + + /** + * run the timed stop fd thread + */ + void Run(); + + /** + * stop the timed stop fd thread + */ + void Stop(); + + /** + * read from the origin + * @param: reqCtxVec the read request context vector + * @param: the user info + * @return 0 success; -1 fail + */ + int Read(std::vector reqCtxVec, + const UserInfo_t& userInfo); + + private: + SourceReader(); + ~SourceReader(); + SourceReader(const SourceReader &); + SourceReader& operator=(const SourceReader &); + + /** + * close the timeout fd with timed thread + */ + int Closefd(); + + FileClient *fileClient_{nullptr}; + // the mutex lock for fdMap_ + curve::common::RWLock rwLock_; + // first: filename; second: + std::unordered_map> fdMap_; + // is initialized + bool inited_ = false; + std::thread fdCloseThread_; + std::atomic running_; + curve::common::InterruptibleSleeper sleeper_; + +}; + +} // namespace client +} // namespace curve + +#endif // SRC_CLIENT_SOURCE_READER_H_ diff --git a/src/client/splitor.cpp b/src/client/splitor.cpp index 2e7d0dc16f..8a5be6b0b5 100644 --- a/src/client/splitor.cpp +++ b/src/client/splitor.cpp @@ -180,10 +180,12 @@ bool Splitor::AssignInternal(IOTracker* iotracker, MetaCache* metaCache, metaCache->GetChunkInfoByIndex(chunkidx, &chunkIdInfo); if (errCode == MetaCacheErrorType::CHUNKINFO_NOT_FOUND) { + bool isAllocateSegment = + iotracker->Optype() == OpType::READ ? false : true; if (false == GetOrAllocateSegment( - true, + isAllocateSegment, static_cast(chunkidx) * fileinfo->chunksize, - mdsclient, metaCache, fileinfo)) { + mdsclient, metaCache, fileinfo, chunkidx)) { return false; } @@ -194,6 +196,15 @@ bool Splitor::AssignInternal(IOTracker* iotracker, MetaCache* metaCache, int ret = 0; uint64_t appliedindex_ = 0; + // check whether the chunkIdInfo is normal + if (!chunkIdInfo.chunkExist && iotracker->Optype() == OpType::WRITE) { + if (false == GetOrAllocateSegment(true, + static_cast(chunkidx) * fileinfo->chunksize, + mdsclient, metaCache, fileinfo, chunkidx)) { + return false; + } + } + // only read needs applied-index if (iotracker->Optype() == OpType::READ) { appliedindex_ = metaCache->GetAppliedIndex(chunkIdInfo.lpid_, @@ -227,7 +238,8 @@ bool Splitor::GetOrAllocateSegment(bool allocateIfNotExist, uint64_t offset, MDSClient* mdsClient, MetaCache* metaCache, - const FInfo* fileInfo) { + const FInfo* fileInfo, + ChunkIndex chunkidx) { SegmentInfo segmentInfo; LIBCURVE_ERROR errCode = mdsClient->GetOrAllocateSegment( allocateIfNotExist, offset, fileInfo, &segmentInfo); @@ -237,6 +249,12 @@ bool Splitor::GetOrAllocateSegment(bool allocateIfNotExist, LOG(ERROR) << "GetOrAllocateSegmen failed, filename: " << fileInfo->filename << ", offset: " << offset; return false; + } else if (errCode == LIBCURVE_ERROR::NOT_ALLOCATE) { + // this chunkIdInfo(0, 0, 0) identify the unallocated chunk when read + ChunkIDInfo chunkIdInfo(0, 0, 0); + chunkIdInfo.chunkExist = false; + metaCache->UpdateChunkInfoByIndex(chunkidx, chunkIdInfo); + return true; } const auto chunksize = fileInfo->chunksize; diff --git a/src/client/splitor.h b/src/client/splitor.h index 6975250b23..bd3be2ff31 100644 --- a/src/client/splitor.h +++ b/src/client/splitor.h @@ -118,7 +118,8 @@ class Splitor { uint64_t offset, MDSClient* mdsClient, MetaCache* metaCache, - const FInfo* fileInfo); + const FInfo* fileInfo, + ChunkIndex chunkidx); private: // IO拆分模块所使用的配置信息 diff --git a/test/client/fake/fakeMDS.h b/test/client/fake/fakeMDS.h index f957138e1f..d7dded93ae 100644 --- a/test/client/fake/fakeMDS.h +++ b/test/client/fake/fakeMDS.h @@ -131,7 +131,7 @@ class FakeMDSCurveFSService : public curve::mds::CurveFSService { ::google::protobuf::Closure* done) { brpc::ClosureGuard done_guard(done); if (fakeGetFileInforet_->controller_ != nullptr && - fakeGetFileInforet_->controller_->Failed()) { + fakeGetFileInforet_->controller_->Failed()) { controller->SetFailed("failed"); } @@ -190,9 +190,14 @@ class FakeMDSCurveFSService : public curve::mds::CurveFSService { fiu_do_on("test/client/fake/fakeMDS.GetOrAllocateSegment", checkFullpath()); - - auto resp = static_cast<::curve::mds::GetOrAllocateSegmentResponse*>( + curve::mds::GetOrAllocateSegmentResponse* resp; + if (request->filename() == "/clonesource") { + resp = static_cast<::curve::mds::GetOrAllocateSegmentResponse*>( + fakeGetOrAllocateSegmentretForClone_->response_); + } else { + resp = static_cast<::curve::mds::GetOrAllocateSegmentResponse*>( fakeGetOrAllocateSegmentret_->response_); + } response->CopyFrom(*resp); } @@ -601,6 +606,10 @@ class FakeMDSCurveFSService : public curve::mds::CurveFSService { fakeGetOrAllocateSegmentret_ = fakeret; } + void SetGetOrAllocateSegmentFakeReturnForClone(FakeReturn* fakeret) { + fakeGetOrAllocateSegmentretForClone_ = fakeret; + } + void SetOpenFile(FakeReturn* fakeret) { fakeopenfile_ = fakeret; } @@ -707,6 +716,7 @@ class FakeMDSCurveFSService : public curve::mds::CurveFSService { FakeReturn* fakeGetFileInforet_; FakeReturn* fakeGetAllocatedSizeRet_; FakeReturn* fakeGetOrAllocateSegmentret_; + FakeReturn* fakeGetOrAllocateSegmentretForClone_; FakeReturn* fakeopenfile_; FakeReturn* fakeclosefile_; FakeReturn* fakerenamefile_; diff --git a/test/client/fake/mock_schedule.cpp b/test/client/fake/mock_schedule.cpp index 3ea2fd66a9..43dd481468 100644 --- a/test/client/fake/mock_schedule.cpp +++ b/test/client/fake/mock_schedule.cpp @@ -44,11 +44,11 @@ int Schedule::ScheduleRequest( // LOG(INFO) << "ENTER MOCK ScheduleRequest"; char fakedate[10] = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'k'}; curve::client::OpType type = curve::client::OpType::UNKNOWN; - int size = reqlist.size(); + // int size = reqlist.size(); int processed = 0; int totallength = 0; std::vector datavec; - LOG(ERROR) << size; + // LOG(ERROR) << size; if (enableScheduleFailed) { return -1; @@ -64,6 +64,13 @@ int Schedule::ScheduleRequest( func();); for (auto iter : reqlist) { + if (!iter->idinfo_.chunkExist) { + if (iter->sourceInfo_.cloneFileSource.empty()) { + iter->done_->Run(); + } + continue; + } + auto req = iter->done_->GetReqCtx(); if (iter->optype_ == curve::client::OpType::READ_SNAP) { char buf[iter->rawlength_]; // NOLINT @@ -106,11 +113,11 @@ int Schedule::ScheduleRequest( // << ", length = " // << iter->rawlength_; - if (processed >= size) { - iter->done_->SetFailed(0); - iter->done_->Run(); - break; - } + // if (processed >= size) { + // iter->done_->SetFailed(0); + // iter->done_->Run(); + // break; + // } iter->done_->SetFailed(0); iter->done_->Run(); } diff --git a/test/client/iotracker_splitor_unittest.cpp b/test/client/iotracker_splitor_unittest.cpp index a976396020..1994ebd3fb 100644 --- a/test/client/iotracker_splitor_unittest.cpp +++ b/test/client/iotracker_splitor_unittest.cpp @@ -39,12 +39,14 @@ #include "src/client/file_instance.h" #include "src/client/io_tracker.h" #include "src/client/iomanager4file.h" +#include "include/client/libcurve.h" #include "src/client/libcurve_file.h" #include "src/client/mds_client.h" #include "src/client/metacache.h" #include "src/client/metacache_struct.h" #include "src/client/request_context.h" #include "src/client/splitor.h" +#include "src/client/source_reader.h" #include "test/client/fake/fakeMDS.h" #include "test/client/fake/mockMDS.h" #include "test/client/fake/mock_schedule.h" @@ -95,6 +97,8 @@ class IOTrackerSplitorTest : public ::testing::Test { fopt.ioOpt.reqSchdulerOpt.scheduleQueueCapacity = 4096; fopt.ioOpt.reqSchdulerOpt.scheduleThreadpoolSize = 2; fopt.ioOpt.reqSchdulerOpt.ioSenderOpt = fopt.ioOpt.ioSenderOpt; + fopt.ioOpt.closeFdThreadOption.fdTimeout = 300; + fopt.ioOpt.closeFdThreadOption.fdCloseTimeInterval = 600; fopt.leaseOpt.mdsRefreshTimesPerLease = 4; fileinstance_ = new FileInstance(); @@ -102,13 +106,21 @@ class IOTrackerSplitorTest : public ::testing::Test { userinfo.password = "12345"; mdsclient_.Initialize(fopt.metaServerOpt); - fileinstance_->Initialize("/test", &mdsclient_, userinfo, fopt); + fileinstance_->Initialize("/1_userinfo_.txt", + &mdsclient_, userinfo, fopt); + + SourceReader::GetInstance().Init(configpath); + fileClient_ = new FileClient(); + fileClient_->SetMdsClient(&mdsclient_); + ClientConfig clientConfig; + clientConfig.SetFileServiceOption(fopt); + fileClient_->SetClientConfig(clientConfig); + SourceReader::GetInstance().SetFileClient(fileClient_); InsertMetaCache(); } void TearDown() { writeData.clear(); - fileinstance_->UnInitialize(); mdsclient_.UnInitialize(); delete fileinstance_; @@ -145,6 +157,7 @@ class IOTrackerSplitorTest : public ::testing::Test { ::curve::mds::FileInfo* fin = new ::curve::mds::FileInfo; fin->set_filename("1_userinfo_.txt"); + fin->set_owner("userinfo"); fin->set_id(1); fin->set_parentid(0); fin->set_filetype(curve::mds::FileType::INODE_PAGEFILE); @@ -163,7 +176,16 @@ class IOTrackerSplitorTest : public ::testing::Test { fileinstance_->Open("1_userinfo_.txt", userinfo); /** - * 2. 设置GetOrAllocateSegmentresponse + * 2. set closefile response + */ + ::curve::mds::CloseFileResponse* closeresp = new ::curve::mds::CloseFileResponse; // NOLINT + closeresp->set_statuscode(::curve::mds::StatusCode::kOK); + FakeReturn* closefileret + = new FakeReturn(nullptr, static_cast(closeresp)); + curvefsservice.SetCloseFile(closefileret); + + /** + * 3. 设置GetOrAllocateSegmentresponse */ curve::mds::GetOrAllocateSegmentResponse* response = new curve::mds::GetOrAllocateSegmentResponse(); @@ -185,11 +207,45 @@ class IOTrackerSplitorTest : public ::testing::Test { chunk->set_copysetid(i); chunk->set_chunkid(i); } - FakeReturn* fakeret = new FakeReturn(nullptr, + getsegmentfakeret = new FakeReturn(nullptr, static_cast(response)); - curvefsservice.SetGetOrAllocateSegmentFakeReturn(fakeret); + curvefsservice.SetGetOrAllocateSegmentFakeReturn(getsegmentfakeret); + + curve::mds::GetOrAllocateSegmentResponse* notallocateresponse = + new curve::mds::GetOrAllocateSegmentResponse(); + notallocateresponse->set_statuscode(::curve::mds::StatusCode + ::kSegmentNotAllocated); + notallocatefakeret = new FakeReturn(nullptr, + static_cast(notallocateresponse)); + + // set GetOrAllocateSegmentResponse for read from clone source + curve::mds::GetOrAllocateSegmentResponse* cloneSourceResponse = + new curve::mds::GetOrAllocateSegmentResponse(); + curve::mds::PageFileSegment* clonepfs = new curve::mds::PageFileSegment; + + cloneSourceResponse->set_statuscode(::curve::mds::StatusCode::kOK); + cloneSourceResponse->set_allocated_pagefilesegment(clonepfs); + cloneSourceResponse->mutable_pagefilesegment()-> + set_logicalpoolid(1); + cloneSourceResponse->mutable_pagefilesegment()-> + set_segmentsize(1 * 1024 * 1024 * 1024); + cloneSourceResponse->mutable_pagefilesegment()-> + set_chunksize(4 * 1024 * 1024); + cloneSourceResponse->mutable_pagefilesegment()-> + set_startoffset(1 * 1024 * 1024 * 1024); + + for (int i = 256; i < 512; i++) { + auto chunk = cloneSourceResponse->mutable_pagefilesegment() + ->add_chunks(); + chunk->set_copysetid(i); + chunk->set_chunkid(i); + } + getsegmentfakeretclone = new FakeReturn(nullptr, + static_cast(cloneSourceResponse)); - // 3. set refresh response + /** + * 4. set refresh response + */ curve::mds::FileInfo * info = new curve::mds::FileInfo; info->set_filename("1_userinfo_.txt"); info->set_seqnum(2); @@ -197,7 +253,7 @@ class IOTrackerSplitorTest : public ::testing::Test { info->set_parentid(0); info->set_filetype(curve::mds::FileType::INODE_PAGEFILE); info->set_chunksize(4 * 1024 * 1024); - info->set_length(4 * 1024 * 1024 * 1024ul); + info->set_length(1 * 1024 * 1024 * 1024ul); info->set_ctime(12345678); ::curve::mds::ReFreshSessionResponse* refreshresp = @@ -210,7 +266,7 @@ class IOTrackerSplitorTest : public ::testing::Test { curvefsservice.SetRefreshSession(refreshfakeret, nullptr); /** - * 4. 设置topology返回值 + * 5. 设置topology返回值 */ ::curve::mds::topology::GetChunkServerListInCopySetsResponse* response_1 = new ::curve::mds::topology::GetChunkServerListInCopySetsResponse; @@ -250,21 +306,24 @@ class IOTrackerSplitorTest : public ::testing::Test { std::vector cpinfoVec; mdsclient_.GetServerList(lpcsIDInfo.lpid, - lpcsIDInfo.cpidVec, &cpinfoVec); + lpcsIDInfo.cpidVec, &cpinfoVec); for (auto iter : cpinfoVec) { mc->UpdateCopysetInfo(lpcsIDInfo.lpid, iter.cpid_, iter); } } + FileClient *fileClient_; UserInfo_t userinfo; MDSClient mdsclient_; FileServiceOption fopt; - curve::client::ClientConfig cc; - FileInstance* fileinstance_; + FileInstance *fileinstance_; brpc::Server server; FakeMDSCurveFSService curvefsservice; FakeTopologyService topologyservice; + FakeReturn *getsegmentfakeret; + FakeReturn *notallocatefakeret; + FakeReturn *getsegmentfakeretclone; }; TEST_F(IOTrackerSplitorTest, AsyncStartRead) { @@ -948,5 +1007,286 @@ TEST(SplitorTest, RequestSourceInfoTest) { ASSERT_EQ(sourceInfo.cloneFileOffset, 0); } +// read the chunks all haven't been write from normal volume with no clonesource +TEST_F(IOTrackerSplitorTest, StartReadNotAllocateSegment) { + curvefsservice.SetGetOrAllocateSegmentFakeReturn(notallocatefakeret); + MockRequestScheduler* mockschuler = new MockRequestScheduler; + mockschuler->DelegateToFake(); + + curve::client::IOManager4File* iomana = fileinstance_->GetIOManager4File(); + MetaCache* mc = fileinstance_->GetIOManager4File()->GetMetaCache(); + iomana->SetRequestScheduler(mockschuler); + + uint64_t offset = 1 * 1024 * 1024 * 1024 + 4 * 1024 * 1024 - 4 * 1024; + uint64_t length = 4 * 1024 * 1024 + 8 * 1024; + char* data = new char[length]; + + auto threadfunc = [&]() { + iomana->Read(data, offset, length, &mdsclient_); + }; + std::thread process(threadfunc); + + if (process.joinable()) { + process.join(); + } + + for (int i = 0; i < length; i++) { + ASSERT_EQ(0, data[i]); + } + delete[] data; +} + +TEST_F(IOTrackerSplitorTest, AsyncStartReadNotAllocateSegment) { + curvefsservice.SetGetOrAllocateSegmentFakeReturn(notallocatefakeret); + MockRequestScheduler* mockschuler = new MockRequestScheduler; + mockschuler->DelegateToFake(); + + curve::client::IOManager4File* iomana = fileinstance_->GetIOManager4File(); + MetaCache* mc = fileinstance_->GetIOManager4File()->GetMetaCache(); + iomana->SetRequestScheduler(mockschuler); + + CurveAioContext aioctx; + aioctx.offset = 1 * 1024 * 1024 * 1024 + 4 * 1024 * 1024 - 4 * 1024; + aioctx.length = 4 * 1024 * 1024 + 8 * 1024; + aioctx.ret = LIBCURVE_ERROR::OK; + aioctx.cb = readcallback; + aioctx.buf = new char[aioctx.length]; + aioctx.op = LIBCURVE_OP::LIBCURVE_OP_READ; + + ioreadflag = false; + char* data = static_cast(aioctx.buf); + iomana->AioRead(&aioctx, &mdsclient_, UserDataType::RawBuffer); + + { + std::unique_lock lk(readmtx); + readcv.wait(lk, []()->bool{return ioreadflag;}); + } + + for (int i = 0; i < aioctx.length; i++) { + ASSERT_EQ(0, data[i]); + } + delete[] data; +} + +// read the chunks some of them haven't been writtern from normal volume +// with no clonesource +TEST_F(IOTrackerSplitorTest, StartReadNotAllocateSegment2) { + curvefsservice.SetGetOrAllocateSegmentFakeReturn(notallocatefakeret); + MockRequestScheduler* mockschuler = new MockRequestScheduler; + mockschuler->DelegateToFake(); + + curve::client::IOManager4File* iomana = fileinstance_->GetIOManager4File(); + MetaCache* mc = fileinstance_->GetIOManager4File()->GetMetaCache(); + ChunkIDInfo chunkIdInfo(1, 1, 256); + mc->UpdateChunkInfoByIndex(256, chunkIdInfo); + iomana->SetRequestScheduler(mockschuler); + + uint64_t offset = 1 * 1024 * 1024 * 1024 + 4 * 1024 * 1024 - 4 * 1024; + uint64_t length = 4 * 1024 * 1024 + 8 * 1024; + char* data = new char[length]; + + auto threadfunc = [&]() { + iomana->Read(data, offset, length, &mdsclient_); + }; + std::thread process(threadfunc); + + if (process.joinable()) { + process.join(); + } + + for (int i = 0; i < 4 * 1024; i++) { + ASSERT_EQ('a', data[i]); + } + + for (int i = 4 * 1024; i < length; i++) { + ASSERT_EQ(0, data[i]); + } + delete[] data; +} + +TEST_F(IOTrackerSplitorTest, AsyncStartReadNotAllocateSegment2) { + curvefsservice.SetGetOrAllocateSegmentFakeReturn(notallocatefakeret); + MockRequestScheduler* mockschuler = new MockRequestScheduler; + mockschuler->DelegateToFake(); + + curve::client::IOManager4File* iomana = fileinstance_->GetIOManager4File(); + MetaCache* mc = fileinstance_->GetIOManager4File()->GetMetaCache(); + ChunkIDInfo chunkIdInfo(1, 1, 256); + mc->UpdateChunkInfoByIndex(256, chunkIdInfo); + iomana->SetRequestScheduler(mockschuler); + + CurveAioContext aioctx; + aioctx.offset = 1 * 1024 * 1024 * 1024 + 4 * 1024 * 1024 - 4 * 1024; + aioctx.length = 4 * 1024 * 1024 + 8 * 1024; + aioctx.ret = LIBCURVE_ERROR::OK; + aioctx.cb = readcallback; + aioctx.buf = new char[aioctx.length]; + aioctx.op = LIBCURVE_OP::LIBCURVE_OP_READ; + + ioreadflag = false; + char* data = static_cast(aioctx.buf); + iomana->AioRead(&aioctx, &mdsclient_, UserDataType::RawBuffer); + + { + std::unique_lock lk(readmtx); + readcv.wait(lk, []()->bool{return ioreadflag;}); + } + + for (int i = 0; i < 4 * 1024; i++) { + ASSERT_EQ('a', data[i]); + } + + for (int i = 4 * 1024; i < aioctx.length; i++) { + ASSERT_EQ(0, data[i]); + } + delete[] data; +} + +// read the chunks some haven't been write from clone volume with clonesource +TEST_F(IOTrackerSplitorTest, StartReadNotAllocateSegmentFromOrigin) { + curvefsservice.SetGetOrAllocateSegmentFakeReturn(notallocatefakeret); + curvefsservice.SetGetOrAllocateSegmentFakeReturnForClone + (getsegmentfakeretclone); + MockRequestScheduler* mockschuler = new MockRequestScheduler; + mockschuler->DelegateToFake(); + + FileInstance* fileinstance2 = new FileInstance(); + userinfo.owner = "cloneuser"; + userinfo.password = "12345"; + mdsclient_.Initialize(fopt.metaServerOpt); + fileinstance2->Initialize("/clonesource", &mdsclient_, userinfo, fopt); + std::unordered_map>& fdmap = + SourceReader::GetInstance().GetFdMap(); + fdmap["/clonesource"] = std::make_pair(1234, time(0)); + std::unordered_map& fileservicemap = + SourceReader::GetInstance(). + GetFileClient()->GetFileServiceMap(); + fileservicemap[1234] = fileinstance2; + fileinstance2->GetIOManager4File()->SetRequestScheduler(mockschuler); + + curve::client::IOManager4File* iomana = fileinstance_->GetIOManager4File(); + MetaCache* mc = fileinstance_->GetIOManager4File()->GetMetaCache(); + ChunkIDInfo chunkIdInfo(1, 1, 257); + mc->UpdateChunkInfoByIndex(257, chunkIdInfo); + + FInfo_t fileInfo; + fileInfo.chunksize = 4 * 1024 * 1024; // 4M + fileInfo.cloneLength = 10ull * 1024 * 1024 * 1024; // 10G + fileInfo.fullPathName = "/1_userinfo_.txt"; + fileInfo.owner = "userinfo"; + fileInfo.cloneSource = "/clonesource"; + fileInfo.userinfo = userinfo; + mc->UpdateFileInfo(fileInfo); + + iomana->SetRequestScheduler(mockschuler); + + uint64_t offset = 1 * 1024 * 1024 * 1024 + 4 * 1024 * 1024 - 4 * 1024; + uint64_t length = 4 * 1024 * 1024 + 8 * 1024; + char* data = new char[length]; + + auto threadfunc = [&]() { + iomana->Read(data, offset, length, &mdsclient_); + }; + std::thread process(threadfunc); + + if (process.joinable()) { + process.join(); + } + + LOG(ERROR) << "address = " << &data; + ASSERT_EQ('a', data[0]); + ASSERT_EQ('a', data[4 * 1024 - 1]); + ASSERT_EQ('a', data[4 * 1024]); + ASSERT_EQ('d', data[4 * 1024 + chunk_size - 1]); + ASSERT_EQ('a', data[4 * 1024 + chunk_size]); + ASSERT_EQ('a', data[length - 1]); + delete[] data; +} + +TEST_F(IOTrackerSplitorTest, AsyncStartReadNotAllocateSegmentFromOrigin) { + curvefsservice.SetGetOrAllocateSegmentFakeReturn(notallocatefakeret); + curvefsservice.SetGetOrAllocateSegmentFakeReturnForClone + (getsegmentfakeretclone); + MockRequestScheduler* mockschuler = new MockRequestScheduler; + mockschuler->DelegateToFake(); + + FileInstance* fileinstance2 = new FileInstance(); + userinfo.owner = "cloneuser"; + userinfo.password = "12345"; + mdsclient_.Initialize(fopt.metaServerOpt); + fileinstance2->Initialize("/clonesource", &mdsclient_, userinfo, fopt); + std::unordered_map>& fdmap = + SourceReader::GetInstance().GetFdMap(); + fdmap["/clonesource"] = std::make_pair(1234, time(0)); + std::unordered_map& fileservicemap = + SourceReader::GetInstance(). + GetFileClient()->GetFileServiceMap(); + fileservicemap[1234] = fileinstance2; + fileinstance2->GetIOManager4File()->SetRequestScheduler(mockschuler); + + curve::client::IOManager4File* iomana = fileinstance_->GetIOManager4File(); + MetaCache* mc = fileinstance_->GetIOManager4File()->GetMetaCache(); + ChunkIDInfo chunkIdInfo(1, 1, 257); + mc->UpdateChunkInfoByIndex(257, chunkIdInfo); + + FInfo_t fileInfo; + fileInfo.chunksize = 4 * 1024 * 1024; + fileInfo.cloneLength = 10ull * 1024 * 1024 * 1024; + fileInfo.filename = "1_userinfo_.txt"; + fileInfo.owner = "userinfo"; + fileInfo.cloneSource = "/clonesource"; + fileInfo.userinfo = userinfo; + mc->UpdateFileInfo(fileInfo); + + iomana->SetRequestScheduler(mockschuler); + + CurveAioContext aioctx; + aioctx.offset = 1 * 1024 * 1024 * 1024 + 4 * 1024 * 1024 - 4 * 1024; + aioctx.length = 4 * 1024 * 1024 + 8 * 1024; + aioctx.ret = LIBCURVE_ERROR::OK; + aioctx.cb = readcallback; + aioctx.buf = new char[aioctx.length]; + aioctx.op = LIBCURVE_OP::LIBCURVE_OP_READ; + + ioreadflag = false; + char* data = static_cast(aioctx.buf); + iomana->AioRead(&aioctx, &mdsclient_, UserDataType::RawBuffer); + + { + std::unique_lock lk(readmtx); + readcv.wait(lk, []()->bool{return ioreadflag;}); + } + LOG(ERROR) << "address = " << &data; + ASSERT_EQ('a', data[0]); + ASSERT_EQ('a', data[4 * 1024 - 1]); + ASSERT_EQ('a', data[4 * 1024]); + ASSERT_EQ('d', data[4 * 1024 + chunk_size - 1]); + ASSERT_EQ('a', data[4 * 1024 + chunk_size]); + ASSERT_EQ('a', data[aioctx.length - 1]); + delete[] data; +} + +TEST_F(IOTrackerSplitorTest, TimedCloseFd) { + FileInstance* fileinstance2 = new FileInstance(); + MDSClient mdsclient_; + userinfo.owner = "userinfo"; + userinfo.password = "12345"; + mdsclient_.Initialize(fopt.metaServerOpt); + fileinstance2->Initialize("/test", &mdsclient_, userinfo, fopt); + std::unordered_map>& fdmap = + SourceReader::GetInstance().GetFdMap(); + fdmap.clear(); + fdmap["/test"] = std::make_pair(1234, time(0) - + fopt.ioOpt.closeFdThreadOption.fdTimeout - 5); + std::unordered_map& fileservicemap = + SourceReader::GetInstance(). + GetFileClient()->GetFileServiceMap(); + fileservicemap[1234] = fileinstance2; + SourceReader::GetInstance().Run(); + ::sleep(2); + SourceReader::GetInstance().Stop(); + ASSERT_EQ(0, fdmap.size()); +} + } // namespace client } // namespace curve diff --git a/test/integration/snapshotcloneserver/snapshotcloneserver_common_test.cpp b/test/integration/snapshotcloneserver/snapshotcloneserver_common_test.cpp index 7914945f58..3f3889b7ba 100644 --- a/test/integration/snapshotcloneserver/snapshotcloneserver_common_test.cpp +++ b/test/integration/snapshotcloneserver/snapshotcloneserver_common_test.cpp @@ -32,10 +32,12 @@ #include "test/integration/snapshotcloneserver/test_snapshotcloneserver_helpler.h" #include "src/common/snapshotclone/snapshotclone_define.h" #include "src/snapshotcloneserver/common/snapshotclone_meta_store.h" +#include "src/client/source_reader.h" using curve::CurveCluster; using curve::client::FileClient; using curve::client::UserInfo_t; +using curve::client::SourceReader; const std::string kTestPrefix = "SCSTest"; // NOLINT @@ -338,6 +340,7 @@ class SnapshotCloneServerTest : public ::testing::Test { fileClient_ = new FileClient(); fileClient_->Init(kClientConfigPath); + SourceReader::GetInstance().Init(kClientConfigPath); UserInfo_t userinfo; userinfo.owner = "ItUser1"; diff --git a/test/integration/snapshotcloneserver/snapshotcloneserver_concurrent_test.cpp b/test/integration/snapshotcloneserver/snapshotcloneserver_concurrent_test.cpp index cc0a30808f..efb6950a32 100644 --- a/test/integration/snapshotcloneserver/snapshotcloneserver_concurrent_test.cpp +++ b/test/integration/snapshotcloneserver/snapshotcloneserver_concurrent_test.cpp @@ -32,10 +32,12 @@ #include "test/integration/snapshotcloneserver/test_snapshotcloneserver_helpler.h" #include "src/common/snapshotclone/snapshotclone_define.h" #include "src/snapshotcloneserver/common/snapshotclone_meta_store.h" +#include "src/client/source_reader.h" using curve::CurveCluster; using curve::client::FileClient; using curve::client::UserInfo_t; +using curve::client::SourceReader; const std::string kTestPrefix = "ConSCSTest"; // NOLINT @@ -334,6 +336,7 @@ class SnapshotCloneServerTest : public ::testing::Test { fileClient_ = new FileClient(); fileClient_->Init(kClientConfigPath); + SourceReader::GetInstance().Init(kClientConfigPath); UserInfo_t userinfo; userinfo.owner = "concurrentItUser1";