Skip to content

Commit

Permalink
client read not allocate segment
Browse files Browse the repository at this point in the history
  • Loading branch information
SeanHai committed Dec 23, 2020
1 parent ebc1506 commit 548340d
Show file tree
Hide file tree
Showing 17 changed files with 708 additions and 28 deletions.
70 changes: 70 additions & 0 deletions include/client/libcurve.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,20 @@
#include <vector>
#include <map>
#include <string>
#include <mutex>
#include <unordered_map>
#include <thread>
#include <atomic>
#include "src/common/interruptible_sleeper.h"

#define IO_ALIGNED_BLOCK_SIZE 4096
#define PATH_MAX_SIZE 4096
#define NAME_MAX_SIZE 256

#define FD_TIME_DELIM std::string("_")
#define FD_TIMEOUT 300
#define FD_CLOSE_TIME_INTERVAL 600

enum FileType {
INODE_DIRECTORY = 0,
INODE_PAGEFILE = 1,
Expand Down Expand Up @@ -379,6 +388,7 @@ namespace curve {
namespace client {

class FileClient;
struct RequestContext;

enum class UserDataType {
RawBuffer, // char*
Expand Down Expand Up @@ -491,6 +501,66 @@ class CurveClient {
FileClient* fileClient_{nullptr};
};

class OriginReader {
public:
static OriginReader& GetInstance();

void SetFileClient(FileClient *client) {
fileClient_ = client;
}

FileClient* GetFileClient() {
return fileClient_;
}

std::unordered_map<std::string, std::string>& GetFdMap() {
return fdMap_;
}

int Init(const std::string& configPath);

void Uinit();

/**
* run the timed stop fd thread
*/
void Run();

/**
* stop the timed stop fd thread
*/
void Stop();

/**
* read from the origin
* @param: reqCtxVec the read request context vector
* @param: the user info
* @return 0 success; -1 fail
*/
int Read(std::vector<RequestContext*> reqCtxVec,
const UserInfo_t& userInfo);

/**
* close the timeout fd with timed thread
*/
int Closefd();

private:
OriginReader();
~OriginReader();
OriginReader(const OriginReader &);
OriginReader& operator=(const OriginReader &);

FileClient *fileClient_{nullptr};
// the mutex lock for fdMap_
std::mutex mtx_;
// the mapping from filename to fd_timestamp
std::unordered_map<std::string, std::string> fdMap_;
std::thread closeThread;
std::atomic<bool> running_;
curve::common::InterruptibleSleeper sleeper_;
};

} // namespace client
} // namespace curve

Expand Down
8 changes: 8 additions & 0 deletions nebd/src/part2/nebd_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,14 @@ bool NebdServer::InitCurveRequestExecutor() {
return false;
}

int ret = originReader_.Init(confPath);
if (ret < 0) {
LOG(ERROR) << "Init originReader fail";
return false;
}

originReader_.Run();

CurveRequestExecutor::GetInstance().Init(curveClient_);
return true;
}
Expand Down
5 changes: 4 additions & 1 deletion nebd/src/part2/nebd_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ namespace server {

using ::nebd::common::Configuration;
using ::curve::client::CurveClient;
using ::curve::client::OriginReader;

class NebdServer {
public:
NebdServer() {}
NebdServer() : originReader_(OriginReader::GetInstance()) {}
virtual ~NebdServer() {}

int Init(const std::string &confPath,
Expand Down Expand Up @@ -111,6 +112,8 @@ class NebdServer {
std::shared_ptr<HeartbeatManager> heartbeatManager_;
// curveclient
std::shared_ptr<CurveClient> curveClient_;
// originReader
OriginReader &originReader_;
};

} // namespace server
Expand Down
2 changes: 2 additions & 0 deletions src/client/client_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ typedef struct ChunkIDInfo {
CopysetID cpid_ = 0;
LogicPoolID lpid_ = 0;

bool chunkExist = true;

ChunkIDInfo() = default;

ChunkIDInfo(ChunkID cid, LogicPoolID lpid, CopysetID cpid)
Expand Down
7 changes: 7 additions & 0 deletions src/client/client_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ class ClientConfig {
return fileServiceOption_;
}

/**
* test use, set the fileServiceOption_
*/
void SetFileServiceOption(FileServiceOption opt) {
fileServiceOption_ = opt;
}

uint16_t GetDummyserverStartPort();

private:
Expand Down
26 changes: 24 additions & 2 deletions src/client/io_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "src/client/request_scheduler.h"
#include "src/client/request_closure.h"
#include "src/common/timeutility.h"
#include "src/client/libcurve_file.h"

namespace curve {
namespace client {
Expand Down Expand Up @@ -91,26 +92,47 @@ void IOTracker::DoRead(MDSClient* mdsclient, const FInfo_t* fileInfo) {
if (ret == 0) {
PrepareReadIOBuffers(reqlist_.size());
uint32_t subIoIndex = 0;
std::vector<RequestContext*> originReadVec;

reqcount_.store(reqlist_.size(), std::memory_order_release);
std::for_each(reqlist_.begin(), reqlist_.end(), [&](RequestContext* r) {
// fake subrequest
if (!r->idinfo_.chunkExist) {
// the clone source is empty
if (r->sourceInfo_.cloneFileSource.empty()) {
// add zero data
r->readData_.resize(r->rawlength_, 0);
r->done_->SetFailed(LIBCURVE_ERROR::OK);
} else {
// read from original volume
originReadVec.emplace_back(r);
}
}

r->done_->SetFileMetric(fileMetric_);
r->done_->SetIOManager(iomanager_);
r->subIoIndex_ = subIoIndex++;
});

reqcount_.store(reqlist_.size(), std::memory_order_release);
ret = scheduler_->ScheduleRequest(reqlist_);
ret += ReadFromOrigin(originReadVec, fileInfo->userinfo);
} else {
LOG(ERROR) << "splitor read io failed, "
<< "offset = " << offset_ << ", length = " << length_;
}

if (ret == -1) {
if (ret < 0) {
LOG(ERROR) << "split or schedule failed, return and recycle resource!";
ReturnOnFail();
}
}

int IOTracker::ReadFromOrigin(std::vector<RequestContext*> reqCtxVec,
const UserInfo_t& userInfo) {
OriginReader &originReader = OriginReader::GetInstance();
return originReader.Read(reqCtxVec, userInfo);
}

void IOTracker::StartWrite(const void* buf, off_t offset, size_t length,
MDSClient* mdsclient, const FInfo_t* fileInfo) {
data_ = const_cast<void*>(buf);
Expand Down
9 changes: 9 additions & 0 deletions src/client/io_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,15 @@ class CURVE_CACHELINE_ALIGNMENT IOTracker {
// perform read operation
void DoRead(MDSClient* mdsclient, const FInfo_t* fileInfo);

/**
* read from the origin
* @param: reqCtxVec the read request context vector
* @param: the user info
* @return 0 success; -1 fail
*/
int ReadFromOrigin(std::vector<RequestContext*> reqCtxVec,
const UserInfo_t& userInfo);

// perform write operation
void DoWrite(MDSClient* mdsclient, const FInfo_t* fileInfo);

Expand Down
156 changes: 156 additions & 0 deletions src/client/libcurve_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "include/client/libcurve.h"
#include "src/client/libcurve_file.h"
#include "src/common/string_util.h"

namespace curve {
namespace client {
Expand Down Expand Up @@ -123,5 +124,160 @@ void CurveClient::SetFileClient(FileClient* client) {
fileClient_ = client;
}

struct CurveAioCombineContext {
RequestClosure* done;
CurveAioContext curveCtx;
};

void CurveAioCallback(struct CurveAioContext* context) {
auto curveCombineCtx = reinterpret_cast<CurveAioCombineContext *>(
reinterpret_cast<char *>(context) -
offsetof(CurveAioCombineContext, curveCtx));
RequestClosure* done = curveCombineCtx->done;
if (context->ret < 0) {
done->SetFailed(LIBCURVE_ERROR::FAILED);
} else {
done->SetFailed(LIBCURVE_ERROR::OK);
}

delete curveCombineCtx;
brpc::ClosureGuard doneGuard(done);
}

OriginReader::OriginReader() : fileClient_(new FileClient()) {}

OriginReader::~OriginReader() {
Stop();
Uinit();
}

void OriginReader::Run() {
if (!running_) {
running_ = true;
closeThread = std::thread(&OriginReader::Closefd, this);
}
}

void OriginReader::Stop() {
if (running_) {
running_ = false;
sleeper_.interrupt();
closeThread.join();
}
}

int OriginReader::Init(const std::string& configPath) {
return fileClient_->Init(configPath);
}

void OriginReader::Uinit() {
if (fileClient_ != nullptr) {
for (auto &pair : fdMap_) {
std::vector<std::string> fdtimestamp;
curve::common::SplitString(pair.second, FD_TIME_DELIM,
&fdtimestamp);
fileClient_->Close(std::stoi(fdtimestamp[0]));
}
fdMap_.clear();
fileClient_->UnInit();
delete fileClient_;
fileClient_ = nullptr;
}
}

OriginReader& OriginReader::GetInstance() {
static OriginReader originReader;
return originReader;
}

int OriginReader::Read(std::vector<RequestContext*> reqCtxVec,
const UserInfo_t& userInfo) {
if (reqCtxVec.size() <= 0) {
return 0;
}

for (auto reqCtx : reqCtxVec) {
brpc::ClosureGuard doneGuard(reqCtx->done_);
std::string fileName = reqCtx->sourceInfo_.cloneFileSource;
int fd = 0;
std::vector<std::string> fdtimestamp;
{
std::unique_lock<std::mutex> lock(mtx_);
auto iter = fdMap_.find(fileName);
if (iter != fdMap_.end()) {
curve::common::SplitString(iter->second, FD_TIME_DELIM,
&fdtimestamp);
if (fdtimestamp.size() != 2) {
LOG(ERROR) << "Decode the fd from fdMap failed."
<< " file name: " << fileName;
return -1;
}

fd = std::stoi(fdtimestamp[0]);
} else {
fd = fileClient_->Open4ReadOnly(fileName, userInfo);
if (fd < 0) {
LOG(ERROR) << "Open curve file failed."
<< "file name: " << fileName
<< " ,return code: " << fd;
return -1;
}
}
fdMap_[fileName] = std::to_string(fd) + FD_TIME_DELIM +
std::to_string(time(0));
}
CurveAioCombineContext *curveCombineCtx = new CurveAioCombineContext();
curveCombineCtx->done = reqCtx->done_;
curveCombineCtx->curveCtx.offset = reqCtx->sourceInfo_.cloneFileOffset
+ reqCtx->offset_;
curveCombineCtx->curveCtx.length = reqCtx->rawlength_;
curveCombineCtx->curveCtx.buf = &reqCtx->readData_;
curveCombineCtx->curveCtx.op = LIBCURVE_OP::LIBCURVE_OP_READ;
curveCombineCtx->curveCtx.cb = CurveAioCallback;

int ret = fileClient_->AioRead(fd, &curveCombineCtx->curveCtx,
UserDataType::IOBuffer);
if (ret != LIBCURVE_ERROR::OK) {
LOG(ERROR) << "Read curve file failed."
<< "file name: " << fileName
<< " ,error code: " << ret;
delete curveCombineCtx;
return -1;
} else {
doneGuard.release();
}
}
return 0;
}

int OriginReader::Closefd() {
while (running_) {
{
std::unique_lock<std::mutex> lock(mtx_);
for (auto iter = fdMap_.begin(); iter != fdMap_.end();) {
std::vector<std::string> fdtimestamp;
curve::common::SplitString(iter->second, FD_TIME_DELIM,
&fdtimestamp);
if (fdtimestamp.size() != 2) {
LOG(ERROR) << "Decode the fd from fdMap failed "
<< "when close fd";
return -1;
}

int fd = std::stoi(fdtimestamp[0]);
time_t timestamp = std::stol(fdtimestamp[1]);
if (time(0) - timestamp > FD_TIMEOUT) {
fileClient_->Close(fd);
iter = fdMap_.erase(iter);
} else {
iter++;
}
}
}

sleeper_.wait_for(std::chrono::seconds(FD_CLOSE_TIME_INTERVAL));
}
}

} // namespace client
} // namespace curve
Loading

0 comments on commit 548340d

Please sign in to comment.