issue 259/260: incremental resync (#311)
* issue 259/260: incremental resync

* staging handling during fetch remote data

* adopt movable client blob change in nuraft_mesg

* add delay

* debug level and minor change
yamingk authored Feb 21, 2024
1 parent 84f94ee commit 2f5f94d
Showing 8 changed files with 221 additions and 70 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
@@ -5,7 +5,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "5.1.5"
version = "5.1.6"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
3 changes: 3 additions & 0 deletions src/lib/common/homestore_config.fbs
@@ -225,6 +225,9 @@ table Consensus {

// Leadership expiry 120 seconds
leadership_expiry_ms: uint32 = 120000;

// data fetch max size limit in MB
data_fetch_max_size_mb: uint32 = 2;
}

table HomeStoreSettings {
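Note: the new data_fetch_max_size_mb knob caps how much data a follower requests in a single FetchData round trip (2 MB by default). A minimal sketch of what that cap works out to in blocks, assuming a 4 KiB block size (the block size is not fixed by this diff):

```cpp
// Sketch: how many blocks fit in one fetch batch under the new default cap.
// The 4 KiB block size here is an assumption for illustration only.
#include <cstdint>
#include <iostream>

int main() {
    constexpr uint64_t data_fetch_max_size_mb = 2;                  // new default above
    constexpr uint64_t max_batch_bytes = data_fetch_max_size_mb * 1024 * 1024;
    constexpr uint64_t blk_size = 4096;                             // assumed block size
    std::cout << max_batch_bytes / blk_size << " blocks per fetch batch\n";  // prints 512
    return 0;
}
```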
2 changes: 2 additions & 0 deletions src/lib/device/physical_dev.cpp
@@ -360,6 +360,8 @@ void PhysicalDev::load_chunks(std::function< bool(cshared< Chunk >&) >&& chunk_f
cinfo->checksum = info_crc;

auto chunk = std::make_shared< Chunk >(this, *cinfo, cslot);
m_chunk_data_area.insert(
ChunkInterval::right_open(cinfo->chunk_start_offset, cinfo->chunk_start_offset + cinfo->chunk_size));
if (chunk_found_cb(chunk)) { get_stream(chunk).m_chunks_map.insert(std::pair{cinfo->chunk_id, chunk}); }
}
hs_utils::iobuf_free(buf, sisl::buftag::superblk);
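Note: the added m_chunk_data_area bookkeeping records each chunk's [start, start + size) byte range as chunks are loaded. A minimal sketch of the same idea with a boost::icl interval set, assuming ChunkInterval aliases boost::icl::interval< uint64_t > (that alias is not shown in this diff):

```cpp
// Sketch: tracking the occupied chunk data area with an interval set.
// Adjacent right-open ranges coalesce automatically on insert.
#include <cstdint>
#include <iostream>
#include <boost/icl/interval_set.hpp>

int main() {
    using ChunkInterval = boost::icl::interval< uint64_t >;  // assumed alias
    boost::icl::interval_set< uint64_t > chunk_data_area;

    chunk_data_area.insert(ChunkInterval::right_open(0, 4096));      // chunk 1
    chunk_data_area.insert(ChunkInterval::right_open(4096, 12288));  // chunk 2, adjacent

    for (auto const& ival : chunk_data_area) {
        std::cout << "covered: [" << ival.lower() << ", " << ival.upper() << ")\n";
    }
    // Prints a single coalesced range: covered: [0, 12288)
    return 0;
}
```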
132 changes: 95 additions & 37 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
@@ -148,7 +148,7 @@ void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq) {
flatbuffers::FlatBufferToString(builder.GetBufferPointer() + sizeof(flatbuffers::uoffset_t),
PushDataRequestTypeTable()));*/

LOGINFO("Data Channel: Pushing data to all followers: rreq=[{}]", rreq->to_compact_string());
RD_LOG(DEBUG, "Data Channel: Pushing data to all followers: rreq=[{}]", rreq->to_compact_string());

group_msg_service()
->data_service_request_unidirectional(nuraft_mesg::role_regex::ALL, PUSH_DATA, rreq->pkts)
@@ -184,16 +184,16 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_
ctx->outstanding_read_cnt = fetch_req->request()->entries()->size();

for (auto const& req : *(fetch_req->request()->entries())) {
RD_LOG(INFO, "Data Channel: FetchData received: lsn={}", req->lsn());

auto const& lsn = req->lsn();
auto const& term = req->raft_term();
auto const& dsn = req->dsn();
auto const& header = req->user_header();
auto const& key = req->user_key();
auto const& originator = req->blkid_originator();
auto const& remote_blkid = req->remote_blkid();

RD_LOG(DEBUG, "Data Channel: FetchData received: lsn={}", lsn);

// relax this assert if in the future we want to fetch from a non-originator;
RD_REL_ASSERT(originator == server_id(),
              "Not expecting to receive fetch data from remote when I am not the originator of this request");

// fetch data based on the remote_blkid
if (originator == server_id()) {
// We are the originator of the blkid, read data locally;
@@ -220,11 +220,6 @@ }
}
ctx->cv.notify_one();
});
} else {
// TODO: if we are not the originator, we need to fetch based on lsn;
// To be implemented;
RD_LOG(INFO, "I am not the originaltor for the requested blks, originaltor: {}, server_id: {}.", originator,
server_id());
}
}

@@ -395,6 +390,16 @@ repl_req_ptr_t RaftReplDev::follower_create_req(repl_key const& rkey, sisl::blob
return rreq;
}

auto RaftReplDev::get_max_data_fetch_size() const {
#ifdef _PRERELEASE
if (iomgr_flip::instance()->test_flip("simulate_staging_fetch_data")) {
LOGINFO("Flip simulate_staging_fetch_data is enabled, return max_data_fetch_size: 16K");
return 4 * 4096ull;
}
#endif
return HS_DYNAMIC_CONFIG(consensus.data_fetch_max_size_mb) * 1024 * 1024ull;
}

void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rreqs) {
// Pop any entries that are already completed - from the entries list as well as from map
rreqs->erase(std::remove_if(
@@ -416,25 +421,51 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rre

if (rreqs->size()) {
// Some data not completed yet, let's fetch from remote;
fetch_data_from_remote(rreqs);
auto total_size_to_fetch = 0ul;
std::vector< repl_req_ptr_t > next_batch_rreqs;
const auto max_batch_size = get_max_data_fetch_size();
for (auto const& rreq : *rreqs) {
auto const& size = rreq->remote_blkid.blkid.blk_count() * get_blk_size();
if ((total_size_to_fetch + size) >= max_batch_size) {
fetch_data_from_remote(std::move(next_batch_rreqs));
next_batch_rreqs.clear();
total_size_to_fetch = 0;
}

total_size_to_fetch += size;
next_batch_rreqs.emplace_back(rreq);
}

// check if there are any leftovers not yet processed;
if (next_batch_rreqs.size()) { fetch_data_from_remote(std::move(next_batch_rreqs)); }
}
}
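Note: the loop above greedily packs outstanding rreqs into batches, flushing the current batch as soon as adding the next request would reach get_max_data_fetch_size(), and the leftover partial batch is flushed at the end. A standalone sketch of that batching policy using plain byte counts (names here are illustrative, not HomeStore's API):

```cpp
// Sketch: greedy size-capped batching, mirroring the loop above.
// Items are just byte sizes; batch_cb stands in for fetch_data_from_remote.
#include <cstdint>
#include <functional>
#include <iostream>
#include <vector>

void batch_by_size(std::vector< uint64_t > const& sizes, uint64_t max_batch_size,
                   std::function< void(std::vector< uint64_t >) > batch_cb) {
    std::vector< uint64_t > batch;
    uint64_t batch_bytes{0};
    for (auto const size : sizes) {
        if ((batch_bytes + size) >= max_batch_size && !batch.empty()) {
            batch_cb(std::move(batch));  // flush what we have before adding this item
            batch.clear();
            batch_bytes = 0;
        }
        batch_bytes += size;
        batch.push_back(size);
    }
    if (!batch.empty()) { batch_cb(std::move(batch)); }  // leftover batch
}

int main() {
    batch_by_size({4096, 8192, 4096, 16384, 4096}, 16384, [](std::vector< uint64_t > b) {
        uint64_t total{0};
        for (auto s : b) { total += s; }
        std::cout << "batch of " << b.size() << " entries, " << total << " bytes\n";
    });
    return 0;
}
```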

void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) {
void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) {
if (rreqs.size() == 0) { return; }

std::vector<::flatbuffers::Offset< RequestEntry > > entries;
entries.reserve(rreqs->size());
entries.reserve(rreqs.size());

shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >();
RD_LOG(DEBUG, "Data Channel : FetchData from remote: rreq.size={}, my server_id={}", rreqs.size(), server_id());
auto const& originator = rreqs.front()->remote_blkid.server_id;

for (auto const& rreq : *rreqs) {
for (auto const& rreq : rreqs) {
entries.push_back(CreateRequestEntry(*builder, rreq->get_lsn(), rreq->term(), rreq->dsn(),
builder->CreateVector(rreq->header.cbytes(), rreq->header.size()),
builder->CreateVector(rreq->key.cbytes(), rreq->key.size()),
rreq->remote_blkid.server_id /* blkid_originator */,
builder->CreateVector(rreq->remote_blkid.blkid.serialize().cbytes(),
rreq->remote_blkid.blkid.serialized_size())));
LOGINFO("Fetching data from remote: rreq=[{}], remote_blkid={}", rreq->to_compact_string(),
rreq->remote_blkid.blkid.to_string());
// relax this assert if there is a case where the originator can differ within the same batch (can't think of
// one now); if such a case arises, we need to group rreqs by originator and send them in separate
// batches;
RD_DBG_ASSERT(rreq->remote_blkid.server_id == originator, "Unexpected originator for rreq={}",
rreq->to_compact_string());

RD_LOG(TRACE, "Fetching data from originator={}, remote: rreq=[{}], remote_blkid={}, my server_id={}",
originator, rreq->to_compact_string(), rreq->remote_blkid.blkid.to_string(), server_id());
}

builder->FinishSizePrefixed(
@@ -444,17 +475,36 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) {
// blkid;
group_msg_service()
->data_service_request_bidirectional(
nuraft_mesg::role_regex::LEADER, FETCH_DATA,
originator, FETCH_DATA,
sisl::io_blob_list_t{
sisl::io_blob{builder->GetBufferPointer(), builder->GetSize(), false /* is_aligned */}})
.via(&folly::InlineExecutor::instance())
.thenValue([this, builder, rreqs](auto e) {
RD_REL_ASSERT(!!e, "Error in fetching data");
if (!e) {
// if we are here, it means the member who sent the log entries is down;
// handle the error here; when another member becomes leader, it will resend the log entries;
RD_LOG(INFO,
       "Unable to fetch data from originator={}, error={}; originator is probably down. Will "
       "retry when the new leader starts appending log entries",
rreqs.front()->remote_blkid.server_id, e.error());
for (auto const& rreq : rreqs) {
handle_error(rreq, RaftReplService::to_repl_error(e.error()));
}
return;
}

auto raw_data = e.value().response_blob().cbytes();
auto total_size = e.value().response_blob().size();

for (auto const& rreq : *rreqs) {
RD_DBG_ASSERT_GT(total_size, 0, "Empty response from remote");
RD_DBG_ASSERT(raw_data, "Empty response from remote");

RD_LOG(INFO, "Data Channel: FetchData completed for reques.size()={} ", rreqs.size());

thread_local std::vector< folly::Future< std::error_code > > futs; // static is implied
futs.clear();

for (auto const& rreq : rreqs) {
auto const data_size = rreq->remote_blkid.blkid.blk_count() * get_blk_size();
// if data is already received, skip it because someone is already doing the write;
if (rreq->state.load() & uint32_cast(repl_req_state_t::DATA_RECEIVED)) {
Expand Down Expand Up @@ -508,25 +558,36 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) {
}

// Schedule a write and upon completion, mark the data as written.
data_service()
.async_write(r_cast< const char* >(data), data_size, rreq->local_blkid)
.thenValue([this, rreq](auto&& err) {
RD_REL_ASSERT(!err,
"Error in writing data"); // TODO: Find a way to return error to the Listener
rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_WRITTEN));
rreq->data_written_promise.setValue();
RD_LOG(INFO, "Data Channel: Data Write completed rreq=[{}]", rreq->to_compact_string());
});
futs.emplace_back(
data_service().async_write(r_cast< const char* >(data), data_size, rreq->local_blkid));

// move the raw_data pointer to next rreq's data;
raw_data += data_size;
total_size -= data_size;

LOGINFO(
"Data Channel: Data fetched from remote: rreq=[{}], data_size: {}, total_size: {}, local_blkid: {}",
rreq->to_compact_string(), data_size, total_size, rreq->local_blkid.to_string());
RD_LOG(INFO,
"Data Channel: Data fetched from remote: rreq=[{}], data_size: {}, total_size: {}, "
"local_blkid: {}",
rreq->to_compact_string(), data_size, total_size, rreq->local_blkid.to_string());
}

folly::collectAllUnsafe(futs).thenValue([this, rreqs, e = std::move(e)](auto&& vf) {
for (auto const& err_c : vf) {
if (sisl_unlikely(err_c.value())) {
auto ec = err_c.value();
RD_LOG(ERROR, "Error in writing data: {}", ec.value());
// TODO: we will actually never get here, because iomgr will assert (it should not assert, but
// rather raise an alert and leave the raft group);
}
}

for (auto const& rreq : rreqs) {
rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_WRITTEN));
rreq->data_written_promise.setValue();
RD_LOG(TRACE, "Data Channel: Data Write completed rreq=[{}]", rreq->to_compact_string());
}
});

builder->Release();

RD_DBG_ASSERT_EQ(total_size, 0, "Total size mismatch, some data is not consumed");
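Note: the rewritten write path above queues one async_write future per request and only marks DATA_WRITTEN once folly::collectAllUnsafe reports that every write in the batch has completed. A toy sketch of that collect-then-complete pattern, with pre-completed futures standing in for real I/O:

```cpp
// Sketch: gathering per-request write futures and acting once all complete,
// mirroring the folly::collectAllUnsafe usage above (no real writes here).
#include <iostream>
#include <system_error>
#include <vector>
#include <folly/futures/Future.h>

int main() {
    std::vector< folly::Future< std::error_code > > futs;
    for (int i = 0; i < 4; ++i) {
        // Stand-in for data_service().async_write(...); completes immediately here.
        futs.emplace_back(folly::makeFuture(std::error_code{}));
    }

    folly::collectAllUnsafe(futs)
        .thenValue([](auto&& results) {
            for (auto const& r : results) {
                if (r.value()) { std::cerr << "write failed: " << r.value().message() << "\n"; }
            }
            // All writes are done: this is where DATA_WRITTEN would be set and
            // the data_written_promise fulfilled for every request in the batch.
        })
        .wait();
    return 0;
}
```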
Expand Down Expand Up @@ -564,17 +625,14 @@ AsyncNotify RaftReplDev::notify_after_data_written(std::vector< repl_req_ptr_t >
// if in resync mode, fetch data from remote immediately;
check_and_fetch_remote_data(rreqs);
} else {
check_and_fetch_remote_data(rreqs);
// some data are not in completed state, let's schedule a timer to check it again;
// we wait for the data channel to fill in the data; if it is still not done, we trigger a fetch from remote;
#if 0
m_wait_data_timer_hdl = iomanager.schedule_global_timer( // timer wakes up in current thread;
HS_DYNAMIC_CONFIG(consensus.wait_data_write_timer_sec) * 1000 * 1000 * 1000, false /* recurring */,
nullptr /* cookie */, iomgr::reactor_regex::all_worker, [this, rreqs](auto /*cookie*/) {
LOGINFO("Data Channel: Wait data write timer fired, checking if data is written");
RD_LOG(INFO, "Data Channel: Wait data write timer fired, checking if data is written");
check_and_fetch_remote_data(rreqs);
});
#endif
}

// block waiting here until all the futs are ready (data channel filled in and promises are made);
9 changes: 5 additions & 4 deletions src/lib/replication/repl_dev/raft_repl_dev.h
@@ -58,7 +58,8 @@ class RaftReplDev : public ReplDev,

std::atomic< uint64_t > m_next_dsn{0}; // Data Sequence Number that will keep incrementing for each data entry
//
iomgr::timer_handle_t m_wait_data_timer_hdl{iomgr::null_timer_handle};
iomgr::timer_handle_t m_wait_data_timer_hdl{
iomgr::null_timer_handle}; // non-recurring timer doesn't need to be cancelled on shutdown;
bool m_resync_mode{false};

static std::atomic< uint64_t > s_next_group_ordinal;
@@ -81,7 +82,7 @@ class RaftReplDev : public ReplDev,
AsyncReplResult<> become_leader() override;
bool is_leader() const override;
const replica_id_t get_leader_id() const override;
std::vector<peer_info> get_replication_status() const override;
std::vector< peer_info > get_replication_status() const override;
group_id_t group_id() const override { return m_group_id; }
std::string group_id_str() const { return boost::uuids::to_string(m_group_id); }
std::string rdev_name() const { return m_rdev_name; }
@@ -124,8 +125,8 @@ class RaftReplDev : public ReplDev,
void on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_data);
void on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_data);
void check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rreqs);
void fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs);

void fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs);
auto get_max_data_fetch_size() const;
bool is_resync_mode() { return m_resync_mode; }
void handle_error(repl_req_ptr_t const& rreq, ReplServiceError err);
};
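Note: fetch_data_from_remote now takes the request vector by value (callers hand batches over with std::move), so the asynchronous continuation can own the batch outright instead of sharing a raw pointer. A minimal sketch of that sink-argument idiom with generic types (std::async stands in for the RPC callback):

```cpp
// Sketch: pass-by-value "sink" parameter captured into an async continuation,
// the idiom behind the new fetch_data_from_remote(std::vector< repl_req_ptr_t >) signature.
#include <future>
#include <iostream>
#include <memory>
#include <vector>

using item_ptr = std::shared_ptr< int >;

void process_batch(std::vector< item_ptr > batch) {  // takes ownership of the batch
    auto fut = std::async(std::launch::async, [batch = std::move(batch)] {
        // The continuation owns the batch; no raw-pointer lifetime to reason about.
        std::cout << "processing " << batch.size() << " items\n";
    });
    fut.wait();
}

int main() {
    std::vector< item_ptr > reqs{std::make_shared< int >(1), std::make_shared< int >(2)};
    process_batch(std::move(reqs));  // caller relinquishes the batch
    return 0;
}
```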
4 changes: 2 additions & 2 deletions src/tests/test_common/homestore_test_common.hpp
@@ -175,7 +175,7 @@ class HSTestHelper {

static void start_homestore(const std::string& test_name, std::map< uint32_t, test_params >&& svc_params,
hs_before_services_starting_cb_t cb = nullptr, bool fake_restart = false,
bool init_device = true) {
bool init_device = true, uint32_t shutdown_delay_sec = 5) {
auto const ndevices = SISL_OPTIONS["num_devs"].as< uint32_t >();
auto const dev_size = SISL_OPTIONS["dev_size_mb"].as< uint64_t >() * 1024 * 1024;
auto num_threads = SISL_OPTIONS["num_threads"].as< uint32_t >();
@@ -185,7 +185,7 @@
if (fake_restart) {
shutdown_homestore(false);
// sisl::GrpcAsyncClientWorker::shutdown_all();
std::this_thread::sleep_for(std::chrono::seconds{5});
std::this_thread::sleep_for(std::chrono::seconds{shutdown_delay_sec});
}

std::vector< homestore::dev_info > device_info;
6 changes: 3 additions & 3 deletions src/tests/test_common/hs_repl_test_common.hpp
@@ -186,12 +186,12 @@ class HSReplTestHelper {
setup();
}

void restart() {
void restart(uint32_t shutdown_delay_secs = 5) {
test_common::HSTestHelper::start_homestore(
name_ + std::to_string(replica_num_),
{{HS_SERVICE::REPLICATION, {.repl_app = std::make_unique< TestReplApplication >(*this)}},
{HS_SERVICE::LOG, {}}},
nullptr, true /* restart */);
nullptr, true /* restart */, true /* init_device */, shutdown_delay_secs);
}

void restart_one_by_one() {
@@ -305,4 +305,4 @@

Runner io_runner_;
};
} // namespace test_common
} // namespace test_common