Skip to content

Commit

Permalink
use butil::IOBuf in WriteChunk
Browse files Browse the repository at this point in the history
Change-Id: If514d166a27da3066649820a140f28ec193b41a1
  • Loading branch information
wu-hanqing committed Aug 29, 2020
1 parent 452f8a1 commit 5477453
Show file tree
Hide file tree
Showing 20 changed files with 354 additions and 114 deletions.
2 changes: 1 addition & 1 deletion src/chunkserver/datastore/chunkserver_chunkfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ CSErrorCode CSChunkFile::LoadSnapshot(SequenceNum sn) {
}

CSErrorCode CSChunkFile::Write(SequenceNum sn,
const char * buf,
const butil::IOBuf& buf,
off_t offset,
size_t length,
uint32_t* cost) {
Expand Down
22 changes: 21 additions & 1 deletion src/chunkserver/datastore/chunkserver_chunkfile.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#define SRC_CHUNKSERVER_DATASTORE_CHUNKSERVER_CHUNKFILE_H_

#include <glog/logging.h>
#include <butil/iobuf.h>
#include <string>
#include <vector>
#include <set>
Expand Down Expand Up @@ -147,7 +148,7 @@ class CSChunkFile {
* @return: 返回错误码
*/
CSErrorCode Write(SequenceNum sn,
const char * buf,
const butil::IOBuf& buf,
off_t offset,
size_t length,
uint32_t* cost);
Expand Down Expand Up @@ -289,6 +290,25 @@ class CSChunkFile {
return rc;
}

inline int writeData(const butil::IOBuf& buf, off_t offset, size_t length) {
int rc = lfs_->Write(fd_, buf, offset + pageSize_, length);
if (rc < 0) {
return rc;
}
// 如果是clone chunk,需要判断是否需要更改bitmap并更新metapage
if (isCloneChunk_) {
uint32_t beginIndex = offset / pageSize_;
uint32_t endIndex = (offset + length - 1) / pageSize_;
for (uint32_t i = beginIndex; i <= endIndex; ++i) {
// 记录dirty page
if (!metaPage_.bitmap->Test(i)) {
dirtyPages_.insert(i);
}
}
}
return rc;
}

inline bool CheckOffsetAndLength(off_t offset, size_t len) {
// 检查offset+len是否越界
if (offset + len > size_) {
Expand Down
2 changes: 1 addition & 1 deletion src/chunkserver/datastore/chunkserver_datastore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ CSErrorCode CSDataStore::CreateChunkFile(const ChunkOptions & options,

CSErrorCode CSDataStore::WriteChunk(ChunkID id,
SequenceNum sn,
const char * buf,
const butil::IOBuf& buf,
off_t offset,
size_t length,
uint32_t* cost,
Expand Down
18 changes: 17 additions & 1 deletion src/chunkserver/datastore/chunkserver_datastore.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include <bvar/bvar.h>
#include <glog/logging.h>
#include <butil/iobuf.h>
#include <string>
#include <vector>
#include <unordered_map>
Expand All @@ -45,6 +46,8 @@ using curve::fs::LocalFileSystem;
using ::curve::common::Atomic;
using CSChunkFilePtr = std::shared_ptr<CSChunkFile>;

inline void TrivialDeleter(void* ptr) {}

/**
* DataStore的配置参数
* baseDir:DataStore管理的目录路径
Expand Down Expand Up @@ -205,11 +208,24 @@ class CSDataStore {
*/
virtual CSErrorCode WriteChunk(ChunkID id,
SequenceNum sn,
const char * buf,
const butil::IOBuf& buf,
off_t offset,
size_t length,
uint32_t* cost,
const std::string & cloneSourceLocation = "");

// Deprecated, only use for unit & integration test
virtual CSErrorCode WriteChunk(
ChunkID id, SequenceNum sn, const char* buf, off_t offset,
size_t length, uint32_t* cost,
const std::string& cloneSourceLocation = "") {
butil::IOBuf data;
data.append_user_data(const_cast<char*>(buf), length, TrivialDeleter);

return WriteChunk(id, sn, data, offset, length, cost,
cloneSourceLocation);
}

/**
* 创建克隆的Chunk,chunk中记录数据源位置信息
* 该接口需要保证幂等性,重复以相同参数进行创建返回成功
Expand Down
4 changes: 2 additions & 2 deletions src/chunkserver/op_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ void WriteChunkRequest::OnApply(uint64_t index,

auto ret = datastore_->WriteChunk(request_->chunkid(),
request_->sn(),
cntl_->request_attachment().to_string().c_str(), //NOLINT
cntl_->request_attachment(),
request_->offset(),
request_->size(),
&cost,
Expand Down Expand Up @@ -512,7 +512,7 @@ void WriteChunkRequest::OnApplyFromLog(std::shared_ptr<CSDataStore> datastore,

auto ret = datastore->WriteChunk(request.chunkid(),
request.sn(),
data.to_string().c_str(),
data,
request.offset(),
request.size(),
&cost,
Expand Down
3 changes: 2 additions & 1 deletion src/fs/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ cc_library(
]),
deps = [
"//src/common:curve_common",
"//external:glog"
"//external:glog",
"//external:butil",
],
visibility = ["//visibility:public"],
)
27 changes: 27 additions & 0 deletions src/fs/ext4_filesystem_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,33 @@ int Ext4FileSystemImpl::Write(int fd,
return length;
}

int Ext4FileSystemImpl::Write(int fd,
butil::IOBuf buf,
uint64_t offset,
int length) {
int remainLength = length;
int relativeOffset = 0;
int retryTimes = 0;

while (remainLength > 0) {
ssize_t ret = buf.pcut_into_file_descriptor(fd, offset, length);
if (ret < 0) {
if (errno == EINTR || retryTimes < MAX_RETYR_TIME) {
++retryTimes;
continue;
}
LOG(ERROR) << "IOBuf::pcut_into_file_descriptor failed: "
<< strerror(errno);
return -errno;
}

remainLength -= ret;
offset += ret;
}

return length;
}

int Ext4FileSystemImpl::Append(int fd,
const char *buf,
int length) {
Expand Down
5 changes: 4 additions & 1 deletion src/fs/ext4_filesystem_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
#ifndef SRC_FS_EXT4_FILESYSTEM_IMPL_H_
#define SRC_FS_EXT4_FILESYSTEM_IMPL_H_

#include <butil/iobuf.h>

#include <map>
#include <memory>
#include <string>
#include <vector>
#include <map>

#include "src/fs/local_filesystem.h"
#include "src/fs/wrap_posix.h"
Expand All @@ -52,6 +54,7 @@ class Ext4FileSystemImpl : public LocalFileSystem {
int List(const string& dirPath, vector<std::string>* names) override;
int Read(int fd, char* buf, uint64_t offset, int length) override;
int Write(int fd, const char* buf, uint64_t offset, int length) override;
int Write(int fd, butil::IOBuf buf, uint64_t offset, int length) override;
int Append(int fd, const char* buf, int length) override;
int Fallocate(int fd, int op, uint64_t offset,
int length) override;
Expand Down
13 changes: 12 additions & 1 deletion src/fs/local_filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <inttypes.h>
#include <assert.h>
#include <sys/stat.h>
#include <butil/iobuf.h>
#include <memory>
#include <vector>
#include <map>
Expand Down Expand Up @@ -159,6 +160,17 @@ class LocalFileSystem {
*/
virtual int Write(int fd, const char* buf, uint64_t offset, int length) = 0;

/**
* 向文件指定区域写入数据
* @param fd:文件句柄id,通过Open接口获取
* @param buf:待写入数据
* @param offset:写入区域的起始偏移
* @param length:写入数据的长度
* @return 返回成功写入的数据长度,失败返回-1
*/
virtual int Write(int fd, butil::IOBuf buf, uint64_t offset,
int length) = 0;

/**
* 向文件末尾追加数据
* @param fd:文件句柄id,通过Open接口获取
Expand Down Expand Up @@ -218,4 +230,3 @@ class LocalFsFactory {
} // namespace fs
} // namespace curve
#endif // SRC_FS_LOCAL_FILESYSTEM_H_

5 changes: 3 additions & 2 deletions test/chunkserver/conf_epoch_file_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ using ::testing::_;
using ::testing::Invoke;
using ::testing::Return;
using ::testing::AnyNumber;
using ::testing::Matcher;
using ::testing::DoAll;
using ::testing::SetArgPointee;
using ::testing::SetArrayArgument;
Expand Down Expand Up @@ -188,7 +189,7 @@ TEST(ConfEpochFileTest, load_save) {
CopysetID loadCopysetID;
uint64_t loadEpoch;
EXPECT_CALL(*fs, Open(_, _)).Times(1).WillOnce(Return(10));
EXPECT_CALL(*fs, Write(_, _, _, _)).Times(1)
EXPECT_CALL(*fs, Write(_, Matcher<const char*>(_), _, _)).Times(1)
.WillOnce(Return(-1));
EXPECT_CALL(*fs, Close(_)).Times(1).WillOnce(Return(0));
ASSERT_EQ(-1, confEpochFile.Save(path,
Expand All @@ -204,7 +205,7 @@ TEST(ConfEpochFileTest, load_save) {
= std::make_shared<MockLocalFileSystem>();
ConfEpochFile confEpochFile(fs);
EXPECT_CALL(*fs, Open(_, _)).Times(1).WillOnce(Return(10));
EXPECT_CALL(*fs, Write(_, _, _, _)).Times(1)
EXPECT_CALL(*fs, Write(_, Matcher<const char*>(_), _, _)).Times(1)
.WillOnce(Return(jsonStr.size()));
EXPECT_CALL(*fs, Close(_)).Times(1).WillOnce(Return(0));
EXPECT_CALL(*fs, Fsync(_)).Times(1).WillOnce(Return(-1));
Expand Down
5 changes: 3 additions & 2 deletions test/chunkserver/copyset_node_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ using ::testing::_;
using ::testing::Invoke;
using ::testing::Return;
using ::testing::AnyNumber;
using ::testing::Matcher;
using ::testing::DoAll;
using ::testing::SetArgPointee;
using ::testing::SetArgReferee;
Expand Down Expand Up @@ -183,7 +184,7 @@ TEST_F(CopysetNodeTest, error_test) {
copysetNode.SetLocalFileSystem(mockfs);
copysetNode.SetConfEpochFile(std::move(epochFile));
EXPECT_CALL(*mockfs, Open(_, _)).Times(1).WillOnce(Return(10));
EXPECT_CALL(*mockfs, Write(_, _, _, _)).Times(1)
EXPECT_CALL(*mockfs, Write(_, Matcher<const char*>(_), _, _)).Times(1)
.WillOnce(Return(jsonStr.size()));
EXPECT_CALL(*mockfs, Fsync(_)).Times(1).WillOnce(Return(0));
EXPECT_CALL(*mockfs, Close(_)).Times(1).WillOnce(Return(0));
Expand Down Expand Up @@ -242,7 +243,7 @@ TEST_F(CopysetNodeTest, error_test) {
copysetNode.SetLocalFileSystem(mockfs);
copysetNode.SetConfEpochFile(std::move(epochFile));
EXPECT_CALL(*mockfs, Open(_, _)).Times(1).WillOnce(Return(10));
EXPECT_CALL(*mockfs, Write(_, _, _, _)).Times(1)
EXPECT_CALL(*mockfs, Write(_, Matcher<const char*>(_), _, _)).Times(1)
.WillOnce(Return(jsonStr.size()));
EXPECT_CALL(*mockfs, Fsync(_)).Times(1).WillOnce(Return(0));
EXPECT_CALL(*mockfs, Close(_)).Times(1).WillOnce(Return(0));
Expand Down
16 changes: 10 additions & 6 deletions test/chunkserver/datastore/chunkfilepool_mock_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ using ::testing::Ge;
using ::testing::Gt;
using ::testing::Return;
using ::testing::NotNull;
using ::testing::Matcher;
using ::testing::Mock;
using ::testing::Truly;
using ::testing::DoAll;
Expand Down Expand Up @@ -161,7 +162,7 @@ TEST_F(CSChunkfilePoolMockTest, PersistEnCodeMetaInfoTest) {
{
EXPECT_CALL(*lfs_, Open(poolMetaPath, _))
.WillOnce(Return(-1));
EXPECT_CALL(*lfs_, Write(_, _, _, _))
EXPECT_CALL(*lfs_, Write(_, Matcher<const char*>(_), _, _))
.Times(0);
EXPECT_CALL(*lfs_, Close(_))
.Times(0);
Expand All @@ -176,7 +177,7 @@ TEST_F(CSChunkfilePoolMockTest, PersistEnCodeMetaInfoTest) {
{
EXPECT_CALL(*lfs_, Open(poolMetaPath, _))
.WillOnce(Return(1));
EXPECT_CALL(*lfs_, Write(1, NotNull(), 0, 4096))
EXPECT_CALL(*lfs_, Write(1, Matcher<const char*>(NotNull()), 0, 4096))
.WillOnce(Return(-1));
EXPECT_CALL(*lfs_, Close(1))
.Times(1);
Expand All @@ -191,7 +192,7 @@ TEST_F(CSChunkfilePoolMockTest, PersistEnCodeMetaInfoTest) {
{
EXPECT_CALL(*lfs_, Open(poolMetaPath, _))
.WillOnce(Return(1));
EXPECT_CALL(*lfs_, Write(1, NotNull(), 0, 4096))
EXPECT_CALL(*lfs_, Write(1, Matcher<const char*>(NotNull()), 0, 4096))
.WillOnce(Return(4096));
EXPECT_CALL(*lfs_, Close(1))
.Times(1);
Expand Down Expand Up @@ -770,7 +771,8 @@ TEST_F(CSChunkfilePoolMockTest, GetChunkTest) {
EXPECT_CALL(*lfs_, Fallocate(1, 0, 0, fileSize))
.Times(retryTimes)
.WillRepeatedly(Return(0));
EXPECT_CALL(*lfs_, Write(1, NotNull(), 0, fileSize))
EXPECT_CALL(*lfs_,
Write(1, Matcher<const char*>(NotNull()), 0, fileSize))
.Times(retryTimes)
.WillRepeatedly(Return(-1));
EXPECT_CALL(*lfs_, Close(1))
Expand All @@ -787,7 +789,8 @@ TEST_F(CSChunkfilePoolMockTest, GetChunkTest) {
EXPECT_CALL(*lfs_, Fallocate(1, 0, 0, fileSize))
.Times(retryTimes)
.WillRepeatedly(Return(0));
EXPECT_CALL(*lfs_, Write(1, NotNull(), 0, fileSize))
EXPECT_CALL(*lfs_,
Write(1, Matcher<const char*>(NotNull()), 0, fileSize))
.Times(retryTimes)
.WillRepeatedly(Return(fileSize));
EXPECT_CALL(*lfs_, Fsync(1))
Expand All @@ -807,7 +810,8 @@ TEST_F(CSChunkfilePoolMockTest, GetChunkTest) {
EXPECT_CALL(*lfs_, Fallocate(1, 0, 0, fileSize))
.Times(retryTimes)
.WillRepeatedly(Return(0));
EXPECT_CALL(*lfs_, Write(1, NotNull(), 0, fileSize))
EXPECT_CALL(*lfs_,
Write(1, Matcher<const char*>(NotNull()), 0, fileSize))
.Times(retryTimes)
.WillRepeatedly(Return(fileSize));
EXPECT_CALL(*lfs_, Fsync(1))
Expand Down
Loading

0 comments on commit 5477453

Please sign in to comment.