Skip to content

Commit

Permalink
client read not allocate segment
Browse files Browse the repository at this point in the history
  • Loading branch information
SeanHai authored and ilixiaocui committed Dec 25, 2020
1 parent 0011847 commit fcd778a
Show file tree
Hide file tree
Showing 18 changed files with 816 additions and 30 deletions.
8 changes: 8 additions & 0 deletions conf/client.conf
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,14 @@ global.logPath=/data/log/curve/
# 单元测试情况下
# logpath=./runlog/

#
################# 读源卷相关配置 ###############
#
# 读取源卷时打开的fd超时关闭时间300s
closefd.timeout=300
# 读取源卷时打开的fd后台线程每600s扫描一遍fdMap,关闭超时fd
closefd.timeInterval=600

#
############### metric 配置信息 #############
#
Expand Down
2 changes: 2 additions & 0 deletions src/client/client_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ typedef struct ChunkIDInfo {
CopysetID cpid_ = 0;
LogicPoolID lpid_ = 0;

bool chunkExist = true;

ChunkIDInfo() = default;

ChunkIDInfo(ChunkID cid, LogicPoolID lpid, CopysetID cpid)
Expand Down
7 changes: 7 additions & 0 deletions src/client/client_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ class ClientConfig {
return fileServiceOption_;
}

/**
* test use, set the fileServiceOption_
*/
void SetFileServiceOption(FileServiceOption opt) {
fileServiceOption_ = opt;
}

uint16_t GetDummyserverStartPort();

private:
Expand Down
11 changes: 11 additions & 0 deletions src/client/config_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,16 @@ struct TaskThreadOption {
uint32_t isolationTaskThreadPoolSize = 1;
};

/**
* timed close fd thread in SourceReader config
* @fdTimeout: sourcereader fd timeout
* @fdCloseTimeInterval: close sourcereader fd time interval
*/
struct CloseFdThreadOption {
uint32_t fdTimeout = 300;
uint32_t fdCloseTimeInterval = 600;
};

/**
* IOOption存储了当前io 操作所需要的所有配置信息
*/
Expand All @@ -222,6 +232,7 @@ struct IOOption {
MetaCacheOption metaCacheOpt;
TaskThreadOption taskThreadOpt;
RequestScheduleOption reqSchdulerOpt;
CloseFdThreadOption closeFdThreadOption;
};

/**
Expand Down
30 changes: 28 additions & 2 deletions src/client/io_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
#include "src/client/request_scheduler.h"
#include "src/client/request_closure.h"
#include "src/common/timeutility.h"
#include "src/client/libcurve_file.h"
#include "src/client/source_reader.h"

namespace curve {
namespace client {
Expand Down Expand Up @@ -91,15 +93,34 @@ void IOTracker::DoRead(MDSClient* mdsclient, const FInfo_t* fileInfo) {
if (ret == 0) {
PrepareReadIOBuffers(reqlist_.size());
uint32_t subIoIndex = 0;
std::vector<RequestContext*> originReadVec;

reqcount_.store(reqlist_.size(), std::memory_order_release);
std::for_each(reqlist_.begin(), reqlist_.end(), [&](RequestContext* r) {
// fake subrequest
if (!r->idinfo_.chunkExist) {
// the clone source is empty
if (r->sourceInfo_.cloneFileSource.empty()) {
// add zero data
r->readData_.resize(r->rawlength_, 0);
r->done_->SetFailed(LIBCURVE_ERROR::OK);
} else {
// read from original volume
originReadVec.emplace_back(r);
}
}

r->done_->SetFileMetric(fileMetric_);
r->done_->SetIOManager(iomanager_);
r->subIoIndex_ = subIoIndex++;
});

ret = scheduler_->ScheduleRequest(reqlist_);
reqcount_.store(reqlist_.size(), std::memory_order_release);
if (scheduler_->ScheduleRequest(reqlist_) == 0 &&
ReadFromSource(originReadVec, fileInfo->userinfo) == 0) {
ret = 0;
} else {
ret = -1;
}
} else {
LOG(ERROR) << "splitor read io failed, "
<< "offset = " << offset_ << ", length = " << length_;
Expand All @@ -111,6 +132,11 @@ void IOTracker::DoRead(MDSClient* mdsclient, const FInfo_t* fileInfo) {
}
}

int IOTracker::ReadFromSource(std::vector<RequestContext*> reqCtxVec,
const UserInfo_t& userInfo) {
return SourceReader::GetInstance().Read(reqCtxVec, userInfo);
}

void IOTracker::StartWrite(const void* buf, off_t offset, size_t length,
MDSClient* mdsclient, const FInfo_t* fileInfo) {
data_ = const_cast<void*>(buf);
Expand Down
9 changes: 9 additions & 0 deletions src/client/io_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,15 @@ class CURVE_CACHELINE_ALIGNMENT IOTracker {
// perform read operation
void DoRead(MDSClient* mdsclient, const FInfo_t* fileInfo);

/**
* read from the source
* @param: reqCtxVec the read request context vector
* @param: the user info
* @return 0 success; -1 fail
*/
int ReadFromSource(std::vector<RequestContext*> reqCtxVec,
const UserInfo_t& userInfo);

// perform write operation
void DoWrite(MDSClient* mdsclient, const FInfo_t* fileInfo);

Expand Down
10 changes: 9 additions & 1 deletion src/client/libcurve_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "include/client/libcurve.h"
#include "src/client/libcurve_file.h"
#include "src/client/source_reader.h"

namespace curve {
namespace client {
Expand All @@ -34,7 +35,14 @@ CurveClient::~CurveClient() {
}

int CurveClient::Init(const std::string& configPath) {
return fileClient_->Init(configPath);
if (0 == fileClient_->Init(configPath) &&
0 == SourceReader::GetInstance().Init(configPath)) {
SourceReader::GetInstance().Run();
return LIBCURVE_ERROR::OK;
} else {
LOG(ERROR) << "Curve Client Or SourceReader Init failed!";
return -LIBCURVE_ERROR::FAILED;
}
}

void CurveClient::UnInit() {
Expand Down
25 changes: 25 additions & 0 deletions src/client/libcurve_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,31 @@ class FileClient {
return openedFileNum_.get_value();
}

/**
* test use, set the mdsclient_
*/
void SetMdsClient(MDSClient* client) {
mdsClient_ = client;
}

/**
* test use, set the clientconfig_
*/
void SetClientConfig(ClientConfig cfg) {
clientconfig_ = cfg;
}

const ClientConfig& GetClientConfig() {
return clientconfig_;
}

/**
* test use, get the fileserviceMap_
*/
std::unordered_map<int, FileInstance*>& GetFileServiceMap() {
return fileserviceMap_;
}

private:
bool StartDummyServer();

Expand Down
8 changes: 8 additions & 0 deletions src/client/request_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ int RequestScheduler::ScheduleRequest(
if (running_.load(std::memory_order_acquire)) {
/* TODO(wudemiao): 后期考虑 qos */
for (auto it : requests) {
// skip the fake request
if (!it->idinfo_.chunkExist) {
if (it->sourceInfo_.cloneFileSource.empty()) {
it->done_->Run();
}
continue;
}

BBQItem<RequestContext *> req(it);
queue_.PutBack(req);
}
Expand Down
Loading

0 comments on commit fcd778a

Please sign in to comment.