Skip to content

Commit

Permalink
[db] Repair wal if it is incomplete #1060 (#1062)
Browse files Browse the repository at this point in the history
add self-repairing wal
  • Loading branch information
michaelvlach authored Mar 16, 2024
1 parent ea6f468 commit 838f4fe
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 4 deletions.
54 changes: 54 additions & 0 deletions agdb/src/storage/file_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1200,4 +1200,58 @@ mod tests {
file.resize(10).unwrap();
assert_eq!(file.read(0, 10).unwrap(), "Hello, Wor".as_bytes());
}

#[test]
fn corrupted_wal_is_truncated() {
let test_file = TestFile::new();

//crate wal
{
let mut wal = WriteAheadLog::new(test_file.file_name()).unwrap();
wal.insert(0, "Hello".as_bytes()).unwrap(); //len 5
wal.insert(5, "World".as_bytes()).unwrap();
}

// truncate wal simulating cutoff during write
// of the value causing the wal record to overrun
// the file length
{
let file = File::options()
.write(true)
.open(TestFile::hidden_filename(test_file.file_name()))
.unwrap();
file.set_len((std::mem::size_of::<u64>() * 4 + 6) as u64)
.unwrap();
}

let storage = FileStorage::new(test_file.file_name()).unwrap();
assert_eq!(storage.len(), 5);
}

#[test]
fn corrupted_wal_is_truncated_validation_read_error() {
let test_file = TestFile::new();

//crate wal
{
let mut wal = WriteAheadLog::new(test_file.file_name()).unwrap();
wal.insert(0, "Hello".as_bytes()).unwrap(); //len 5
wal.insert(5, "World".as_bytes()).unwrap();
}

// truncate wal simulating cutoff during write
// of value meta-data (i.e. pos or value len)
// causing the record to be unreadable
{
let file = File::options()
.write(true)
.open(TestFile::hidden_filename(test_file.file_name()))
.unwrap();
file.set_len((std::mem::size_of::<u64>() * 3 + 5) as u64)
.unwrap();
}

let storage = FileStorage::new(test_file.file_name()).unwrap();
assert_eq!(storage.len(), 5);
}
}
44 changes: 42 additions & 2 deletions agdb/src/storage/write_ahead_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,17 @@ impl WriteAheadLog {
}

pub fn new(filename: &str) -> Result<WriteAheadLog, DbError> {
Ok(Self {
let mut wal = Self {
file: OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(WriteAheadLog::wal_filename(filename))?,
})
};

wal.repair()?;

Ok(wal)
}

pub fn records(&mut self) -> Result<Vec<WriteAheadLogRecord>, DbError> {
Expand Down Expand Up @@ -86,6 +90,42 @@ impl WriteAheadLog {
value: Self::read_exact(file, size)?,
})
}

fn skip_record(file: &mut File) -> Result<(), DbError> {
file.seek(SeekFrom::Current(u64::serialized_size_static() as i64))?;
let value_size = u64::deserialize(&Self::read_exact(file, u64::serialized_size_static())?)?;
file.seek(SeekFrom::Current(value_size as i64))?;
Ok(())
}

fn repair(&mut self) -> Result<(), DbError> {
let size = self.file.seek(SeekFrom::End(0))?;
println!("size: {}", size);
self.file.rewind()?;
let mut pos = 0_u64;

while pos < size {
println!("pos: {}", pos);
if Self::skip_record(&mut self.file).is_err() {
println!("repairing: failed to skip record");
self.file.set_len(pos)?;
return Ok(());
} else {
let new_pos = self.file.stream_position()?;
println!("new_pos: {}", new_pos);

if new_pos > size {
println!("repairing: new_pos > size");
self.file.set_len(pos)?;
return Ok(());
} else {
pos = new_pos;
}
}
}

Ok(())
}
}

#[cfg(test)]
Expand Down
4 changes: 2 additions & 2 deletions agdb/src/test_utilities/test_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ impl TestFile {
}

#[track_caller]
pub fn new() -> TestFile {
pub fn new() -> Self {
let caller = Location::caller();
let file = format!(
"./{}.{}.{}.testfile",
Expand All @@ -27,7 +27,7 @@ impl TestFile {
TestFile::from(file)
}

fn hidden_filename(filename: &String) -> String {
pub fn hidden_filename(filename: &String) -> String {
let path = Path::new(filename);
let name: String = path.file_name().unwrap().to_str().unwrap().to_string();
let parent = path.parent().unwrap();
Expand Down

0 comments on commit 838f4fe

Please sign in to comment.