Support recycling log files #224

Merged
merged 54 commits into from
Jul 20, 2022
9f9fb54
[Enhancement]Support recycling log files from expired logs.
LykxSassinator Jun 8, 2022
32aa562
Minor modifications on `bench_falloc`
LykxSassinator Jun 9, 2022
f98c76c
Merge branch 'tikv:master' into recycle_logs
LykxSassinator Jun 9, 2022
3da6ac5
[Bugfix] Fix the unexpected bug on the judgement of loops in `bench_f…
LykxSassinator Jun 10, 2022
f322af0
Supplements for UTs on `RecycleFileCollection` and codes clean-up.
LykxSassinator Jun 14, 2022
d2cec63
Merge branch 'tikv:master' into recycle_logs
LykxSassinator Jun 14, 2022
9b5a9a4
Supplement necessary UTs for validating the new format_version with V2
LykxSassinator Jun 14, 2022
5dd42b1
Clean-up unncessary annotations and codes.
LykxSassinator Jun 15, 2022
834b996
Clean-up unnecessary annotations and unused codes.
LykxSassinator Jun 15, 2022
392bb6e
[Refinement] Simplify the configuration on the capacity of `RecycleFi…
LykxSassinator Jun 15, 2022
9ef0f8d
[Refinement] Simplify the configuration on the capacity of `RecycleFi…
LykxSassinator Jun 15, 2022
b508c04
Merge branch 'recycle_logs' of github.com:LykxSassinator/raft-engine …
LykxSassinator Jun 20, 2022
0a32a25
Reconstruct the implementation of FileCollection for recycling.
LykxSassinator Jun 20, 2022
5820b13
Refactor the `sign_checksum(...Version, Signature)` with `pre_write(.…
LykxSassinator Jun 21, 2022
2047d29
Merge branch 'master' into recycle_logs
LykxSassinator Jun 21, 2022
1aaeb59
Remove unnecessary annotations in `pipe.rs`
LykxSassinator Jun 22, 2022
2a07b53
Modification for code-style consistency.
LykxSassinator Jun 23, 2022
55e5fac
Merge branch 'master' into recycle_logs
LykxSassinator Jun 23, 2022
b786881
Merge branch 'master' into recycle_logs
LykxSassinator Jun 23, 2022
cf9b77e
Make `Recycle log files` compatible to the new feature tikv#229.
LykxSassinator Jun 23, 2022
0a2b085
Merge branch 'master' into recycle_logs
LykxSassinator Jun 24, 2022
7e6a5cb
Update changelog.
LykxSassinator Jun 24, 2022
ccf5ef0
Merge branch 'master' into recycle_logs
LykxSassinator Jun 24, 2022
09103a8
Supply extra setting `allow_recycle` for on / off `Recycle log files`
LykxSassinator Jun 24, 2022
042607a
Merge branch 'recycle_logs' of github.com:LykxSassinator/raft-engine …
LykxSassinator Jun 24, 2022
c13eed0
Fix code-style bugs examed by `clippy`
LykxSassinator Jun 25, 2022
b1ac73c
Merge branch 'tikv:master' into recycle_logs
LykxSassinator Jun 28, 2022
c37c1c3
Supply `allow-recycle` flag for `stress` tool.
LykxSassinator Jun 28, 2022
db4c0de
Modifications for compatibilities on code-style.
LykxSassinator Jun 30, 2022
1614ef1
Supplement extra UTs for abnormal cases.
LykxSassinator Jul 1, 2022
47e5f7f
Supplement extra ut for testing the abnormal case when `recycle` is o…
LykxSassinator Jul 5, 2022
577ba0f
Modifications on code-style for compatibilities.
LykxSassinator Jul 7, 2022
930fe3f
Minor modifications for compatibilities to other componenents.
LykxSassinator Jul 8, 2022
3e0cfd5
Fix bugs in pipe.rs
LykxSassinator Jul 12, 2022
4060018
Support removing stale files when `engine` exit.
LykxSassinator Jul 12, 2022
190b0e8
Merge branch 'master' into recycle_logs
LykxSassinator Jul 13, 2022
3dd41bf
Supplement necessary code coverage on `rename` in `test_manage_file_r…
LykxSassinator Jul 13, 2022
605c173
Remove unncessary prints in `test_manage_file_rename`
LykxSassinator Jul 13, 2022
2797ccb
Merge branch 'master' into recycle_logs
LykxSassinator Jul 14, 2022
ce6f42f
Simplify the strategy of computing `purge` in the func `purge_to()`.
LykxSassinator Jul 14, 2022
b187930
Refine the performance of `fetch_active_file` by accessing `files`.
LykxSassinator Jul 14, 2022
e3e7c92
Merge branch 'master' into recycle_logs
LykxSassinator Jul 14, 2022
adabbe2
Merge branch 'master' into recycle_logs
LykxSassinator Jul 15, 2022
2276597
Modifications for code-style compatibilities.
LykxSassinator Jul 17, 2022
51b11b9
Merge branch 'recycle_logs' of github.com:LykxSassinator/raft-engine …
LykxSassinator Jul 17, 2022
7df1fee
Modifications on `purge_to` for more readabilities.
LykxSassinator Jul 18, 2022
80802b4
Fix fmt errs.
LykxSassinator Jul 18, 2022
71f19c0
Supply extra purge strategy for reclaiming redundant space occupied b…
LykxSassinator Jul 18, 2022
1d81e69
Bugfix for the incorrect strategy of counting purged files in `purge_…
LykxSassinator Jul 19, 2022
69a4015
Remove unnecessary functions.
LykxSassinator Jul 19, 2022
0a88245
Bugfix for the strategy in `purge_to` and supplement extra uts for ne…
LykxSassinator Jul 19, 2022
5353d0a
Replace the interface `rename` with `reuse` for more compatiblities.
LykxSassinator Jul 20, 2022
f1a9f9e
Minor modifications on the api in the FilesSystem trait.
LykxSassinator Jul 20, 2022
9d98e7b
Unify the testcase name with the name of its relevant dir.
LykxSassinator Jul 20, 2022
2 changes: 1 addition & 1 deletion README.md
@@ -85,7 +85,7 @@ cargo +nightly test --test failpoints --all-features -- --test-threads 1

```
cargo +nightly bench --all-features <bench-case-name>
cargo run --release --package stress --help
cargo run --release --package stress -- --help
```

## License
8 changes: 8 additions & 0 deletions src/config.rs
@@ -78,6 +78,12 @@ pub struct Config {
/// Default: "0.6"
pub purge_rewrite_garbage_ratio: f64,

/// Recycle the garbage, that is, stale append / rewrite logs,
/// if the size of stale logs is lower than this value.
///
/// Default: None
pub recycle_garbage_ratio: Option<f64>,

/// Maximum memory bytes allowed for the in-memory index.
/// Effective under the `swap` feature only.
///
@@ -100,6 +106,7 @@ impl Default for Config {
purge_threshold: ReadableSize::gb(10),
purge_rewrite_threshold: None,
purge_rewrite_garbage_ratio: 0.6,
recycle_garbage_ratio: None,
memory_limit: None,
};
// Test-specific configurations.
@@ -215,6 +222,7 @@ mod tests {
soft_sanitized.target_file_size
);
assert_eq!(soft_sanitized.format_version, Version::default() as u64);
assert!(soft_sanitized.recycle_garbage_ratio.is_none());
}

#[test]
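The new option defaults to `None`, which leaves recycling off. A minimal sketch of how such an optional ratio could be defaulted and checked is shown here. The trimmed-down `Config` and the `validate` method with its `[0, 1]` range rule are assumptions for illustration and are not taken from this PR.

```rust
/// Hypothetical, trimmed-down stand-in for the engine's `Config`.
#[derive(Debug, Clone, PartialEq)]
pub struct Config {
    pub purge_rewrite_garbage_ratio: f64,
    /// `None` keeps recycling off, mirroring the default in the diff.
    pub recycle_garbage_ratio: Option<f64>,
}

impl Default for Config {
    fn default() -> Self {
        Config {
            purge_rewrite_garbage_ratio: 0.6,
            recycle_garbage_ratio: None,
        }
    }
}

impl Config {
    /// Assumed validation rule (not from the PR): a ratio, when set,
    /// must lie in the closed range [0, 1]. NaN is rejected as well.
    pub fn validate(&self) -> Result<(), String> {
        match self.recycle_garbage_ratio {
            Some(r) if !(0.0..=1.0).contains(&r) => {
                Err(format!("recycle_garbage_ratio out of range: {}", r))
            }
            _ => Ok(()),
        }
    }
}
```

Using `Option<f64>` rather than a sentinel such as `0.0` keeps "feature off" distinct from "threshold of zero".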
26 changes: 20 additions & 6 deletions src/engine.rs
@@ -15,17 +15,18 @@ use crate::consistency::ConsistencyChecker;
use crate::env::{DefaultFileSystem, FileSystem};
use crate::event_listener::EventListener;
use crate::file_pipe_log::debug::LogItemReader;
use crate::file_pipe_log::{
DefaultMachineFactory, FilePipeLog, FilePipeLogBuilder, RecoveryConfig,
};
use crate::file_pipe_log::{DefaultMachineFactory, FilePipeLog, FilePipeLogBuilder};
use crate::log_batch::{Command, LogBatch, MessageExt};
use crate::memtable::{EntryIndex, MemTableRecoverContextFactory, MemTables};
use crate::metrics::*;
use crate::pipe_log::{FileBlockHandle, FileId, LogQueue, PipeLog};
use crate::pipe_log::{FileBlockHandle, FileId, LogQueue, PipeLog, Signature};
use crate::purge::{PurgeHook, PurgeManager};
use crate::write_barrier::{WriteBarrier, Writer};
use crate::{Error, GlobalStats, Result};

#[cfg(feature = "scripting")]
use crate::file_pipe_log::RecoveryConfig;

const METRICS_FLUSH_INTERVAL: Duration = Duration::from_secs(30);

pub struct Engine<F = DefaultFileSystem, P = FilePipeLog<F>>
@@ -142,6 +143,10 @@ where
let len = log_batch.finish_populate(self.cfg.batch_compression_threshold.0 as usize)?;
let block_handle = {
let mut writer = Writer::new(log_batch, sync, start);
// Here, we introduce `GroupCommit` via the self-implemented
// `Write Barrier` to improve data syncing under multi-threaded writes.
// If `write_barrier.enter(...)` returns the writer group, this writer
// is its leader and is responsible for dumping the data into the log.
if let Some(mut group) = self.write_barrier.enter(&mut writer) {
let now = Instant::now();
let _t = StopWatch::new_with(&ENGINE_WRITE_LEADER_DURATION_HISTOGRAM, now);
@@ -151,9 +156,12 @@ where
.as_secs_f64(),
);
sync |= writer.sync;
let log_batch = writer.get_payload();
let log_batch = writer.get_mut_payload();
let res = if !log_batch.is_empty() {
// @TODO(lucasliang): bind `Version` to each `LogBatch`
// Signs a checksum, so-called `signature`, into the LogBatch.
let (format_version, file_id) =
self.pipe_log.fetch_active_file(LogQueue::Append);
log_batch.sign_checksum(format_version, file_id);
self.pipe_log
.append(LogQueue::Append, log_batch.encoded_bytes())
} else {
@@ -480,12 +488,15 @@ where
{
BLOCK_CACHE.with(|cache| {
if cache.key.get() != idx.entries.unwrap() {
let file_id = idx.entries.unwrap().id;
let version = pipe_log.fetch_format_version(file_id)?;
cache.insert(
idx.entries.unwrap(),
LogBatch::decode_entries_block(
&pipe_log.read_bytes(idx.entries.unwrap())?,
idx.entries.unwrap(),
idx.compression_type,
(version, Signature::new(file_id)),
)?,
);
}
@@ -504,12 +515,15 @@ where
{
BLOCK_CACHE.with(|cache| {
if cache.key.get() != idx.entries.unwrap() {
let file_id = idx.entries.unwrap().id;
let version = pipe_log.fetch_format_version(file_id)?;
cache.insert(
idx.entries.unwrap(),
LogBatch::decode_entries_block(
&pipe_log.read_bytes(idx.entries.unwrap())?,
idx.entries.unwrap(),
idx.compression_type,
(version, Signature::new(file_id)),
)?,
);
}
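The hunks above sign each `LogBatch` with the active file's id on write and pass `(version, Signature::new(file_id))` on decode. One plausible reason, sketched here under assumptions, is that a recycled file still holds bytes from its previous life, so mixing the file id into the checksum lets those stale bytes fail verification. The functions `checksum`, `sign`, and `verify`, the FNV-1a hash, and the XOR mixing are all hypothetical and are not the PR's implementation.

```rust
/// FNV-1a over the payload; a stand-in for whatever checksum the engine uses.
fn checksum(payload: &[u8]) -> u32 {
    let mut hash: u32 = 0x811c_9dc5;
    for &b in payload {
        hash ^= u32::from(b);
        hash = hash.wrapping_mul(0x0100_0193);
    }
    hash
}

/// Hypothetical signing step: mix the (truncated) file id into the checksum.
fn sign(payload: &[u8], file_id: u64) -> u32 {
    checksum(payload) ^ (file_id as u32)
}

/// A block verifies only if both the payload and the file id match
/// what was used when the block was signed.
fn verify(payload: &[u8], stored: u32, file_id: u64) -> bool {
    sign(payload, file_id) == stored
}
```

With this scheme, a block written while the file carried id 7 no longer verifies once the same bytes are read back under id 8, even though the payload itself is intact.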
51 changes: 51 additions & 0 deletions src/env/default.rs
@@ -74,6 +74,13 @@ impl LogFd {
close(self.0).map_err(|e| from_nix_error(e, "close"))
}

pub fn rename<P: AsRef<Path>>(src: P, dst: P) -> IoResult<()> {
fail_point!("log_fd::rename::err", |_| {
Err(from_nix_error(nix::Error::EINVAL, "fp"))
});
std::fs::rename(src, dst)
}

/// Synchronizes all in-memory data of the file except metadata to the
/// filesystem.
pub fn sync(&self) -> IoResult<()> {
@@ -165,6 +172,33 @@ impl LogFd {
Ok(())
}
}

#[allow(unused_variables)]
pub fn allocate_with_hole(&self, offset: usize, size: usize) -> IoResult<()> {
fail_point!("log_fd::falloc_with_hole::err", |_| {
Err(from_nix_error(nix::Error::EINVAL, "fp"))
});
#[cfg(target_os = "linux")]
{
fcntl::fallocate(
self.0,
// These flags punch a hole over the range `[offset, offset + size)`:
// the blocks are deallocated and later reads of the range return
// zeros, while the file size stays unchanged. For readers, the effect
// is the same as manually filling the range with zeros.
// Refer to https://man7.org/linux/man-pages/man2/fallocate.2.html.
fcntl::FallocateFlags::FALLOC_FL_KEEP_SIZE
| fcntl::FallocateFlags::FALLOC_FL_PUNCH_HOLE,
offset as i64,
size as i64,
)
.map_err(|e| from_nix_error(e, "fallocate"))
}
#[cfg(not(target_os = "linux"))]
{
Ok(())
}
}
}

impl Handle for LogFd {
@@ -210,6 +244,15 @@ impl LogFile {
}
}

impl Clone for LogFile {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
offset: self.offset,
}
}
}

impl Write for LogFile {
fn write(&mut self, buf: &[u8]) -> IoResult<usize> {
let len = self.inner.write(self.offset, buf)?;
@@ -255,6 +298,10 @@ impl WriteExt for LogFile {
fn allocate(&mut self, offset: usize, size: usize) -> IoResult<()> {
self.inner.allocate(offset, size)
}

fn allocate_with_hole(&mut self, offset: usize, size: usize) -> IoResult<()> {
self.inner.allocate_with_hole(offset, size)
}
}

pub struct DefaultFileSystem;
@@ -272,6 +319,10 @@ impl FileSystem for DefaultFileSystem {
LogFd::open(path.as_ref())
}

fn rename<P: AsRef<Path>>(&self, src: P, dst: P) -> IoResult<()> {
LogFd::rename(src, dst)
}

fn new_reader(&self, handle: Arc<Self::Handle>) -> IoResult<Self::Reader> {
Ok(LogFile::new(handle))
}
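`LogFd::rename` is a thin wrapper over `std::fs::rename`. As a rough illustration of how a rename can turn a stale log file into a fresh one, the sketch below renames a file inside a scratch directory and checks the result. The helper names `recycle_file` and `demo`, the directory name, and the `.raftlog` file names are assumptions for this example only.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::{Path, PathBuf};

/// Hypothetical helper: give a stale log file a new name so that its
/// already-allocated blocks can be reused by the next active file.
fn recycle_file(dir: &Path, stale: &str, fresh: &str) -> io::Result<PathBuf> {
    let src = dir.join(stale);
    let dst = dir.join(fresh);
    fs::rename(&src, &dst)?;
    Ok(dst)
}

/// Builds a scratch directory with one stale file, recycles it, and
/// reports (old name still exists, new name exists, new file length).
fn demo() -> io::Result<(bool, bool, u64)> {
    let dir = std::env::temp_dir().join(format!("recycle_demo_{}", std::process::id()));
    fs::create_dir_all(&dir)?;
    File::create(dir.join("0001.raftlog"))?.write_all(b"stale bytes")?;
    let dst = recycle_file(&dir, "0001.raftlog", "0005.raftlog")?;
    let out = (
        dir.join("0001.raftlog").exists(),
        dst.exists(),
        fs::metadata(&dst)?.len(),
    );
    fs::remove_dir_all(&dir)?;
    Ok(out)
}
```

Note that the renamed file keeps its old length and contents, so a reader needs some other means of telling old bytes from new ones.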
2 changes: 2 additions & 0 deletions src/env/mod.rs
@@ -18,6 +18,7 @@ pub trait FileSystem: Send + Sync {

fn create<P: AsRef<Path>>(&self, path: P) -> Result<Self::Handle>;
fn open<P: AsRef<Path>>(&self, path: P) -> Result<Self::Handle>;
fn rename<P: AsRef<Path>>(&self, src: P, dst: P) -> Result<()>;
fn new_reader(&self, handle: Arc<Self::Handle>) -> Result<Self::Reader>;
fn new_writer(&self, handle: Arc<Self::Handle>) -> Result<Self::Writer>;
}
@@ -34,4 +35,5 @@ pub trait WriteExt {
fn truncate(&mut self, offset: usize) -> Result<()>;
fn sync(&mut self) -> Result<()>;
fn allocate(&mut self, offset: usize, size: usize) -> Result<()>;
fn allocate_with_hole(&mut self, offset: usize, size: usize) -> Result<()>;
}
9 changes: 8 additions & 1 deletion src/env/obfuscated.rs
@@ -63,13 +63,16 @@ impl WriteExt for ObfuscatedWriter {
fn allocate(&mut self, offset: usize, size: usize) -> IoResult<()> {
self.0.allocate(offset, size)
}

fn allocate_with_hole(&mut self, offset: usize, size: usize) -> IoResult<()> {
self.0.allocate_with_hole(offset, size)
}
}

/// [`ObfuscatedFileSystem`] is a special implementation of [`FileSystem`],
/// which is used for constructing and simulating an abnormal file system for
/// [`Read`] and [`Write`].
pub struct ObfuscatedFileSystem(DefaultFileSystem);

impl Default for ObfuscatedFileSystem {
fn default() -> Self {
ObfuscatedFileSystem(DefaultFileSystem)
@@ -89,6 +92,10 @@ impl FileSystem for ObfuscatedFileSystem {
self.0.open(path)
}

fn rename<P: AsRef<Path>>(&self, src: P, dst: P) -> IoResult<()> {
self.0.rename(src, dst)
}

fn new_reader(&self, inner: Arc<Self::Handle>) -> IoResult<Self::Reader> {
Ok(ObfuscatedReader(self.0.new_reader(inner)?))
}
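Because `rename` is a required method on the `FileSystem` trait, every implementor, including wrappers such as `ObfuscatedFileSystem`, has to forward it. The sketch below shows that delegation pattern with a deliberately simplified trait. The `&str` paths, `MemFileSystem`, and `Wrapper` are hypothetical stand-ins and do not match the crate's real signatures.

```rust
use std::collections::HashSet;
use std::io;
use std::sync::Mutex;

/// Hypothetical, simplified stand-in for the `FileSystem` trait.
pub trait FileSystem {
    fn rename(&self, src: &str, dst: &str) -> io::Result<()>;
}

/// In-memory file system that only tracks file names.
#[derive(Default)]
pub struct MemFileSystem {
    files: Mutex<HashSet<String>>,
}

impl MemFileSystem {
    pub fn create(&self, name: &str) {
        self.files.lock().unwrap().insert(name.to_owned());
    }

    pub fn exists(&self, name: &str) -> bool {
        self.files.lock().unwrap().contains(name)
    }
}

impl FileSystem for MemFileSystem {
    fn rename(&self, src: &str, dst: &str) -> io::Result<()> {
        let mut files = self.files.lock().unwrap();
        if !files.remove(src) {
            return Err(io::Error::new(io::ErrorKind::NotFound, "source file missing"));
        }
        files.insert(dst.to_owned());
        Ok(())
    }
}

/// Wrapper in the style of `ObfuscatedFileSystem`: it forwards `rename`
/// to the inner file system unchanged.
pub struct Wrapper<F: FileSystem>(pub F);

impl<F: FileSystem> FileSystem for Wrapper<F> {
    fn rename(&self, src: &str, dst: &str) -> io::Result<()> {
        self.0.rename(src, dst)
    }
}
```

An in-memory implementation like this can also make rename failures easy to trigger in unit tests without touching the disk.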
3 changes: 2 additions & 1 deletion src/file_pipe_log/format.rs
@@ -84,6 +84,7 @@ pub(super) fn lock_file_path<P: AsRef<Path>>(dir: P) -> PathBuf {
#[repr(u64)]
pub enum Version {
V1 = 1,
V2 = 2,
}

impl Version {
@@ -190,7 +191,7 @@ mod tests {
let version2 = Version::from_u64(1).unwrap();
assert_eq!(version, version2);
assert!(Version::is_valid(1));
assert!(!Version::is_valid(2));
assert!(!Version::is_valid(4));
}

#[test]
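With `V2` added, the updated test expects `1` to be valid and `4` to be invalid. A minimal sketch of a version enum with that behaviour follows. The hand-written `match` in `from_u64` is an assumption, and the crate may derive the conversion differently.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum Version {
    V1 = 1,
    V2 = 2,
}

impl Version {
    /// Hand-written conversion (assumed); unknown values map to `None`.
    pub fn from_u64(v: u64) -> Option<Version> {
        match v {
            1 => Some(Version::V1),
            2 => Some(Version::V2),
            _ => None,
        }
    }

    /// A raw value is valid only if it names a known version.
    pub fn is_valid(v: u64) -> bool {
        Version::from_u64(v).is_some()
    }
}
```

Returning `Option` from `from_u64` forces callers that read a header from disk to handle an unknown format version explicitly.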