Skip to content

Commit

Permalink
client read not allocate segment
Browse files Browse the repository at this point in the history
  • Loading branch information
SeanHai committed Dec 10, 2020
1 parent 9966376 commit 08fa2f1
Show file tree
Hide file tree
Showing 13 changed files with 571 additions and 49 deletions.
2 changes: 2 additions & 0 deletions src/client/client_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ typedef struct ChunkIDInfo {
CopysetID cpid_ = 0;
LogicPoolID lpid_ = 0;

bool chunkExist = true;

ChunkIDInfo() = default;

ChunkIDInfo(ChunkID cid, LogicPoolID lpid, CopysetID cpid)
Expand Down
7 changes: 7 additions & 0 deletions src/client/client_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ class ClientConfig {
return fileServiceOption_;
}

/**
* test use, set the fileServiceOption_
*/
void SetFileServiceOption(FileServiceOption opt) {
fileServiceOption_ = opt;
}

uint16_t GetDummyserverStartPort();

private:
Expand Down
110 changes: 106 additions & 4 deletions src/client/io_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "src/client/request_scheduler.h"
#include "src/client/request_closure.h"
#include "src/common/timeutility.h"
#include "src/client/libcurve_file.h"

namespace curve {
namespace client {
Expand Down Expand Up @@ -59,6 +60,24 @@ IOTracker::IOTracker(IOManager* iomanager,
opStartTimePoint_ = curve::common::TimeUtility::GetTimeofDayUs();
}

struct CurveAioCombineContext {
RequestClosure* done;
CurveAioContext curveCtx;
};

void CurveAioCallback(struct CurveAioContext* context) {
auto curveCombineCtx = reinterpret_cast<CurveAioCombineContext *>(
reinterpret_cast<char *>(context) -
offsetof(CurveAioCombineContext, curveCtx));
RequestClosure* done = curveCombineCtx->done;
if (context->ret < 0) {
done->SetFailed(LIBCURVE_ERROR::FAILED);
}
delete curveCombineCtx;
done->SetFailed(LIBCURVE_ERROR::OK);
brpc::ClosureGuard doneGuard(done);
}

void IOTracker::StartRead(void* buf, off_t offset, size_t length,
MDSClient* mdsclient, const FInfo_t* fileInfo) {
data_ = buf;
Expand Down Expand Up @@ -91,26 +110,109 @@ void IOTracker::DoRead(MDSClient* mdsclient, const FInfo_t* fileInfo) {
if (ret == 0) {
PrepareReadIOBuffers(reqlist_.size());
uint32_t subIoIndex = 0;
uint32_t fakeReqNum = 0;
std::vector<RequestContext*> originReadVec;

for (auto r : reqlist_) {
// fake subrequest
if (!r->idinfo_.chunkExist) {
// the clone source is empty
if (r->sourceInfo_.cloneFileSource.empty()) {
// add zero data
butil::IOBuf zeroDataBuf;
zeroDataBuf.resize(r->rawlength_, 0);
r->readData_ = zeroDataBuf;
r->done_->SetFailed(LIBCURVE_ERROR::OK);
fakeReqNum++;
} else {
// read from original volume
originReadVec.push_back(r);
}

r->subIoIndex_ = subIoIndex++;
continue;
}

reqcount_.store(reqlist_.size(), std::memory_order_release);
std::for_each(reqlist_.begin(), reqlist_.end(), [&](RequestContext* r) {
r->done_->SetFileMetric(fileMetric_);
r->done_->SetIOManager(iomanager_);
r->subIoIndex_ = subIoIndex++;
});
}

reqcount_.store(reqlist_.size(), std::memory_order_release);
ret = scheduler_->ScheduleRequest(reqlist_);
ret += ReadFromOrigin(originReadVec, fileInfo->userinfo);
} else {
LOG(ERROR) << "splitor read io failed, "
<< "offset = " << offset_ << ", length = " << length_;
}

if (ret == -1) {
if (ret < 0) {
LOG(ERROR) << "split or schedule failed, return and recycle resource!";
ReturnOnFail();
}
}

int IOTracker::ReadFromOrigin(std::vector<RequestContext*> reqCtxVec,
UserInfo_t userInfo) {
if (reqCtxVec.size() <= 0) {
return 0;
}

FileClient4ReadClone &fileClient4Clone =
FileClient4ReadClone::GetFileClient4ReadClone();
FileClient *fileClient = fileClient4Clone.GetFileClient();
std::mutex &mtx = fileClient4Clone.GetFdMutex();
std::unordered_map<std::string, int> &fdMap = fileClient4Clone.GetFdMap();
if (fileClient == nullptr) {
LOG(ERROR) << "Failed to read curve file."
<< "curve client is disabled";
return -1;
}

for (auto reqCtx : reqCtxVec) {
brpc::ClosureGuard doneGuard(reqCtx->done_);
std::string fileName = reqCtx->sourceInfo_.cloneFileSource;
int fd = 0;
{
std::unique_lock<std::mutex> lock(mtx);
auto iter = fdMap.find(fileName);
if (iter != fdMap.end()) {
fd = iter->second;
} else {
fd = fileClient->Open4ReadOnly(fileName, userInfo);
if (fd < 0) {
LOG(ERROR) << "Open curve file failed."
<< "file name: " << fileName
<< " ,return code: " << fd;
return -1;
}
fdMap[fileName] = fd;
}
}
CurveAioCombineContext *curveCombineCtx = new CurveAioCombineContext();
curveCombineCtx->done = reqCtx->done_;
curveCombineCtx->curveCtx.offset = reqCtx->sourceInfo_.cloneFileOffset
+ reqCtx->offset_;
curveCombineCtx->curveCtx.length = reqCtx->rawlength_;
curveCombineCtx->curveCtx.buf = &reqCtx->readData_;
curveCombineCtx->curveCtx.op = LIBCURVE_OP::LIBCURVE_OP_READ;
curveCombineCtx->curveCtx.cb = CurveAioCallback;

int ret = fileClient->AioRead(fd, &curveCombineCtx->curveCtx,
UserDataType::IOBuffer);
if (ret != LIBCURVE_ERROR::OK) {
LOG(ERROR) << "Read curve file failed."
<< "file name: " << fileName
<< " ,error code: " << ret;
delete curveCombineCtx;
return -1;
} else {
doneGuard.release();
}
}
return 0;
}

void IOTracker::StartWrite(const void* buf, off_t offset, size_t length,
MDSClient* mdsclient, const FInfo_t* fileInfo) {
data_ = const_cast<void*>(buf);
Expand Down
9 changes: 9 additions & 0 deletions src/client/io_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,15 @@ class CURVE_CACHELINE_ALIGNMENT IOTracker {
// perform read operation
void DoRead(MDSClient* mdsclient, const FInfo_t* fileInfo);

/**
* read from the origin
* @param: reqCtxVec the read request context vector
* @param: the user info
* @return 0 success; -1 fail
*/
int ReadFromOrigin(std::vector<RequestContext*> reqCtxVec,
UserInfo_t userInfo);

// perform write operation
void DoWrite(MDSClient* mdsclient, const FInfo_t* fileInfo);

Expand Down
5 changes: 4 additions & 1 deletion src/client/libcurve_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@ CurveClient::~CurveClient() {
}

int CurveClient::Init(const std::string& configPath) {
return fileClient_->Init(configPath);
int ret = fileClient_->Init(configPath);
FileClient4ReadClone::GetFileClient4ReadClone().SetFileClient(fileClient_);
return ret;
}

void CurveClient::UnInit() {
FileClient4ReadClone::GetFileClient4ReadClone().Fini();
return fileClient_->UnInit();
}

Expand Down
67 changes: 67 additions & 0 deletions src/client/libcurve_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,27 @@ class FileClient {
return openedFileNum_.get_value();
}

/**
* test use, set the mdsclient_
*/
void SetMdsClient(MDSClient* client) {
mdsClient_ = client;
}

/**
* test use, set the clientconfig_
*/
void SetClientConfig(ClientConfig cfg) {
clientconfig_ = cfg;
}

/**
* test use, get the fileserviceMap_
*/
std::unordered_map<int, FileInstance*>& GetFileServiceMap() {
return fileserviceMap_;
}

private:
bool StartDummyServer();

Expand Down Expand Up @@ -299,6 +320,52 @@ class FileClient {
bvar::Adder<uint64_t> openedFileNum_;
};

class FileClient4ReadClone {
public:
static FileClient4ReadClone& GetFileClient4ReadClone() {
static FileClient4ReadClone fileClient4ReadClone;
return fileClient4ReadClone;
}

void SetFileClient(FileClient *client) {
delete fileClient_;
fileClient_ = client;
}

FileClient* GetFileClient() {
return fileClient_;
}

std::mutex& GetFdMutex() {
return mtx_;
}

std::unordered_map<std::string, int>& GetFdMap() {
return fdMap_;
}

void Fini() {
if (fileClient_ != nullptr) {
for (auto &pair : fdMap_) {
fileClient_->Close(pair.second);
}
fdMap_.clear();
}
}

private:
FileClient4ReadClone() {}
FileClient4ReadClone(const FileClient4ReadClone &);
FileClient4ReadClone& operator=(const FileClient4ReadClone &);
~FileClient4ReadClone() {}

FileClient *fileClient_ = nullptr;
// the mutex lock for fdMap_
std::mutex mtx_;
// the mapping from filename to fd
std::unordered_map<std::string, int> fdMap_;
};

} // namespace client
} // namespace curve

Expand Down
8 changes: 8 additions & 0 deletions src/client/request_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ int RequestScheduler::ScheduleRequest(
if (running_.load(std::memory_order_acquire)) {
/* TODO(wudemiao): 后期考虑 qos */
for (auto it : requests) {
// skip the fake request
if (!it->idinfo_.chunkExist) {
if (it->sourceInfo_.cloneFileSource.empty()) {
it->done_->Run();
}
continue;
}

BBQItem<RequestContext *> req(it);
queue_.PutBack(req);
}
Expand Down
22 changes: 19 additions & 3 deletions src/client/splitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,12 @@ bool Splitor::AssignInternal(IOTracker* iotracker, MetaCache* metaCache,
metaCache->GetChunkInfoByIndex(chunkidx, &chunkIdInfo);

if (errCode == MetaCacheErrorType::CHUNKINFO_NOT_FOUND) {
bool isAllocateSegment =
iotracker->Optype() == OpType::READ ? false : true;
if (false == GetOrAllocateSegment(
true,
isAllocateSegment,
static_cast<uint64_t>(chunkidx) * fileinfo->chunksize,
mdsclient, metaCache, fileinfo)) {
mdsclient, metaCache, fileinfo, chunkidx)) {
return false;
}

Expand All @@ -194,6 +196,13 @@ bool Splitor::AssignInternal(IOTracker* iotracker, MetaCache* metaCache,
int ret = 0;
uint64_t appliedindex_ = 0;

// check whether the chunkIdInfo is normal
if (!chunkIdInfo.chunkExist) {
if (iotracker->Optype() == OpType::WRITE) {
return false;
}
}

// only read needs applied-index
if (iotracker->Optype() == OpType::READ) {
appliedindex_ = metaCache->GetAppliedIndex(chunkIdInfo.lpid_,
Expand Down Expand Up @@ -227,8 +236,11 @@ bool Splitor::GetOrAllocateSegment(bool allocateIfNotExist,
uint64_t offset,
MDSClient* mdsClient,
MetaCache* metaCache,
const FInfo* fileInfo) {
const FInfo* fileInfo,
ChunkIndex chunkidx) {
SegmentInfo segmentInfo;
// this chunkIdInfo(0, 0, 0) identify the unallocated chunk when read
ChunkIDInfo chunkIdInfo(0, 0, 0);
LIBCURVE_ERROR errCode = mdsClient->GetOrAllocateSegment(
allocateIfNotExist, offset, fileInfo, &segmentInfo);

Expand All @@ -237,6 +249,10 @@ bool Splitor::GetOrAllocateSegment(bool allocateIfNotExist,
LOG(ERROR) << "GetOrAllocateSegmen failed, filename: "
<< fileInfo->filename << ", offset: " << offset;
return false;
} else if (errCode == LIBCURVE_ERROR::NOT_ALLOCATE) {
chunkIdInfo.chunkExist = false;
metaCache->UpdateChunkInfoByIndex(chunkidx, chunkIdInfo);
return true;
}

const auto chunksize = fileInfo->chunksize;
Expand Down
3 changes: 2 additions & 1 deletion src/client/splitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ class Splitor {
uint64_t offset,
MDSClient* mdsClient,
MetaCache* metaCache,
const FInfo* fileInfo);
const FInfo* fileInfo,
ChunkIndex chunkidx);

private:
// IO拆分模块所使用的配置信息
Expand Down
Loading

0 comments on commit 08fa2f1

Please sign in to comment.