Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: hset/hdel wrote by braft and binlog #213

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
bc9356d
style: clear include headers
longfar-ncy Mar 11, 2024
e093290
feat: add WriteDoneClosure and Apply
longfar-ncy Mar 11, 2024
7401b39
feat: generate binlog.proto in cmake
longfar-ncy Mar 11, 2024
5d2528e
feat: add binlog write callback in storage
longfar-ncy Mar 11, 2024
2dc674a
feat: finish on apply
longfar-ncy Mar 11, 2024
8a5aadb
feat: 'hset' wrote by braft
longfar-ncy Mar 11, 2024
c205ce1
style
longfar-ncy Mar 11, 2024
ff0a38d
feat: cmd hdel write by binlog
longfar-ncy Mar 12, 2024
478fe1f
fix: add depends of custom command
longfar-ncy Mar 16, 2024
512e4f2
feat: add storage option is_use_raft
longfar-ncy Mar 16, 2024
06a6ef7
fix: comment
longfar-ncy Mar 21, 2024
629744c
fix: initialize member variable
longfar-ncy Mar 21, 2024
f76a1d2
Merge branch 'import-braft' into feat/praft-binlog
longfar-ncy Mar 26, 2024
404e8ae
fix: return leader info when write on follower
longfar-ncy Mar 26, 2024
34e4368
fix: remove assert(0)
longfar-ncy Mar 26, 2024
a3180ee
fix: raft_timeout_s
longfar-ncy Mar 26, 2024
08910ee
test: add a simple script to test consistency
longfar-ncy Mar 27, 2024
1c34047
feat: avoid include praft.h in storage and add config option for raft…
longfar-ncy Mar 28, 2024
8fa8f15
fix: remove spare space in pikiwidb.conf
longfar-ncy Mar 28, 2024
673f0b2
test: add consistency go test
longfar-ncy Mar 29, 2024
565285a
test: remove shell script
longfar-ncy Mar 29, 2024
b5560be
fix: default raft_timeout value and tab size
longfar-ncy Apr 3, 2024
c3d1c8b
fix: create `PARENT` dir bug
longfar-ncy Apr 4, 2024
d1a7312
style: format
longfar-ncy Apr 5, 2024
772556b
refactor: use protobuf-lite
longfar-ncy Apr 5, 2024
6359d6d
fix: add dummy service
longfar-ncy Apr 5, 2024
cbc6603
fix: details
longfar-ncy Apr 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ INCLUDE(braft)

ENABLE_TESTING()

SET(PROTO_OUTPUT_DIR "${CMAKE_BINARY_DIR}/generated_pb")
FILE(MAKE_DIRECTORY "${PROTO_OUTPUT_DIR}" PARENT)

ADD_SUBDIRECTORY(src/pstd)
ADD_SUBDIRECTORY(src/net)
ADD_SUBDIRECTORY(src/praft)
Expand Down
1 change: 1 addition & 0 deletions cmake/protobuf.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ FUNCTION(build_protobuf TARGET_NAME)
UPDATE_COMMAND ""
DEPENDS zlib
URL "https://github.com/protocolbuffers/protobuf/archive/v3.18.0.tar.gz"
URL_HASH SHA256=14e8042b5da37652c92ef6a2759e7d2979d295f60afd7767825e3de68c856c54
CONFIGURE_COMMAND mv ../config.sh . COMMAND sh config.sh
CMAKE_CACHE_ARGS
-DCMAKE_INSTALL_PREFIX:PATH=${PROTOBUF_INSTALL_DIR}
Expand Down
4 changes: 2 additions & 2 deletions src/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
#include <unordered_set>

#include "common.h"
#include "net/tcp_connection.h"
#include "proto_parser.h"
#include "replication.h"
#include "storage/storage.h"
#include "tcp_connection.h"

namespace pikiwidb {

Expand Down Expand Up @@ -250,4 +250,4 @@ class PClient : public std::enable_shared_from_this<PClient>, public CmdRes {

static thread_local PClient* s_current;
};
} // namespace pikiwidb
} // namespace pikiwidb
3 changes: 1 addition & 2 deletions src/pikiwidb.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@

