Support recycling log files #224

Merged
merged 54 commits into from
Jul 20, 2022
9f9fb54
[Enhancement] Support recycling log files from expired logs.
LykxSassinator Jun 8, 2022
32aa562
Minor modifications on `bench_falloc`
LykxSassinator Jun 9, 2022
f98c76c
Merge branch 'tikv:master' into recycle_logs
LykxSassinator Jun 9, 2022
3da6ac5
[Bugfix] Fix the unexpected bug on the judgement of loops in `bench_f…
LykxSassinator Jun 10, 2022
f322af0
Supplements for UTs on `RecycleFileCollection` and codes clean-up.
LykxSassinator Jun 14, 2022
d2cec63
Merge branch 'tikv:master' into recycle_logs
LykxSassinator Jun 14, 2022
9b5a9a4
Supplement necessary UTs for validating the new format_version with V2
LykxSassinator Jun 14, 2022
5dd42b1
Clean-up unnecessary annotations and codes.
LykxSassinator Jun 15, 2022
834b996
Clean-up unnecessary annotations and unused codes.
LykxSassinator Jun 15, 2022
392bb6e
[Refinement] Simplify the configuration on the capacity of `RecycleFi…
LykxSassinator Jun 15, 2022
9ef0f8d
[Refinement] Simplify the configuration on the capacity of `RecycleFi…
LykxSassinator Jun 15, 2022
b508c04
Merge branch 'recycle_logs' of github.com:LykxSassinator/raft-engine …
LykxSassinator Jun 20, 2022
0a32a25
Reconstruct the implementation of FileCollection for recycling.
LykxSassinator Jun 20, 2022
5820b13
Refactor the `sign_checksum(...Version, Signature)` with `pre_write(.…
LykxSassinator Jun 21, 2022
2047d29
Merge branch 'master' into recycle_logs
LykxSassinator Jun 21, 2022
1aaeb59
Remove unnecessary annotations in `pipe.rs`
LykxSassinator Jun 22, 2022
2a07b53
Modification for code-style consistency.
LykxSassinator Jun 23, 2022
55e5fac
Merge branch 'master' into recycle_logs
LykxSassinator Jun 23, 2022
b786881
Merge branch 'master' into recycle_logs
LykxSassinator Jun 23, 2022
cf9b77e
Make `Recycle log files` compatible to the new feature tikv#229.
LykxSassinator Jun 23, 2022
0a2b085
Merge branch 'master' into recycle_logs
LykxSassinator Jun 24, 2022
7e6a5cb
Update changelog.
LykxSassinator Jun 24, 2022
ccf5ef0
Merge branch 'master' into recycle_logs
LykxSassinator Jun 24, 2022
09103a8
Supply extra setting `allow_recycle` for on / off `Recycle log files`
LykxSassinator Jun 24, 2022
042607a
Merge branch 'recycle_logs' of github.com:LykxSassinator/raft-engine …
LykxSassinator Jun 24, 2022
c13eed0
Fix code-style bugs examined by `clippy`
LykxSassinator Jun 25, 2022
b1ac73c
Merge branch 'tikv:master' into recycle_logs
LykxSassinator Jun 28, 2022
c37c1c3
Supply `allow-recycle` flag for `stress` tool.
LykxSassinator Jun 28, 2022
db4c0de
Modifications for compatibilities on code-style.
LykxSassinator Jun 30, 2022
1614ef1
Supplement extra UTs for abnormal cases.
LykxSassinator Jul 1, 2022
47e5f7f
Supplement extra ut for testing the abnormal case when `recycle` is o…
LykxSassinator Jul 5, 2022
577ba0f
Modifications on code-style for compatibilities.
LykxSassinator Jul 7, 2022
930fe3f
Minor modifications for compatibilities to other components.
LykxSassinator Jul 8, 2022
3e0cfd5
Fix bugs in pipe.rs
LykxSassinator Jul 12, 2022
4060018
Support removing stale files when `engine` exit.
LykxSassinator Jul 12, 2022
190b0e8
Merge branch 'master' into recycle_logs
LykxSassinator Jul 13, 2022
3dd41bf
Supplement necessary code coverage on `rename` in `test_manage_file_r…
LykxSassinator Jul 13, 2022
605c173
Remove unnecessary prints in `test_manage_file_rename`
LykxSassinator Jul 13, 2022
2797ccb
Merge branch 'master' into recycle_logs
LykxSassinator Jul 14, 2022
ce6f42f
Simplify the strategy of computing `purge` in the func `purge_to()`.
LykxSassinator Jul 14, 2022
b187930
Refine the performance of `fetch_active_file` by accessing `files`.
LykxSassinator Jul 14, 2022
e3e7c92
Merge branch 'master' into recycle_logs
LykxSassinator Jul 14, 2022
adabbe2
Merge branch 'master' into recycle_logs
LykxSassinator Jul 15, 2022
2276597
Modifications for code-style compatibilities.
LykxSassinator Jul 17, 2022
51b11b9
Merge branch 'recycle_logs' of github.com:LykxSassinator/raft-engine …
LykxSassinator Jul 17, 2022
7df1fee
Modifications on `purge_to` for more readabilities.
LykxSassinator Jul 18, 2022
80802b4
Fix fmt errs.
LykxSassinator Jul 18, 2022
71f19c0
Supply extra purge strategy for reclaiming redundant space occupied b…
LykxSassinator Jul 18, 2022
1d81e69
Bugfix for the incorrect strategy of counting purged files in `purge_…
LykxSassinator Jul 19, 2022
69a4015
Remove unnecessary functions.
LykxSassinator Jul 19, 2022
0a88245
Bugfix for the strategy in `purge_to` and supplement extra uts for ne…
LykxSassinator Jul 19, 2022
5353d0a
Replace the interface `rename` with `reuse` for more compatibilities.
LykxSassinator Jul 20, 2022
f1a9f9e
Minor modifications on the API in the FileSystem trait.
LykxSassinator Jul 20, 2022
9d98e7b
Unify the testcase name with the name of its relevant dir.
LykxSassinator Jul 20, 2022
23 changes: 16 additions & 7 deletions src/engine.rs
@@ -1860,8 +1860,14 @@ mod tests {
Ok(())
}

fn rename<P: AsRef<Path>>(&self, src_path: P, dst_path: P) -> std::io::Result<()> {
self.inner.rename(src_path.as_ref(), dst_path.as_ref())?;
fn rename<P: AsRef<Path>>(
&self,
src_path: P,
dst_path: P,
keep_data: bool,
) -> std::io::Result<()> {
self.inner
.rename(src_path.as_ref(), dst_path.as_ref(), keep_data)?;
self.update_metadata(src_path.as_ref(), true);
self.update_metadata(dst_path.as_ref(), false);
Ok(())
@@ -2093,14 +2099,17 @@ mod tests {
let engine = RaftLogEngine::open_with_file_system(cfg, fs).unwrap();
let (start, end) = engine.file_span(LogQueue::Append);
assert_eq!((start, end), (1, 11));
for rid in 1..=5 {
for rid in 6..=10 {
engine.append(rid, 11, 20, Some(&entry_data));
}
// Mark region_id -> 6 obsolete.
for rid in 6..=6 {
engine.clean(rid);
}
// the [1, 11] files are recycled
// the [1, 12] files are recycled
engine.purge_expired_files().unwrap();
assert!(start < engine.file_span(LogQueue::Append).0);
assert_eq!(start + 15, engine.file_span(LogQueue::Append).0);
assert_eq!(engine.file_count(Some(LogQueue::Append)), 1);
assert_eq!(engine.file_count(Some(LogQueue::Append)), 5);
assert_eq!(start + 12, engine.file_span(LogQueue::Append).0);
}
}
}
3 changes: 2 additions & 1 deletion src/env/default.rs
@@ -279,7 +279,8 @@ impl FileSystem for DefaultFileSystem {
std::fs::remove_file(path)
}

fn rename<P: AsRef<Path>>(&self, src_path: P, dst_path: P) -> IoResult<()> {
fn rename<P: AsRef<Path>>(&self, src_path: P, dst_path: P, keep_data: bool) -> IoResult<()> {
debug_assert!(!keep_data);
std::fs::rename(src_path, dst_path)
}

8 changes: 7 additions & 1 deletion src/env/mod.rs
@@ -22,7 +22,13 @@ pub trait FileSystem: Send + Sync {

fn delete<P: AsRef<Path>>(&self, path: P) -> Result<()>;

fn rename<P: AsRef<Path>>(&self, src_path: P, dst_path: P) -> Result<()>;
/// Renames the file from `src_path` to `dst_path`.
///
/// `keep_data` is introduced for compatibility with user-defined
/// `FileSystem` implementations: it indicates whether the file's data
/// may be modified by the implementation during `rename`. By default,
/// `keep_data` is `false`.
fn rename<P: AsRef<Path>>(&self, src_path: P, dst_path: P, keep_data: bool) -> Result<()>;

/// Deletes user implemented metadata associated with `path`. Returns
/// `true` if any metadata is deleted.
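The extended signature can be exercised in isolation. Below is a minimal sketch of a default-style implementation, written as a free function rather than the trait method; only the `keep_data` contract is taken from the diff above, everything else is illustrative:

```rust
use std::fs;
use std::io::Result;
use std::path::Path;

// Minimal sketch of a default-style `rename`: it never reuses file data,
// so it only accepts `keep_data = false`, then delegates to the OS rename.
fn rename<P: AsRef<Path>>(src_path: P, dst_path: P, keep_data: bool) -> Result<()> {
    debug_assert!(!keep_data);
    fs::rename(src_path, dst_path)
}
```

A file-system wrapper that recycles files could instead honor `keep_data = true` by preserving the destination file's contents.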
5 changes: 3 additions & 2 deletions src/env/obfuscated.rs
@@ -114,8 +114,9 @@ impl FileSystem for ObfuscatedFileSystem {
r
}

fn rename<P: AsRef<Path>>(&self, src_path: P, dst_path: P) -> IoResult<()> {
self.inner.rename(src_path, dst_path)
fn rename<P: AsRef<Path>>(&self, src_path: P, dst_path: P, keep_data: bool) -> IoResult<()> {
debug_assert!(!keep_data);
self.inner.rename(src_path, dst_path, keep_data)
}

fn new_reader(&self, handle: Arc<Self::Handle>) -> IoResult<Self::Reader> {
51 changes: 21 additions & 30 deletions src/file_pipe_log/pipe.rs
@@ -62,16 +62,13 @@ impl<F: FileSystem> FileCollection<F> {
if self.fds.pop_front().is_some() {
let src_path = first_file_id.build_file_path(dir_path); // src filepath
let dst_path = dst_fd.build_file_path(dir_path); // dst filepath
match file_system.rename(&src_path, &dst_path) {
Ok(_) => {
// Update the first_seq
self.first_seq += 1;
ret = true;
}
Err(e) => {
ret = false;
error!("error while trying to recycle one expired file: {}", e);
}
if let Err(e) = file_system.rename(&src_path, &dst_path, false) {
error!("error while trying to recycle one expired file: {}", e);
ret = false;
} else {
// Update the first_seq
self.first_seq += 1;
ret = true;
}
}
// Return success only if the `rename` actually took effect.
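The control flow above reduces to: pop the oldest descriptor, attempt the rename, and advance `first_seq` only on success. A standalone sketch with hypothetical, simplified types (the real `FileCollection` tracks file descriptors and builds paths from file IDs):

```rust
use std::collections::VecDeque;

// Hypothetical stand-in for `FileCollection`: `fds` holds the names of
// recyclable files and `first_seq` is the sequence of the oldest one.
struct Collection {
    first_seq: u64,
    fds: VecDeque<String>,
}

impl Collection {
    // Try to recycle the oldest file; `rename` models `FileSystem::rename`
    // and reports whether it succeeded.
    fn recycle_one_file(&mut self, rename: impl Fn(&str) -> bool) -> bool {
        if let Some(src) = self.fds.pop_front() {
            if rename(&src) {
                // Update the first_seq only after a successful rename.
                self.first_seq += 1;
                return true;
            }
        }
        false
    }
}
```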
@@ -385,11 +382,13 @@ impl<F: FileSystem> SinglePipe<F> {
self.rotate_imp(&mut self.active_file.lock())
}

/// Purges obsolete log files up to the specified `FileSeq`.
///
/// Returns the number of files that were physically removed.
fn purge_to(&self, file_seq: FileSeq) -> Result<usize> {
let (
first_purge_seq, /* first seq for purging */
purged, /* count of purged files */
recycled, /* count of recycled files */
remained, /* count of remained files */
) = {
let mut files = self.files.write();
@@ -399,39 +398,33 @@
return Ok(0);
}

// If capacity == 0, `files` does not support recycling.
let logically_purged = if files.capacity == 0 {
0
} else {
(file_seq - files.first_seq_in_use) as usize
};
// Remove some obsolete files if capacity is exceeded.
let obsolete_files = (file_seq - files.first_seq) as usize;
// When capacity is zero, always remove logically deleted files.
let capacity_exceeded = files.fds.len().saturating_sub(files.capacity);
let purged = std::cmp::min(capacity_exceeded, obsolete_files);

// The files marked with `recycle` but with format_version V1 should also
// be removed.
files.first_seq += purged as u64;
let extra_purged = {
// Files with format_version `V1` cannot be chosen as recycling
// candidates, so they are removed as well.
let mut count = 0;
for recycle_idx in 0..(file_seq - files.first_seq) as usize {
for recycle_idx in purged..obsolete_files {
if !files.fds[recycle_idx].context.version.has_log_signing() {
count += 1;
} else {
break;
}
}
count
};
let final_purge_count = purged + extra_purged;
// Update metadata of files
files.first_seq += extra_purged as u64;
files.first_seq += final_purge_count as u64;
files.first_seq_in_use = file_seq;
files.fds.drain(..final_purge_count);
(
files.first_seq - final_purge_count as u64,
final_purge_count,
logically_purged.saturating_sub(extra_purged),
files.fds.len(),
)
};
@@ -454,15 +447,13 @@
}
self.file_system.delete(&path)?;
}
Ok(purged + recycled)
Ok(purged)
}

fn fetch_active_file(&self) -> LogFileContext {
let files = self.files.read();
match files.fds.back() {
Some(active_fd) => LogFileContext::new(active_fd.context.id, active_fd.context.version),
_ => unreachable!(),
}
let active_fd = files.fds.back().unwrap();
LogFileContext::new(active_fd.context.id, active_fd.context.version)
}
}

@@ -875,8 +866,8 @@ mod tests {
assert_eq!(file_handle.offset, header_size);
assert_eq!(pipe_log.file_span(queue).1, 3);

// purge file 1
assert_eq!(pipe_log.purge_to(FileId { queue, seq: 2 }).unwrap(), 1);
// Purge up to file 2; file 1 is recycled rather than removed, so the returned purge count is 0.
assert_eq!(pipe_log.purge_to(FileId { queue, seq: 2 }).unwrap(), 0);
assert_eq!(pipe_log.file_span(queue).0, 2);

// cannot purge active file
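The purge accounting introduced in `purge_to` can be checked with plain arithmetic. A standalone sketch (the names mirror the diff, but this is not the crate's API; `has_signing[i]` models `fds[i].context.version.has_log_signing()`, where `false` means a V1 file that cannot be recycled):

```rust
// Standalone sketch of the physical-purge count in the new `purge_to`.
fn physically_purged(
    fd_count: usize,
    capacity: usize,
    obsolete_files: usize,
    has_signing: &[bool],
) -> usize {
    // Remove obsolete files only when the capacity is exceeded; a capacity
    // of zero disables recycling, so every obsolete file is removed.
    let capacity_exceeded = fd_count.saturating_sub(capacity);
    let purged = capacity_exceeded.min(obsolete_files);
    // V1 files at the front of the remaining obsolete range are removed
    // too, since they cannot serve as recycling candidates.
    let extra_purged = has_signing[purged..obsolete_files]
        .iter()
        .take_while(|signed| !**signed)
        .count();
    purged + extra_purged
}
```

With capacity 0 every obsolete file is removed; with spare capacity, obsolete V2 files stay behind as recycling candidates.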
4 changes: 2 additions & 2 deletions src/filter.rs
@@ -255,14 +255,14 @@ impl RhaiFilterMachine {
// Backup file and set up a guard to recover on exit.
let target_path = f.file_id.build_file_path(path);
let bak_path = target_path.with_extension("bak");
system.rename(&target_path, &bak_path)?;
system.rename(&target_path, &bak_path, false)?;
guards.push((
bak_path.clone(),
guard(f.file_id, |f| {
let original = f.build_file_path(path);
let bak = original.with_extension("bak");
if bak.exists() {
system.rename(&bak, &original).unwrap_or_else(|e| {
system.rename(&bak, &original, false).unwrap_or_else(|e| {
panic!(
"Failed to recover original log file {} ({}),
you should manually replace it with {}.bak.",
52 changes: 20 additions & 32 deletions src/log_batch.rs
@@ -421,21 +421,11 @@ impl LogItemBatch {
// Insert the signature into the encoded bytes. Rewrite checksum of
// `LogItemBatch` in `LogBatch`.
let footer_checksum_offset = buf.len() - LOG_BATCH_CHECKSUM_LEN;
match codec::decode_u32_le(&mut &buf[footer_checksum_offset..]) {
// The final checksum is generated by `signature` ***XOR***
// `original checksum of buf`.
Ok(original_checksum) => {
let _ = (&mut buf[footer_checksum_offset..])
.write_u32::<LittleEndian>(original_checksum ^ signature);
return Ok(());
}
_ => {
return Err(Error::Corruption(format!(
"Checksum in this LogBatch is corrputed, footer_checksum_offset: {}",
footer_checksum_offset
)));
}
}
let original_checksum = codec::decode_u32_le(&mut &buf[footer_checksum_offset..])?;
// The final checksum is generated by `signature` ***XOR***
// `original checksum of buf`.
(&mut buf[footer_checksum_offset..])
.write_u32::<LittleEndian>(original_checksum ^ signature)?;
}
}
Ok(())
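The signing step is easy to verify in isolation: XOR is an involution, so applying the same signature twice restores the original checksum. A standalone sketch, not the crate's encoding code:

```rust
// The footer checksum is bound to a specific file by XOR-ing in that file's
// signature; the reader undoes it by XOR-ing the same signature again.
fn sign_checksum(checksum: u32, signature: u32) -> u32 {
    checksum ^ signature
}
```

On read, a mismatch after un-signing indicates either corruption or a batch written for a different file.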
@@ -551,6 +541,12 @@ enum BufState {
/// # Invariants
/// LOG_BATCH_HEADER_LEN <= buf.len()
Open,
/// Buffer contains header, entries and footer; ready to be written. The
/// footer may be signed with extra information depending on the format
/// version.
/// # Content
/// (header_offset, entries_len)
Encoded(usize, usize),
/// Buffer contains header, entries and footer; ready to be written. This
/// state only briefly exists between encoding and writing, user operation
/// will panic under this state.
@@ -559,12 +555,6 @@
/// # Invariants
/// LOG_BATCH_HEADER_LEN <= buf.len()
Sealed(usize, usize),
/// Buffer contains header, entries and footer; ready to be written. The
/// footer may be signed with extra information depending on the format
/// version.
/// # Content
/// (header_offset, entries_len)
Encoded(usize, usize),
/// Buffer is undergoing writes. User operation will panic under this state.
Incomplete,
}
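With the variants reordered, the intended lifecycle is Open → Encoded (after `finish_populate`) → Sealed (after `prepare_write`) → written. A toy sketch of those transitions with hypothetical helper functions, not the crate's API:

```rust
// Simplified buffer states; `Incomplete` and the payload fields are omitted.
#[derive(Clone, Copy, Debug, PartialEq)]
enum State {
    Open,
    Encoded,
    Sealed,
}

// Encoding leaves the batch `Encoded`; signing the footer for a target file
// seals it. Each step asserts (like the crate's `unreachable!`) that the
// batch is in the expected state.
fn finish_populate(s: State) -> State {
    assert_eq!(s, State::Open);
    State::Encoded
}

fn prepare_write(s: State) -> State {
    assert_eq!(s, State::Encoded);
    State::Sealed
}
```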
@@ -739,7 +729,7 @@ impl LogBatch {
let _t = StopWatch::new(perf_context!(log_populating_duration));
debug_assert!(self.buf_state == BufState::Open);
if self.is_empty() {
self.buf_state = BufState::Sealed(self.buf.len(), 0);
self.buf_state = BufState::Encoded(self.buf.len(), 0);
return Ok(0);
}
self.buf_state = BufState::Incomplete;
@@ -792,19 +782,17 @@
}
}

self.buf_state = BufState::Sealed(header_offset, footer_roffset - LOG_BATCH_HEADER_LEN);
self.buf_state = BufState::Encoded(header_offset, footer_roffset - LOG_BATCH_HEADER_LEN);
Ok(self.buf.len() - header_offset)
}

/// Make preparations for the write of `LogBatch`.
#[inline]
pub(crate) fn prepare_write(&mut self, file_context: &LogFileContext) -> Result<()> {
match self.buf_state {
BufState::Sealed(header_offset, entries_len) => {
if let Err(e) = LogItemBatch::prepare_write(&mut self.buf, file_context) {
return Err(e);
}
self.buf_state = BufState::Encoded(header_offset, entries_len);
BufState::Encoded(header_offset, entries_len) => {
LogItemBatch::prepare_write(&mut self.buf, file_context)?;
self.buf_state = BufState::Sealed(header_offset, entries_len);
}
_ => unreachable!(),
}
@@ -815,7 +803,7 @@
/// Assumes called after a successful call of [`prepare_write`].
pub(crate) fn encoded_bytes(&self) -> &[u8] {
match self.buf_state {
BufState::Encoded(header_offset, _) => &self.buf[header_offset..],
BufState::Sealed(header_offset, _) => &self.buf[header_offset..],
_ => unreachable!(),
}
}
@@ -824,12 +812,12 @@
///
/// Internally sets the file locations of each log entry indexes.
pub(crate) fn finish_write(&mut self, mut handle: FileBlockHandle) {
debug_assert!(matches!(self.buf_state, BufState::Encoded(_, _)));
debug_assert!(matches!(self.buf_state, BufState::Sealed(_, _)));
if !self.is_empty() {
// adjust log batch handle to log entries handle.
handle.offset += LOG_BATCH_HEADER_LEN as u64;
match self.buf_state {
BufState::Encoded(_, entries_len) => {
BufState::Sealed(_, entries_len) => {
debug_assert!(LOG_BATCH_HEADER_LEN + entries_len < handle.len as usize);
handle.len = entries_len;
}
@@ -859,8 +847,8 @@ impl LogBatch {
BufState::Open => {
self.buf.len() + LOG_BATCH_CHECKSUM_LEN + self.item_batch.approximate_size()
}
BufState::Sealed(header_offset, _) => self.buf.len() - header_offset,
BufState::Encoded(header_offset, _) => self.buf.len() - header_offset,
BufState::Sealed(header_offset, _) => self.buf.len() - header_offset,
s => {
error!("querying incomplete log batch with state {:?}", s);
0