Skip to content

Commit

Permalink
client read not allocate segment
Browse files Browse the repository at this point in the history
  • Loading branch information
SeanHai committed Dec 24, 2020
1 parent b62adde commit 70d88df
Show file tree
Hide file tree
Showing 18 changed files with 807 additions and 29 deletions.
8 changes: 8 additions & 0 deletions conf/client.conf
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,14 @@ global.logPath=/data/log/curve/
# 单元测试情况下
# logpath=./runlog/

#
################# 读源卷相关配置 ###############
#
# 读取源卷时打开的fd超时关闭时间300s
closefd.timeout=300
# 读取源卷时打开的fd后台线程每600s扫描一遍fdMap,关闭超时fd
closefd.timeInterval=600

#
############### metric 配置信息 #############
#
Expand Down
2 changes: 2 additions & 0 deletions src/client/client_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ typedef struct ChunkIDInfo {
CopysetID cpid_ = 0;
LogicPoolID lpid_ = 0;

bool chunkExist = true;

ChunkIDInfo() = default;

ChunkIDInfo(ChunkID cid, LogicPoolID lpid, CopysetID cpid)
Expand Down
7 changes: 7 additions & 0 deletions src/client/client_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ class ClientConfig {
return fileServiceOption_;
}

/**
* test use, set the fileServiceOption_
*/
void SetFileServiceOption(FileServiceOption opt) {
fileServiceOption_ = opt;
}

uint16_t GetDummyserverStartPort();

private:
Expand Down
11 changes: 11 additions & 0 deletions src/client/config_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,16 @@ struct TaskThreadOption {
uint32_t isolationTaskThreadPoolSize = 1;
};

/**
* timed close fd thread in SourceReader config
* @fdTimeout: sourcereader fd timeout
* @fdCloseTimeInterval: close sourcereader fd time interval
*/
struct CloseFdThreadOption {
uint32_t fdTimeout = 300;
uint32_t fdCloseTimeInterval = 600;
};

/**
* IOOption存储了当前io 操作所需要的所有配置信息
*/
Expand All @@ -222,6 +232,7 @@ struct IOOption {
MetaCacheOption metaCacheOpt;
TaskThreadOption taskThreadOpt;
RequestScheduleOption reqSchdulerOpt;
CloseFdThreadOption closeFdThreadOption;
};

/**
Expand Down
31 changes: 29 additions & 2 deletions src/client/io_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
#include "src/client/request_scheduler.h"
#include "src/client/request_closure.h"
#include "src/common/timeutility.h"
#include "src/client/libcurve_file.h"
#include "src/client/source_reader.h"

namespace curve {
namespace client {
Expand Down Expand Up @@ -91,15 +93,34 @@ void IOTracker::DoRead(MDSClient* mdsclient, const FInfo_t* fileInfo) {
if (ret == 0) {
PrepareReadIOBuffers(reqlist_.size());
uint32_t subIoIndex = 0;
std::vector<RequestContext*> originReadVec;

reqcount_.store(reqlist_.size(), std::memory_order_release);
std::for_each(reqlist_.begin(), reqlist_.end(), [&](RequestContext* r) {
// fake subrequest
if (!r->idinfo_.chunkExist) {
// the clone source is empty
if (r->sourceInfo_.cloneFileSource.empty()) {
// add zero data
r->readData_.resize(r->rawlength_, 0);
r->done_->SetFailed(LIBCURVE_ERROR::OK);
} else {
// read from original volume
originReadVec.emplace_back(r);
}
}

r->done_->SetFileMetric(fileMetric_);
r->done_->SetIOManager(iomanager_);
r->subIoIndex_ = subIoIndex++;
});

ret = scheduler_->ScheduleRequest(reqlist_);
reqcount_.store(reqlist_.size(), std::memory_order_release);
if (scheduler_->ScheduleRequest(reqlist_) == 0 &&
ReadFromSource(originReadVec, fileInfo->userinfo) == 0) {
ret = 0;
} else {
ret = -1;
}
} else {
LOG(ERROR) << "splitor read io failed, "
<< "offset = " << offset_ << ", length = " << length_;
Expand All @@ -111,6 +132,12 @@ void IOTracker::DoRead(MDSClient* mdsclient, const FInfo_t* fileInfo) {
}
}

int IOTracker::ReadFromSource(std::vector<RequestContext*> reqCtxVec,
const UserInfo_t& userInfo) {
SourceReader &sourceReader = SourceReader::GetInstance();
return sourceReader.Read(reqCtxVec, userInfo);
}

void IOTracker::StartWrite(const void* buf, off_t offset, size_t length,
MDSClient* mdsclient, const FInfo_t* fileInfo) {
data_ = const_cast<void*>(buf);
Expand Down
9 changes: 9 additions & 0 deletions src/client/io_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,15 @@ class CURVE_CACHELINE_ALIGNMENT IOTracker {
// perform read operation
void DoRead(MDSClient* mdsclient, const FInfo_t* fileInfo);

/**
* read from the origin
* @param: reqCtxVec the read request context vector
* @param: the user info
* @return 0 success; -1 fail
*/
int ReadFromSource(std::vector<RequestContext*> reqCtxVec,
const UserInfo_t& userInfo);

// perform write operation
void DoWrite(MDSClient* mdsclient, const FInfo_t* fileInfo);

Expand Down
11 changes: 9 additions & 2 deletions src/client/libcurve_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "include/client/libcurve.h"
#include "src/client/libcurve_file.h"
#include "src/client/source_reader.h"

namespace curve {
namespace client {
Expand All @@ -34,7 +35,13 @@ CurveClient::~CurveClient() {
}

int CurveClient::Init(const std::string& configPath) {
return fileClient_->Init(configPath);
if(0 == fileClient_->Init(configPath) &&
0 == SourceReader::GetInstance().Init(configPath)) {
SourceReader::GetInstance().Run();
return LIBCURVE_ERROR::OK;
} else {
return -LIBCURVE_ERROR::FAILED;
}
}

void CurveClient::UnInit() {
Expand Down Expand Up @@ -124,4 +131,4 @@ void CurveClient::SetFileClient(FileClient* client) {
}

} // namespace client
} // namespace curve
} // namespace curve
25 changes: 25 additions & 0 deletions src/client/libcurve_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,31 @@ class FileClient {
return openedFileNum_.get_value();
}

/**
* test use, set the mdsclient_
*/
void SetMdsClient(MDSClient* client) {
mdsClient_ = client;
}

/**
* test use, set the clientconfig_
*/
void SetClientConfig(ClientConfig cfg) {
clientconfig_ = cfg;
}

const ClientConfig& GetClientConfig() {
return clientconfig_;
}

/**
* test use, get the fileserviceMap_
*/
std::unordered_map<int, FileInstance*>& GetFileServiceMap() {
return fileserviceMap_;
}

private:
bool StartDummyServer();

Expand Down
8 changes: 8 additions & 0 deletions src/client/request_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ int RequestScheduler::ScheduleRequest(
if (running_.load(std::memory_order_acquire)) {
/* TODO(wudemiao): 后期考虑 qos */
for (auto it : requests) {
// skip the fake request
if (!it->idinfo_.chunkExist) {
if (it->sourceInfo_.cloneFileSource.empty()) {
it->done_->Run();
}
continue;
}

BBQItem<RequestContext *> req(it);
queue_.PutBack(req);
}
Expand Down
182 changes: 182 additions & 0 deletions src/client/source_reader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Copyright (c) 2020 NetEase Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Project: curve
* File Created: 2020-12-23
* Author: wanghai01
*/

#include "src/client/source_reader.h"

namespace curve {
namespace client {

struct CurveAioCombineContext {
RequestClosure* done;
CurveAioContext curveCtx;
};

void CurveAioCallback(struct CurveAioContext* context) {
auto curveCombineCtx = reinterpret_cast<CurveAioCombineContext *>(
reinterpret_cast<char *>(context) -
offsetof(CurveAioCombineContext, curveCtx));
RequestClosure* done = curveCombineCtx->done;
if (context->ret < 0) {
done->SetFailed(LIBCURVE_ERROR::FAILED);
} else {
done->SetFailed(LIBCURVE_ERROR::OK);
}

delete curveCombineCtx;
brpc::ClosureGuard doneGuard(done);
}

SourceReader::SourceReader() : fileClient_(new FileClient()) {}

SourceReader::~SourceReader() {
Stop();
Uinit();
}

void SourceReader::Run() {
if (!running_) {
running_ = true;
fdCloseThread_ = std::thread(&SourceReader::Closefd, this);
LOG(INFO) << "SourceReader fdCloseThread run successfully";
} else {
LOG(WARNING) << "SourceReader fdCloseThread is running!";
}
}

void SourceReader::Stop() {
if (running_) {
running_ = false;
sleeper_.interrupt();
fdCloseThread_.join();
LOG(INFO) << "SourceReader fdCloseThread stoped successfully";
}
}

int SourceReader::Init(const std::string& configPath) {
if (inited_) {
LOG(WARNING) << "SourceReader already inited!";
return 0;
}

if (LIBCURVE_ERROR::OK == fileClient_->Init(configPath)) {
inited_ = true;
return LIBCURVE_ERROR::OK;
}
return -LIBCURVE_ERROR::FAILED;
}

void SourceReader::Uinit() {
if (!inited_) {
LOG(WARNING) << "SourceReader not inited!";
return;
}
if (fileClient_ != nullptr) {
for (auto &pair : fdMap_) {
fileClient_->Close(pair.second.first);
}
fdMap_.clear();
fileClient_->UnInit();
delete fileClient_;
fileClient_ = nullptr;
inited_ = false;
}
}

SourceReader& SourceReader::GetInstance() {
static SourceReader originReader;
return originReader;
}

int SourceReader::Read(std::vector<RequestContext*> reqCtxVec,
const UserInfo_t& userInfo) {
if (reqCtxVec.size() <= 0) {
return 0;
}

for (auto reqCtx : reqCtxVec) {
brpc::ClosureGuard doneGuard(reqCtx->done_);
std::string fileName = reqCtx->sourceInfo_.cloneFileSource;
int fd = 0;
{
curve::common::WriteLockGuard writeLockGuard(rwLock_);
auto iter = fdMap_.find(fileName);
if (iter != fdMap_.end()) {
fd = iter->second.first;
} else {
fd = fileClient_->Open4ReadOnly(fileName, userInfo);
if (fd < 0) {
LOG(ERROR) << "Open curve file failed."
<< "file name: " << fileName
<< " ,return code: " << fd;
return -1;
}
}
fdMap_[fileName] = std::make_pair(fd, time(0));
}
CurveAioCombineContext *curveCombineCtx = new CurveAioCombineContext();
curveCombineCtx->done = reqCtx->done_;
curveCombineCtx->curveCtx.offset = reqCtx->sourceInfo_.cloneFileOffset
+ reqCtx->offset_;
curveCombineCtx->curveCtx.length = reqCtx->rawlength_;
curveCombineCtx->curveCtx.buf = &reqCtx->readData_;
curveCombineCtx->curveCtx.op = LIBCURVE_OP::LIBCURVE_OP_READ;
curveCombineCtx->curveCtx.cb = CurveAioCallback;

int ret = fileClient_->AioRead(fd, &curveCombineCtx->curveCtx,
UserDataType::IOBuffer);
if (ret != LIBCURVE_ERROR::OK) {
LOG(ERROR) << "Read curve file failed."
<< "file name: " << fileName
<< " ,error code: " << ret;
delete curveCombineCtx;
return -1;
} else {
doneGuard.release();
}
}
return 0;
}

int SourceReader::Closefd() {
while (running_) {
{
curve::common::WriteLockGuard writeLockGuard(rwLock_);
for (auto iter = fdMap_.begin(); iter != fdMap_.end();) {
int fd = iter->second.first;
time_t timestamp = iter->second.second;
if (time(0) - timestamp > fileClient_->GetClientConfig().GetFileServiceOption().
ioOpt.closeFdThreadOption.fdTimeout) {
fileClient_->Close(fd);
iter = fdMap_.erase(iter);
} else {
iter++;
}
}
}

sleeper_.wait_for(std::chrono::seconds(fileClient_->GetClientConfig().
GetFileServiceOption().ioOpt.closeFdThreadOption.fdCloseTimeInterval));
}
}

} // namespace client
} // namespace curve
Loading

0 comments on commit 70d88df

Please sign in to comment.