Skip to content

Commit

Permalink
[Cherry-pick] CP tikv#370 & tikv#372 to tikv-7.5
Browse files Browse the repository at this point in the history
Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>
  • Loading branch information
LykxSassinator committed Dec 3, 2024
1 parent e505d63 commit 0077579
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 23 deletions.
36 changes: 18 additions & 18 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,11 @@ hex = "0.4"
if_chain = "1.0"
lazy_static = "1.3"
libc = "0.2"
log = { version = "0.4", features = ["max_level_trace", "release_max_level_debug"] }
lz4-sys = "1.9"
log = { version = "0.4", features = [
"max_level_trace",
"release_max_level_debug",
] }
lz4-sys = { version = "=1.9.5" }
memmap2 = { version = "0.9", optional = true }
nix = "0.26"
num-derive = "0.4"
Expand All @@ -55,7 +58,7 @@ protobuf = "2"
rayon = "1.5"
rhai = { version = "1.7", features = ["sync"], optional = true }
scopeguard = "1.1"
serde = { version = "1.0", features = ["derive"] }
serde = { version = "=1.0.194", features = ["derive"] }
serde_repr = "0.1"
strum = { version = "0.25.0", features = ["derive"] }
thiserror = "1.0"
Expand All @@ -64,8 +67,12 @@ thiserror = "1.0"
criterion = "0.4"
ctor = "0.2"
env_logger = "0.10"
kvproto = { git = "https://github.com/pingcap/kvproto.git", default-features = false, features = ["protobuf-codec"] }
raft = { git = "https://github.com/tikv/raft-rs", branch = "master", default-features = false, features = ["protobuf-codec"] }
kvproto = { git = "https://github.com/pingcap/kvproto.git", default-features = false, features = [
"protobuf-codec",
] }
raft = { git = "https://github.com/tikv/raft-rs", branch = "master", default-features = false, features = [
"protobuf-codec",
] }
rand = "0.8"
rand_distr = "0.4"
tempfile = "3.6"
Expand All @@ -74,19 +81,10 @@ toml = "0.8"
[features]
default = ["internals", "scripting"]
internals = []
nightly = [
"prometheus/nightly",
]
failpoints = [
"fail/failpoints",
]
scripting = [
"rhai",
]
swap = [
"nightly",
"memmap2",
]
nightly = ["prometheus/nightly"]
failpoints = ["fail/failpoints"]
scripting = ["rhai"]
swap = ["nightly", "memmap2"]
std_fs = []

nightly_group = ["nightly", "swap"]
Expand All @@ -95,6 +93,8 @@ nightly_group = ["nightly", "swap"]
raft-proto = { git = "https://github.com/tikv/raft-rs", branch = "master" }
protobuf = { git = "https://github.com/pingcap/rust-protobuf", branch = "v2.8" }
protobuf-codegen = { git = "https://github.com/pingcap/rust-protobuf", branch = "v2.8" }
# Lock the version of cc-rs to avoid build failure on MacOS, ref https://github.com/rust-lang/cc-rs/issues/984.
cc = { git = "https://github.com/rust-lang/cc-rs", tag = "1.0.98" }

[workspace]
members = ["stress", "ctl"]
60 changes: 55 additions & 5 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,9 +333,12 @@ where
let _t = StopWatch::new(&*ENGINE_READ_ENTRY_DURATION_HISTOGRAM);
if let Some(memtable) = self.memtables.get(region_id) {
let mut ents_idx: Vec<EntryIndex> = Vec::with_capacity((end - begin) as usize);
memtable
.read()
.fetch_entries_to(begin, end, max_size, &mut ents_idx)?;
// Ensure that the corresponding memtable is locked with a read lock before
// completing the fetching of entries from the raft logs. This
// prevents the scenario where the index could become stale while
// being concurrently updated by the `rewrite` operation.
let immutable = memtable.read();
immutable.fetch_entries_to(begin, end, max_size, &mut ents_idx)?;
for i in ents_idx.iter() {
vec.push(read_entry_from_file::<M, _>(self.pipe_log.as_ref(), i)?);
}
Expand Down Expand Up @@ -634,9 +637,11 @@ pub(crate) mod tests {
use crate::util::ReadableSize;
use kvproto::raft_serverpb::RaftLocalState;
use raft::eraftpb::Entry;
use rand::{thread_rng, Rng};
use std::collections::{BTreeSet, HashSet};
use std::fs::OpenOptions;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};

pub(crate) type RaftLogEngine<F = DefaultFileSystem> = Engine<F>;
impl<F: FileSystem> RaftLogEngine<F> {
Expand Down Expand Up @@ -1928,8 +1933,6 @@ pub(crate) mod tests {
#[cfg(feature = "nightly")]
#[bench]
fn bench_engine_fetch_entries(b: &mut test::Bencher) {
use rand::{thread_rng, Rng};

let dir = tempfile::Builder::new()
.prefix("bench_engine_fetch_entries")
.tempdir()
Expand Down Expand Up @@ -2586,6 +2589,53 @@ pub(crate) mod tests {
assert!(data.is_empty(), "data loss {:?}", data);
}

#[test]
fn test_fetch_with_concurrently_rewrite() {
let dir = tempfile::Builder::new()
.prefix("test_fetch_with_concurrently_rewrite")
.tempdir()
.unwrap();
let cfg = Config {
dir: dir.path().to_str().unwrap().to_owned(),
target_file_size: ReadableSize(2048),
..Default::default()
};
let fs = Arc::new(DeleteMonitoredFileSystem::new());
let engine = Arc::new(RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap());
let entry_data = vec![b'x'; 128];
// Set up a concurrent write with purge, and fetch.
let mut vec: Vec<Entry> = Vec::new();
let fetch_engine = engine.clone();
let flag = Arc::new(AtomicBool::new(false));
let start_flag = flag.clone();
let th = std::thread::spawn(move || {
while !start_flag.load(Ordering::Acquire) {
std::thread::sleep(Duration::from_millis(10));
}
for _ in 0..10 {
let region_id = thread_rng().gen_range(1..=10);
// Should not return file seqno out of range error.
let _ = fetch_engine
.fetch_entries_to::<Entry>(region_id, 1, 101, None, &mut vec)
.map_err(|e| {
assert!(!format!("{e}").contains("file seqno out of"));
});
vec.clear();
}
});
for i in 0..10 {
for rid in 1..=10 {
engine.append(rid, 1 + i * 10, 1 + i * 10 + 10, Some(&entry_data));
}
flag.store(true, Ordering::Release);
for rid in 1..=10 {
engine.clean(rid);
}
engine.purge_expired_files().unwrap();
}
th.join().unwrap();
}

#[test]
fn test_internal_key_filter() {
let dir = tempfile::Builder::new()
Expand Down

0 comments on commit 0077579

Please sign in to comment.