#include "cmd_table_manager.h"
#include "common.h"
#include "event_loop.h"
#include "io_thread_pool.h"
#include "tcp_connection.h"
#include "net/tcp_connection.h"
#include "praft/praft.h"

#define kPIKIWIDB_VERSION "4.0.0"
Expand Down
22 changes: 10 additions & 12 deletions src/praft/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,28 @@
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree. An additional grant
# of patent rights can be found in the PATENTS file in the same directory.

FILE(GLOB PRAFT_PROTO "${CMAKE_CURRENT_SOURCE_DIR}/*.proto")
EXECUTE_PROCESS(
COMMAND ${PROTOBUF_PROTOC_EXECUTABLE} --cpp_out=${CMAKE_CURRENT_SOURCE_DIR} -I=${CMAKE_CURRENT_SOURCE_DIR} ${PRAFT_PROTO}
ADD_CUSTOM_COMMAND(
OUTPUT "${PROTO_OUTPUT_DIR}/binlog.pb.cc"
DEPENDS extern_protobuf
COMMAND ${PROTOBUF_PROTOC_EXECUTABLE}
ARGS -I ${CMAKE_CURRENT_SOURCE_DIR}
--cpp_out ${PROTO_OUTPUT_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/binlog.proto
)
ADD_LIBRARY(binlog_pb STATIC "${PROTO_OUTPUT_DIR}/binlog.pb.cc")

FILE(GLOB PRAFT_SRC
"${CMAKE_CURRENT_SOURCE_DIR}/*.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/*.h"
)
SET(LIBRARY_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin)
ADD_LIBRARY(praft ${PRAFT_SRC})

TARGET_INCLUDE_DIRECTORIES(praft
PRIVATE ${PROJECT_SOURCE_DIR}/src
PRIVATE ${PROJECT_SOURCE_DIR}/src/pstd
PRIVATE ${PROJECT_SOURCE_DIR}/src/net
PRIVATE ${PROJECT_SOURCE_DIR}/src/storage/include
PRIVATE ${rocksdb_SOURCE_DIR}/
PRIVATE ${rocksdb_SOURCE_DIR}/include
PRIVATE ${BRAFT_INCLUDE_DIR}
PRIVATE ${BRPC_INCLUDE_DIR}
PRIVATE ${GFLAGS_INCLUDE_PATH}
PRIVATE ${PROJECT_SOURCE_DIR}/src/praft
PRIVATE ${PROTO_OUTPUT_DIR}
)

IF(CMAKE_SYSTEM_NAME STREQUAL "Linux")
Expand All @@ -34,4 +32,4 @@ ENDIF()

TARGET_LINK_LIBRARIES(praft net; dl; fmt; storage; pstd braft brpc ssl crypto zlib protobuf leveldb gflags rocksdb z ${PRAFT_LIB})

SET_TARGET_PROPERTIES(praft PROPERTIES LINKER_LANGUAGE CXX)
SET_TARGET_PROPERTIES(praft PROPERTIES LINKER_LANGUAGE CXX)
22 changes: 22 additions & 0 deletions src/praft/binlog.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
syntax = "proto3";
package pikiwidb;
// option optimize_for = LITE_RUNTIME;

enum OperateType {
kNoOperate = 0;
kPut = 1;
kDelete = 2;
}

message BinlogEntry {
uint32 cf_idx = 1;
OperateType op_type = 2;
bytes key = 3;
optional bytes value = 4;
}

message Binlog {
uint32 db_id = 1;
uint32 slot_idx = 2;
repeated BinlogEntry entries = 3;
}
72 changes: 46 additions & 26 deletions src/praft/praft.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,18 @@
#include "praft.h"

#include <cassert>
#include <memory>
#include <string>

#include "braft/util.h"
#include "brpc/server.h"

#include "pstd/log.h"
#include "pstd/pstd_string.h"

#include "binlog.pb.h"
#include "client.h"
#include "config.h"
#include "event_loop.h"
#include "log.h"
#include "pikiwidb.h"
#include "praft.pb.h"
#include "pstd_string.h"
#include "store.h"

#define ERROR_LOG_AND_STATUS(msg) \
({ \
Expand All @@ -30,16 +32,6 @@

namespace pikiwidb {

class DummyServiceImpl : public DummyService {
public:
explicit DummyServiceImpl(PRaft* praft) : praft_(praft) {}
void DummyMethod(::google::protobuf::RpcController* controller, const ::pikiwidb::DummyRequest* request,
::pikiwidb::DummyResponse* response, ::google::protobuf::Closure* done) {}

private:
PRaft* praft_;
};

PRaft& PRaft::Instance() {
static PRaft store;
return store;
Expand All @@ -51,12 +43,7 @@ butil::Status PRaft::Init(std::string& group_id, bool initial_conf_is_null) {
}

server_ = std::make_unique<brpc::Server>();
DummyServiceImpl service(&PRAFT);
auto port = g_config.port + pikiwidb::g_config.raft_port_offset;
// Add your service into RPC server
if (server_->AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
return ERROR_LOG_AND_STATUS("Failed to add service");
}
// raft can share the same RPC server. Notice the second parameter, because
// adding services into a running server is not allowed and the listen
// address of this server is impossible to get before the server starts. You
Expand Down Expand Up @@ -339,17 +326,50 @@ void PRaft::Join() {
}
}

void PRaft::Apply(braft::Task& task) {
if (node_) {
node_->apply(task);
void PRaft::AppendLog(const Binlog& log, std::promise<rocksdb::Status>&& promise) {
assert(node_);
butil::IOBuf data;
butil::IOBufAsZeroCopyOutputStream wrapper(&data);
auto done = new PRaftWriteDoneClosure(std::move(promise));
if (!log.SerializeToZeroCopyStream(&wrapper)) {
done->SetStatus(rocksdb::Status::Incomplete("Failed to serialize binlog"));
done->Run();
return;
}
DEBUG("append binlog: {}", log.ShortDebugString());
braft::Task task;
task.data = &data;
task.done = done;
node_->apply(task);
}

// @braft::StateMachine
void PRaft::on_apply(braft::Iterator& iter) {
longfar-ncy marked this conversation as resolved.
Show resolved Hide resolved
// A batch of tasks are committed, which must be processed through
// |iter|
for (; iter.valid(); iter.next()) {
auto done = iter.done();
brpc::ClosureGuard done_guard(done);

Binlog log;
butil::IOBufAsZeroCopyInputStream wrapper(iter.data());
bool success = log.ParseFromZeroCopyStream(&wrapper);
DEBUG("apply binlog: {}", log.ShortDebugString());

if (!success) {
static constexpr std::string_view kMsg = "Failed to parse from protobuf when on_apply";
ERROR(kMsg);
if (done) { // in leader
dynamic_cast<PRaftWriteDoneClosure*>(done)->SetStatus(rocksdb::Status::Incomplete(kMsg));
}
braft::run_closure_in_bthread(done_guard.release());
return;
}

auto s = PSTORE.GetBackend(log.db_id())->OnBinlogWrite(log);
longfar-ncy marked this conversation as resolved.
Show resolved Hide resolved
if (done) { // in leader
dynamic_cast<PRaftWriteDoneClosure*>(done)->SetStatus(s);
}
// _applied_index = iter.index(); // consider to maintain a member applied_idx
braft::run_closure_in_bthread(done_guard.release());
}
}

Expand Down
44 changes: 25 additions & 19 deletions src/praft/praft.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,21 @@

#pragma once

#include <memory>
#include <mutex>
#include <tuple>
#include <vector>
#include <string>
#include <future>

#include "braft/configuration.h"
#include "braft/raft.h"
#include "braft/util.h"
#include "brpc/controller.h"
#include "brpc/server.h"
#include "butil/status.h"

#include "client.h"
#include "event_loop.h"
#include "tcp_connection.h"
#include "rocksdb/status.h"

namespace pikiwidb {

#define RAFT_DBID_LEN 32

#define PRAFT PRaft::Instance()

class PClient;
class EventLoop;
class Binlog;

class JoinCmdContext {
friend class PRaft;

Expand Down Expand Up @@ -73,10 +65,24 @@ class JoinCmdContext {
int port_ = 0;
};

class PRaft : public braft::StateMachine {
class PRaftWriteDoneClosure : public braft::Closure {
public:
PRaft() : server_(nullptr), node_(nullptr) {}
explicit PRaftWriteDoneClosure(std::promise<rocksdb::Status>&& promise) : promise_(std::move(promise)) {}

void Run() override {
promise_.set_value(result_);
delete this;
}
void SetStatus(rocksdb::Status status) { result_ = std::move(status); }

private:
std::promise<rocksdb::Status> promise_;
rocksdb::Status result_{rocksdb::Status::Aborted("Unknown error")};
};

class PRaft : public braft::StateMachine {
public:
PRaft() = default;
~PRaft() override = default;

static PRaft& Instance();
Expand All @@ -91,7 +97,7 @@ class PRaft : public braft::StateMachine {

void ShutDown();
void Join();
void Apply(braft::Task& task);
void AppendLog(const Binlog& log, std::promise<rocksdb::Status>&& promise);

//===--------------------------------------------------------------------===//
// ClusterJoin command
Expand Down Expand Up @@ -126,8 +132,8 @@ class PRaft : public braft::StateMachine {
void on_start_following(const ::braft::LeaderChangeContext& ctx) override;

private:
std::unique_ptr<brpc::Server> server_; // brpc
std::unique_ptr<braft::Node> node_;
std::unique_ptr<brpc::Server> server_{nullptr}; // brpc
std::unique_ptr<braft::Node> node_{nullptr};
braft::NodeOptions node_options_; // options for raft node
std::string raw_addr_; // ip:port of this node

Expand Down
13 changes: 0 additions & 13 deletions src/praft/praft.proto

This file was deleted.

4 changes: 2 additions & 2 deletions src/replication.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
#include <vector>

#include "common.h"
#include "memory_file.h"
#include "net/unbounded_buffer.h"
#include "net/util.h"
#include "unbounded_buffer.h"
#include "pstd/memory_file.h"

namespace pikiwidb {

Expand Down
15 changes: 14 additions & 1 deletion src/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,21 @@ TARGET_INCLUDE_DIRECTORIES(storage
PRIVATE ${rocksdb_SOURCE_DIR}/include
PRIVATE ${BRAFT_INCLUDE_DIR}
PRIVATE ${BRPC_INCLUDE_DIR}
PRIVATE ${PROTO_OUTPUT_DIR}
)

TARGET_LINK_LIBRARIES (storage pstd braft brpc ssl crypto zlib protobuf leveldb gflags rocksdb)
TARGET_LINK_LIBRARIES (storage
pstd
braft
brpc
ssl
crypto
zlib
leveldb
gflags
rocksdb
binlog_pb
protobuf
)

SET_TARGET_PROPERTIES(storage PROPERTIES LINKER_LANGUAGE CXX)
6 changes: 5 additions & 1 deletion src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
#include "pstd/pstd_mutex.h"
#include "storage/slot_indexer.h"

#include "braft/raft.h"
namespace pikiwidb {
class Binlog;
}

namespace storage {

Expand Down Expand Up @@ -59,6 +61,7 @@ struct StorageOptions {
size_t small_compaction_threshold = 5000;
size_t small_compaction_duration_threshold = 10000;
size_t db_instance_num = 3; // default = 3
bool is_use_raft = true;
Status ResetOptions(const OptionType& option_type, const std::unordered_map<std::string, std::string>& options_map);
};

Expand Down Expand Up @@ -1078,6 +1081,7 @@ class Storage {

Status SetOptions(const OptionType& option_type, const std::unordered_map<std::string, std::string>& options);
void GetRocksDBInfo(std::string& info);
Status OnBinlogWrite(const pikiwidb::Binlog& log);

private:
std::vector<std::unique_ptr<Redis>> insts_;
Expand Down
Loading
Loading