Skip to content

Commit

Permalink
refactor(commands): improve error handling in check command
Browse files Browse the repository at this point in the history
Based on #317

Signed-off-by: simonsan <14062932+simonsan@users.noreply.github.com>
  • Loading branch information
simonsan committed Oct 7, 2024
1 parent 94aef2b commit 7c541e4
Show file tree
Hide file tree
Showing 3 changed files with 252 additions and 47 deletions.
142 changes: 99 additions & 43 deletions crates/core/src/commands/check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{

use bytes::Bytes;
use bytesize::ByteSize;
use crossbeam_channel::Sender;
use derive_setters::Setters;
use log::{debug, error, warn};
use rand::{prelude::SliceRandom, thread_rng, Rng};
Expand All @@ -16,7 +17,7 @@ use crate::{
backend::{cache::Cache, decrypt::DecryptReadBackend, node::NodeType, FileType, ReadBackend},
blob::{tree::TreeStreamerOnce, BlobId, BlobType},
crypto::hasher::hash,
error::{CommandErrorKind, RusticErrorKind, RusticResult},
error::{CheckErrorKind, CommandErrorKind, RusticErrorKind, RusticResult},
id::Id,
index::{
binarysorted::{IndexCollector, IndexType},
Expand Down Expand Up @@ -156,10 +157,12 @@ pub struct CheckOptions {
/// # Panics
///
// TODO: Add panics
#[allow(clippy::needless_pass_by_value)]
pub(crate) fn check_repository<P: ProgressBars, S: Open>(
repo: &Repository<P, S>,
opts: CheckOptions,
trees: Vec<TreeId>,
err_send: Sender<CheckErrorKind>,
) -> RusticResult<()> {
let be = repo.dbe();
let cache = repo.cache();
Expand All @@ -179,18 +182,18 @@ pub(crate) fn check_repository<P: ProgressBars, S: Open>(

let p = pb.progress_bytes(format!("checking {file_type:?} in cache..."));
// TODO: Make concurrency (20) customizable
check_cache_files(20, cache, raw_be, file_type, &p)?;
check_cache_files(20, cache, raw_be, file_type, &p, &err_send)?;
}
}
}

if let Some(hot_be) = hot_be {
for file_type in [FileType::Snapshot, FileType::Index] {
check_hot_files(raw_be, hot_be, file_type, pb)?;
check_hot_files(raw_be, hot_be, file_type, pb, &err_send)?;

Check warning on line 192 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L191-L192

Added lines #L191 - L192 were not covered by tests
}
}

let index_collector = check_packs(be, hot_be, pb)?;
let index_collector = check_packs(be, hot_be, pb, &err_send)?;

if let Some(cache) = &cache {
let p = pb.progress_spinner("cleaning up packs from cache...");
Expand All @@ -207,7 +210,7 @@ pub(crate) fn check_repository<P: ProgressBars, S: Open>(
if !opts.trust_cache {
let p = pb.progress_bytes("checking packs in cache...");
// TODO: Make concurrency (5) customizable
check_cache_files(5, cache, raw_be, FileType::Pack, &p)?;
check_cache_files(5, cache, raw_be, FileType::Pack, &p, &err_send)?;
}
}

Expand All @@ -232,13 +235,19 @@ pub(crate) fn check_repository<P: ProgressBars, S: Open>(
packs.into_par_iter().for_each(|pack| {
let id = pack.id;
let data = be.read_full(FileType::Pack, &id).unwrap();
match check_pack(be, pack, data, &p) {
match check_pack(be, pack, data, &p, &err_send) {
Ok(()) => {}
Err(err) => error!("Error reading pack {id} : {err}",),
Err(err) => {
let _ = err_send.send(CheckErrorKind::ErrorReadingPack {
id,
source: Box::new(err),

Check warning on line 243 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L239-L243

Added lines #L239 - L243 were not covered by tests
});
}
}
});
p.finish();
}

Ok(())
}

Expand All @@ -259,6 +268,7 @@ fn check_hot_files(
be_hot: &impl ReadBackend,
file_type: FileType,
pb: &impl ProgressBars,
err_send: &Sender<CheckErrorKind>,
) -> RusticResult<()> {
let p = pb.progress_spinner(format!("checking {file_type:?} in hot repo..."));
let mut files = be
Expand All @@ -273,18 +283,25 @@ fn check_hot_files(

for (id, size_hot) in files_hot {
match files.remove(&id) {
None => error!("hot file Type: {file_type:?}, Id: {id} does not exist in repo"),
None => {
let _ = err_send.send(CheckErrorKind::NoColdFile { id, file_type });

Check warning on line 287 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L287

Added line #L287 was not covered by tests
}
Some(size) if size != size_hot => {
// TODO: This should be an actual error not a log entry
error!("Type: {file_type:?}, Id: {id}: hot size: {size_hot}, actual size: {size}");
let _ = err_send.send(CheckErrorKind::HotFileSizeMismatch {

Check warning on line 290 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L290

Added line #L290 was not covered by tests
id,
file_type,
size_hot,
size,
});
}
_ => {} //everything ok
}
}

for (id, _) in files {
error!("hot file Type: {file_type:?}, Id: {id} is missing!",);
let _ = err_send.send(CheckErrorKind::NoHotFile { id, file_type });

Check warning on line 302 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L302

Added line #L302 was not covered by tests
}

p.finish();

Ok(())
Expand All @@ -309,6 +326,7 @@ fn check_cache_files(
be: &impl ReadBackend,
file_type: FileType,
p: &impl Progress,
err_send: &Sender<CheckErrorKind>,
) -> RusticResult<()> {
let files = cache.list_with_size(file_type)?;

Expand All @@ -328,15 +346,21 @@ fn check_cache_files(
be.read_full(file_type, &id),
) {
(Err(err), _) => {
error!("Error reading cached file Type: {file_type:?}, Id: {id} : {err}");
let _ = err_send.send(CheckErrorKind::ErrorReadingCache {

Check warning on line 349 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L349

Added line #L349 was not covered by tests
id,
file_type,
source: Box::new(err),
});
}
(_, Err(err)) => {
error!("Error reading file Type: {file_type:?}, Id: {id} : {err}");
let _ = err_send.send(CheckErrorKind::ErrorReadingFile {

Check warning on line 356 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L356

Added line #L356 was not covered by tests
id,
file_type,
source: Box::new(RusticErrorKind::Backend(err).into()),

Check warning on line 359 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L359

Added line #L359 was not covered by tests
});
}
(Ok(Some(data_cached)), Ok(data)) if data_cached != data => {
error!(
"Cached file Type: {file_type:?}, Id: {id} is not identical to backend!"
);
let _ = err_send.send(CheckErrorKind::CacheMismatch { id, file_type });

Check warning on line 363 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L363

Added line #L363 was not covered by tests
}
(Ok(_), Ok(_)) => {} // everything ok
}
Expand All @@ -345,6 +369,7 @@ fn check_cache_files(
});

p.finish();

Ok(())
}

Expand All @@ -368,6 +393,7 @@ fn check_packs(
be: &impl DecryptReadBackend,
hot_be: &Option<impl ReadBackend>,
pb: &impl ProgressBars,
err_send: &Sender<CheckErrorKind>,
) -> RusticResult<IndexCollector> {
let mut packs = HashMap::new();
let mut tree_packs = HashMap::new();
Expand All @@ -388,7 +414,7 @@ fn check_packs(

// Check if time is set _
if check_time && p.time.is_none() {
error!("pack {}: No time is set! Run prune to correct this!", p.id);
let _ = err_send.send(CheckErrorKind::PackTimeNotSet { id: p.id });

Check warning on line 417 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L417

Added line #L417 was not covered by tests
}

// check offsests in index
Expand All @@ -397,17 +423,21 @@ fn check_packs(
blobs.sort_unstable();
for blob in blobs {
if blob.tpe != blob_type {
error!(
"pack {}: blob {} blob type does not match: type: {:?}, expected: {:?}",
p.id, blob.id, blob.tpe, blob_type
);
let _ = err_send.send(CheckErrorKind::PackBlobTypesMismatch {
id: p.id,
blob_id: blob.id,
blob_type: blob.tpe,

Check warning on line 429 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L426-L429

Added lines #L426 - L429 were not covered by tests
expected: blob_type,
});
}

if blob.offset != expected_offset {
error!(
"pack {}: blob {} offset in index: {}, expected: {}",
p.id, blob.id, blob.offset, expected_offset
);
let _ = err_send.send(CheckErrorKind::PackBlobOffsetMismatch {
id: p.id,
blob_id: blob.id,

Check warning on line 437 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L435-L437

Added lines #L435 - L437 were not covered by tests
offset: blob.offset,
expected: expected_offset,
});
}
expected_offset += blob.length;
}
Expand All @@ -418,12 +448,12 @@ fn check_packs(

if let Some(hot_be) = hot_be {
let p = pb.progress_spinner("listing packs in hot repo...");
check_packs_list_hot(hot_be, tree_packs, &packs)?;
check_packs_list_hot(hot_be, tree_packs, &packs, err_send)?;

Check warning on line 451 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L451

Added line #L451 was not covered by tests
p.finish();
}

let p = pb.progress_spinner("listing packs...");
check_packs_list(be, packs)?;
check_packs_list(be, packs, err_send)?;
p.finish();

Ok(index_collector)
Expand All @@ -440,23 +470,34 @@ fn check_packs(
/// # Errors
///
/// If a pack is missing or has a different size
fn check_packs_list(be: &impl ReadBackend, mut packs: HashMap<PackId, u32>) -> RusticResult<()> {
fn check_packs_list(
be: &impl ReadBackend,
mut packs: HashMap<PackId, u32>,
err_send: &Sender<CheckErrorKind>,
) -> RusticResult<()> {
for (id, size) in be
.list_with_size(FileType::Pack)
.map_err(RusticErrorKind::Backend)?
{
match packs.remove(&PackId::from(id)) {
None => warn!("pack {id} not referenced in index. Can be a parallel backup job. To repair: 'rustic repair index'."),
Some(index_size) if index_size != size => {
error!("pack {id}: size computed by index: {index_size}, actual size: {size}. To repair: 'rustic repair index'.");
let _ = err_send.send(
CheckErrorKind::PackSizeMismatchIndex {

Check warning on line 486 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L485-L486

Added lines #L485 - L486 were not covered by tests
id,
index_size,
size,
}
);
}
_ => {} //everything ok
}
}

for (id, _) in packs {
error!("pack {id} is referenced by the index but not present! To repair: 'rustic repair index'.",);
let _ = err_send.send(CheckErrorKind::NoPack { id });

Check warning on line 498 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L498

Added line #L498 was not covered by tests
}

Ok(())
}

Expand All @@ -470,10 +511,12 @@ fn check_packs_list(be: &impl ReadBackend, mut packs: HashMap<PackId, u32>) -> R
/// # Errors
///
/// If a pack is missing or has a different size
// TODO: Error handling!
fn check_packs_list_hot(
be: &impl ReadBackend,
mut treepacks: HashMap<PackId, u32>,
packs: &HashMap<PackId, u32>,
_err_send: &Sender<CheckErrorKind>,
) -> RusticResult<()> {
for (id, size) in be
.list_with_size(FileType::Pack)
Expand Down Expand Up @@ -601,28 +644,37 @@ fn check_pack(
index_pack: IndexPack,
mut data: Bytes,
p: &impl Progress,
err_send: &Sender<CheckErrorKind>,
) -> RusticResult<()> {
let id = index_pack.id;
let size = index_pack.pack_size();
if data.len() != size as usize {
error!(
"pack {id}: data size does not match expected size. Read: {} bytes, expected: {size} bytes",
data.len()
);
let _ = err_send.send(CheckErrorKind::PackSizeMismatch {

Check warning on line 652 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L652

Added line #L652 was not covered by tests
id,
size: data.len(),
expected: size as usize,
});
return Ok(());
}

let comp_id = PackId::from(hash(&data));
if id != comp_id {
error!("pack {id}: Hash mismatch. Computed hash: {comp_id}");
let computed_id = PackId::from(hash(&data));
if id != computed_id {
let _ = err_send.send(CheckErrorKind::PackHashMismatch {

Check warning on line 662 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L662

Added line #L662 was not covered by tests
id,
computed: computed_id,
});
return Ok(());
}

// check header length
let header_len = PackHeaderRef::from_index_pack(&index_pack).size();
let pack_header_len = PackHeaderLength::from_binary(&data.split_off(data.len() - 4))?.to_u32();
if pack_header_len != header_len {
error!("pack {id}: Header length in pack file doesn't match index. In pack: {pack_header_len}, calculated: {header_len}");
let _ = err_send.send(CheckErrorKind::PackHeaderLengthMismatch {

Check warning on line 673 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L673

Added line #L673 was not covered by tests
id,
length: pack_header_len,
computed: header_len,
});
return Ok(());
}

Expand All @@ -633,7 +685,7 @@ fn check_pack(
let mut blobs = index_pack.blobs;
blobs.sort_unstable_by_key(|b| b.offset);
if pack_blobs != blobs {
error!("pack {id}: Header from pack file does not match the index");
let _ = err_send.send(CheckErrorKind::PackHeaderMismatchIndex { id });

Check warning on line 688 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L688

Added line #L688 was not covered by tests
debug!("pack file header: {pack_blobs:?}");
debug!("index: {:?}", blobs);
return Ok(());
Expand All @@ -649,14 +701,18 @@ fn check_pack(
if let Some(length) = blob.uncompressed_length {
blob_data = decode_all(&*blob_data).unwrap();
if blob_data.len() != length.get() as usize {
error!("pack {id}, blob {blob_id}: Actual uncompressed length does not fit saved uncompressed length");
let _ = err_send.send(CheckErrorKind::PackBlobLengthMismatch { id, blob_id });

Check warning on line 704 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L704

Added line #L704 was not covered by tests
return Ok(());
}
}

let comp_id = BlobId::from(hash(&blob_data));
if blob.id != comp_id {
error!("pack {id}, blob {blob_id}: Hash mismatch. Computed hash: {comp_id}");
let computed_id = BlobId::from(hash(&blob_data));
if blob.id != computed_id {
let _ = err_send.send(CheckErrorKind::PackBlobHashMismatch {

Check warning on line 711 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L711

Added line #L711 was not covered by tests
id,
blob_id,
computed: computed_id,
});
return Ok(());
}
p.inc(blob.length.into());
Expand Down
Loading

0 comments on commit 7c541e4

Please sign in to comment.