diff --git a/crates/core/src/backend.rs b/crates/core/src/backend.rs
index b232ceb1..24948af2 100644
--- a/crates/core/src/backend.rs
+++ b/crates/core/src/backend.rs
@@ -6,6 +6,7 @@ pub(crate) mod dry_run;
 pub(crate) mod hotcold;
 pub(crate) mod ignore;
 pub(crate) mod local_destination;
+pub(crate) mod lock;
 pub(crate) mod node;
 pub(crate) mod stdin;
 pub(crate) mod warm_up;
@@ -14,8 +15,9 @@ use std::{io::Read, ops::Deref, path::PathBuf, sync::Arc};
 
 use anyhow::Result;
 use bytes::Bytes;
+use chrono::{DateTime, Local};
 use enum_map::Enum;
-use log::trace;
+use log::{debug, trace};
 
 #[cfg(test)]
 use mockall::mock;
@@ -337,6 +339,32 @@ pub trait WriteBackend: ReadBackend {
     ///
     /// The result of the removal.
     fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()>;
+
+    /// Specify if the backend is able to lock files
+    fn can_lock(&self) -> bool {
+        false
+    }
+
+    /// Lock the given file.
+    ///
+    /// # Arguments
+    ///
+    /// * `tpe` - The type of the file.
+    /// * `id` - The id of the file.
+    /// * `until` - The date until which to lock. May be `None`, which usually specifies an unlimited lock.
+    ///
+    /// # Errors
+    ///
+    /// If the file could not be locked.
+    fn lock(&self, tpe: FileType, id: &Id, until: Option<DateTime<Local>>) -> Result<()> {
+        debug!("no locking implemented. {tpe:?}, {id}, {until:?}");
+
+        if self.can_lock() {
+            unimplemented!("Using default implementation. No locking implemented in backend.");
+        } else {
+            Err(anyhow::anyhow!("No locking configured."))
+        }
+    }
 }
 
 #[cfg(test)]
@@ -374,6 +402,12 @@ impl WriteBackend for Arc<dyn WriteBackend> {
     fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()> {
         self.deref().remove(tpe, id, cacheable)
     }
+    fn can_lock(&self) -> bool {
+        self.deref().can_lock()
+    }
+    fn lock(&self, tpe: FileType, id: &Id, until: Option<DateTime<Local>>) -> Result<()> {
+        self.deref().lock(tpe, id, until)
+    }
 }
 
 impl ReadBackend for Arc<dyn WriteBackend> {
diff --git a/crates/core/src/backend/cache.rs b/crates/core/src/backend/cache.rs
index d00d9699..d849a6c5 100644
--- a/crates/core/src/backend/cache.rs
+++ b/crates/core/src/backend/cache.rs
@@ -8,6 +8,7 @@ use std::{
 
 use anyhow::Result;
 use bytes::Bytes;
+use chrono::{DateTime, Local};
 use dirs::cache_dir;
 use log::{trace, warn};
 use walkdir::WalkDir;
@@ -210,6 +211,18 @@ impl WriteBackend for CachedBackend {
         }
         self.be.remove(tpe, id, cacheable)
     }
+
+    fn can_lock(&self) -> bool {
+        self.be.can_lock()
+    }
+
+    fn lock(&self, tpe: FileType, id: &Id, until: Option<DateTime<Local>>) -> Result<()> {
+        if !self.can_lock() {
+            return Err(anyhow::anyhow!("No locking configured."));
+        }
+
+        self.be.lock(tpe, id, until)
+    }
 }
 
 /// Backend that caches data in a directory.
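The new default `WriteBackend::lock` above returns an error when `can_lock()` is `false` and panics if a backend claims locking support without overriding `lock`, so callers are expected to check `can_lock()` first. A minimal sketch of how crate-internal code could use the two new trait methods — the helper below is illustrative only, not part of this diff, and the 30-day duration is an arbitrary example value; it reuses the same `crate::` paths as the new `lock.rs` module:

```rust
use anyhow::Result;
use chrono::{DateTime, Duration, Local};

use crate::{
    backend::{FileType, WriteBackend},
    id::Id,
};

/// Illustrative helper (not part of this diff): lock a single file for 30 days,
/// silently skipping backends that cannot lock at all.
fn lock_for_30_days(be: &dyn WriteBackend, tpe: FileType, id: &Id) -> Result<()> {
    if !be.can_lock() {
        // The default `WriteBackend::lock` would return "No locking configured." here.
        return Ok(());
    }
    let until: DateTime<Local> = Local::now() + Duration::days(30);
    be.lock(tpe, id, Some(until))
}
```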
diff --git a/crates/core/src/backend/decrypt.rs b/crates/core/src/backend/decrypt.rs
index e90d6f86..20e373ea 100644
--- a/crates/core/src/backend/decrypt.rs
+++ b/crates/core/src/backend/decrypt.rs
@@ -2,6 +2,7 @@ use std::{num::NonZeroU32, sync::Arc};
 
 use anyhow::Result;
 use bytes::Bytes;
+use chrono::{DateTime, Local};
 use crossbeam_channel::{unbounded, Receiver};
 use rayon::prelude::*;
 use zstd::stream::{copy_encode, decode_all, encode_all};
@@ -578,6 +579,18 @@ impl WriteBackend for DecryptBackend {
     fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()> {
         self.be.remove(tpe, id, cacheable)
     }
+
+    fn can_lock(&self) -> bool {
+        self.be.can_lock()
+    }
+
+    fn lock(&self, tpe: FileType, id: &Id, until: Option<DateTime<Local>>) -> Result<()> {
+        if !self.can_lock() {
+            return Err(anyhow::anyhow!("No locking configured."));
+        }
+
+        self.be.lock(tpe, id, until)
+    }
 }
 
 #[cfg(test)]
diff --git a/crates/core/src/backend/dry_run.rs b/crates/core/src/backend/dry_run.rs
index e93c9023..244d8710 100644
--- a/crates/core/src/backend/dry_run.rs
+++ b/crates/core/src/backend/dry_run.rs
@@ -1,5 +1,6 @@
 use anyhow::Result;
 use bytes::Bytes;
+use chrono::{DateTime, Local};
 use zstd::decode_all;
 
 use crate::{
@@ -156,4 +157,16 @@ impl WriteBackend for DryRunBackend {
             self.be.remove(tpe, id, cacheable)
         }
     }
+
+    fn can_lock(&self) -> bool {
+        self.be.can_lock()
+    }
+
+    fn lock(&self, tpe: FileType, id: &Id, until: Option<DateTime<Local>>) -> Result<()> {
+        if !self.can_lock() {
+            return Err(anyhow::anyhow!("No locking configured."));
+        }
+
+        self.be.lock(tpe, id, until)
+    }
 }
diff --git a/crates/core/src/backend/hotcold.rs b/crates/core/src/backend/hotcold.rs
index 75a1f750..e98fd7f7 100644
--- a/crates/core/src/backend/hotcold.rs
+++ b/crates/core/src/backend/hotcold.rs
@@ -2,6 +2,7 @@ use std::sync::Arc;
 
 use anyhow::Result;
 use bytes::Bytes;
+use chrono::{DateTime, Local};
 
 use crate::{
     backend::{FileType, ReadBackend, WriteBackend},
@@ -98,4 +99,16 @@ impl WriteBackend for HotColdBackend {
         }
         Ok(())
     }
+
+    fn can_lock(&self) -> bool {
+        self.be.can_lock()
+    }
+
+    fn lock(&self, tpe: FileType, id: &Id, until: Option<DateTime<Local>>) -> Result<()> {
+        if !self.can_lock() {
+            return Err(anyhow::anyhow!("No locking configured."));
+        }
+
+        self.be.lock(tpe, id, until)
+    }
 }
diff --git a/crates/core/src/backend/lock.rs b/crates/core/src/backend/lock.rs
new file mode 100644
index 00000000..0575a6cb
--- /dev/null
+++ b/crates/core/src/backend/lock.rs
@@ -0,0 +1,127 @@
+use std::{process::Command, sync::Arc};
+
+use anyhow::Result;
+use bytes::Bytes;
+use chrono::{DateTime, Local};
+use log::{debug, error};
+
+use crate::{
+    backend::{FileType, ReadBackend, WriteBackend},
+    id::Id,
+    CommandInput,
+};
+
+/// A backend which locks files by calling an external lock command.
+#[derive(Clone, Debug)]
+pub struct LockBackend {
+    /// The backend to use.
+    be: Arc<dyn WriteBackend>,
+    /// The command to be called to lock files in the backend
+    command: CommandInput,
+}
+
+impl LockBackend {
+    /// Creates a new `LockBackend`.
+    ///
+    /// # Arguments
+    ///
+    /// * `be` - The backend to use.
+    pub fn new_lock(be: Arc<dyn WriteBackend>, command: CommandInput) -> Arc<dyn WriteBackend> {
+        Arc::new(Self { be, command })
+    }
+}
+
+impl ReadBackend for LockBackend {
+    fn location(&self) -> String {
+        self.be.location()
+    }
+
+    fn list_with_size(&self, tpe: FileType) -> Result<Vec<(Id, u32)>> {
+        self.be.list_with_size(tpe)
+    }
+
+    fn read_full(&self, tpe: FileType, id: &Id) -> Result<Bytes> {
+        self.be.read_full(tpe, id)
+    }
+
+    fn read_partial(
+        &self,
+        tpe: FileType,
+        id: &Id,
+        cacheable: bool,
+        offset: u32,
+        length: u32,
+    ) -> Result<Bytes> {
+        self.be.read_partial(tpe, id, cacheable, offset, length)
+    }
+
+    fn list(&self, tpe: FileType) -> Result<Vec<Id>> {
+        self.be.list(tpe)
+    }
+
+    fn needs_warm_up(&self) -> bool {
+        self.be.needs_warm_up()
+    }
+
+    fn warm_up(&self, tpe: FileType, id: &Id) -> Result<()> {
+        self.be.warm_up(tpe, id)
+    }
+}
+
+fn path_to_id_from_file_type(tpe: FileType, id: &Id) -> String {
+    let hex_id = id.to_hex();
+    match tpe {
+        FileType::Config => "config".into(),
+        FileType::Pack => format!("data/{}/{}", &hex_id[0..2], &*hex_id),
+        _ => format!("{}/{}", tpe.dirname(), &*hex_id),
+    }
+}
+
+impl WriteBackend for LockBackend {
+    fn create(&self) -> Result<()> {
+        self.be.create()
+    }
+
+    fn write_bytes(&self, tpe: FileType, id: &Id, cacheable: bool, buf: Bytes) -> Result<()> {
+        self.be.write_bytes(tpe, id, cacheable, buf)
+    }
+
+    fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()> {
+        self.be.remove(tpe, id, cacheable)
+    }
+
+    fn can_lock(&self) -> bool {
+        true
+    }
+
+    fn lock(&self, tpe: FileType, id: &Id, until: Option<DateTime<Local>>) -> Result<()> {
+        if !self.can_lock() {
+            return Err(anyhow::anyhow!("No locking configured."));
+        }
+
+        let until = until.map_or_else(String::new, |u| u.to_rfc3339());
+
+        let path = path_to_id_from_file_type(tpe, id);
+
+        let args = self.command.args().iter().map(|c| {
+            c.replace("%id", &id.to_hex())
+                .replace("%type", tpe.dirname())
+                .replace("%path", &path)
+                .replace("%until", &until)
+        });
+
+        debug!("calling {:?}...", self.command);
+
+        let status = Command::new(self.command.command()).args(args).status()?;
+
+        if !status.success() {
+            error!("lock command was not successful for {tpe:?}, id: {id}. {status}");
+
+            return Err(anyhow::anyhow!(
+                "lock command was not successful for {tpe:?}, id: {id}. {status}"
+            ));
+        }
+
+        Ok(())
+    }
+}
diff --git a/crates/core/src/commands.rs b/crates/core/src/commands.rs
index e4d15a12..37b4262d 100644
--- a/crates/core/src/commands.rs
+++ b/crates/core/src/commands.rs
@@ -12,6 +12,7 @@ pub mod dump;
 pub mod forget;
 pub mod init;
 pub mod key;
+pub mod lock;
 pub mod merge;
 pub mod prune;
 /// The `repair` command.
diff --git a/crates/core/src/commands/lock.rs b/crates/core/src/commands/lock.rs
new file mode 100644
index 00000000..47b93118
--- /dev/null
+++ b/crates/core/src/commands/lock.rs
@@ -0,0 +1,75 @@
+//! `lock` subcommand
+
+use chrono::{DateTime, Local};
+use log::error;
+use rayon::ThreadPoolBuilder;
+
+use crate::{
+    error::{CommandErrorKind, RepositoryErrorKind, RusticResult},
+    progress::{Progress, ProgressBars},
+    repofile::{configfile::ConfigId, IndexId, KeyId, PackId, RepoId, SnapshotId},
+    repository::Repository,
+    WriteBackend,
+};
+
+pub(super) mod constants {
+    /// The maximum number of locker threads to use for locking.
+    pub(super) const MAX_LOCKER_THREADS_NUM: usize = 20;
+}
+
+pub fn lock_repo<P: ProgressBars, S>(
+    repo: &Repository<P, S>,
+    until: Option<DateTime<Local>>,
+) -> RusticResult<()> {
+    lock_all_files::<P, S, ConfigId>(repo, until)?;
+    lock_all_files::<P, S, KeyId>(repo, until)?;
+    lock_all_files::<P, S, SnapshotId>(repo, until)?;
+    lock_all_files::<P, S, IndexId>(repo, until)?;
+    lock_all_files::<P, S, PackId>(repo, until)?;
+    Ok(())
+}
+
+pub fn lock_all_files<P: ProgressBars, S, ID: RepoId>(
+    repo: &Repository<P, S>,
+    until: Option<DateTime<Local>>,
+) -> RusticResult<()> {
+    if !repo.be.can_lock() {
+        return Err(CommandErrorKind::NoLockingConfigured.into());
+    }
+
+    let p = &repo
+        .pb
+        .progress_spinner(format!("listing {:?} files..", ID::TYPE));
+    let ids: Vec<ID> = repo.list()?.collect();
+    p.finish();
+    lock_files(repo, &ids, until)
+}
+
+fn lock_files<P: ProgressBars, S, ID: RepoId>(
+    repo: &Repository<P, S>,
+    ids: &[ID],
+    until: Option<DateTime<Local>>,
+) -> RusticResult<()> {
+    let pool = ThreadPoolBuilder::new()
+        .num_threads(constants::MAX_LOCKER_THREADS_NUM)
+        .build()
+        .map_err(RepositoryErrorKind::FromThreadPoolbilderError)?;
+    let p = &repo
+        .pb
+        .progress_counter(format!("locking {:?} files..", ID::TYPE));
+    p.set_length(ids.len().try_into().unwrap());
+    let backend = &repo.be;
+    pool.in_place_scope(|scope| {
+        for id in ids {
+            scope.spawn(move |_| {
+                if let Err(err) = backend.lock(ID::TYPE, id, until) {
+                    // FIXME: Use error handling, e.g. use a channel to collect the errors
+                    error!("lock failed for {:?} {id:?}. {err}", ID::TYPE);
+                };
+                p.inc(1);
+            });
+        }
+    });
+    p.finish();
+    Ok(())
+}
diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs
index 8c6c0967..e4b58732 100644
--- a/crates/core/src/error.rs
+++ b/crates/core/src/error.rs
@@ -229,6 +229,8 @@ pub enum CommandErrorKind {
     NoKeepOption,
     /// {0:?}
     FromParseError(#[from] shell_words::ParseError),
+    /// No locking capability configured for the backend
+    NoLockingConfigured,
 }
 
 /// [`CryptoErrorKind`] describes the errors that can happen while dealing with Cryptographic functions
diff --git a/crates/core/src/repository.rs b/crates/core/src/repository.rs
index 03a7d261..96bc46a2 100644
--- a/crates/core/src/repository.rs
+++ b/crates/core/src/repository.rs
@@ -13,6 +13,7 @@ use std::{
 };
 
 use bytes::Bytes;
+use chrono::{DateTime, Local};
 use derive_setters::Setters;
 use log::{debug, error, info};
 use serde_with::{serde_as, DisplayFromStr};
@@ -23,6 +24,7 @@ use crate::{
     backend::{
         decrypt::{DecryptBackend, DecryptReadBackend, DecryptWriteBackend},
         hotcold::HotColdBackend,
         local_destination::LocalDestination,
+        lock::LockBackend,
         node::Node,
         warm_up::WarmUpAccessBackend,
         FileType, ReadBackend, WriteBackend,
@@ -39,6 +41,7 @@ use crate::{
         copy::CopySnapshot,
         forget::{ForgetGroups, KeepOptions},
        key::KeyOptions,
+        lock::lock_repo,
        prune::{PruneOptions, PrunePlan},
        repair::{
            index::{index_checked_from_collector, RepairIndexOptions},
@@ -151,7 +154,7 @@ pub struct RepositoryOptions {
     /// Warm up needed data pack files by running the command with %id replaced by pack id
     #[cfg_attr(
         feature = "clap",
-        clap(long, global = true, conflicts_with = "warm_up",)
+        clap(long, global = true, conflicts_with = "warm_up")
     )]
     #[cfg_attr(feature = "merge", merge(strategy = conflate::option::overwrite_none))]
     pub warm_up_command: Option<CommandInput>,
@@ -161,6 +164,11 @@ pub struct RepositoryOptions {
     #[serde_as(as = "Option<DisplayFromStr>")]
     #[cfg_attr(feature = "merge", merge(strategy = conflate::option::overwrite_none))]
     pub warm_up_wait: Option<humantime::Duration>,
+
+    /// Lock files by running the command with %id replaced by the file id, %type by the file type, %path by the file path and %until by the until date
+    #[cfg_attr(feature = "clap", clap(long, global = true))]
+    #[cfg_attr(feature = "merge", merge(strategy = conflate::option::overwrite_none))]
+    pub lock_command: Option<CommandInput>,
 }
 
 impl RepositoryOptions {
@@ -360,6 +368,10 @@ impl Repository {
             be = WarmUpAccessBackend::new_warm_up(be);
         }
+        if let Some(lock_command) = opts.lock_command.as_ref() {
+            be = LockBackend::new_lock(be, lock_command.clone());
+        }
+
         let mut name = be.location();
         if let Some(be_hot) = &be_hot {
             be = Arc::new(HotColdBackend::new(be, be_hot.clone()));
         }
@@ -1195,6 +1207,19 @@ impl Repository {
         opts.get_plan(self)
     }
 
+    /// Lock the complete repository, i.e. everything in the storage backend.
+    ///
+    /// # Arguments
+    ///
+    /// * `until` - Until when to lock. `None` means lock forever.
+    ///
+    /// # Errors
+    ///
+    // TODO: Document errors
+    pub fn lock_repo(&self, until: Option<DateTime<Local>>) -> RusticResult<()> {
+        lock_repo(self, until)
+    }
+
     /// Turn the repository into the `IndexedFull` state by reading and storing the index
     ///
     /// # Errors
diff --git a/crates/core/src/repository/warm_up.rs b/crates/core/src/repository/warm_up.rs
index 14ea76da..081e0d16 100644
--- a/crates/core/src/repository/warm_up.rs
+++ b/crates/core/src/repository/warm_up.rs
@@ -92,13 +92,12 @@ fn warm_up_command(
     let p = pb.progress_counter("warming up packs...");
     p.set_length(packs.len() as u64);
     for pack in packs {
-        let args: Vec<_> = command
+        let args = command
             .args()
             .iter()
-            .map(|c| c.replace("%id", &pack.to_hex()))
-            .collect();
+            .map(|c| c.replace("%id", &pack.to_hex()));
         debug!("calling {command:?}...");
-        let status = Command::new(command.command()).args(&args).status()?;
+        let status = Command::new(command.command()).args(args).status()?;
         if !status.success() {
             warn!("warm-up command was not successful for pack {pack:?}. {status}");
         }
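For context on how the new `lock_command` option and `Repository::lock_repo` are meant to be used from the library side, here is a hedged sketch. It assumes the usual `rustic_backend::BackendOptions` / `RepositoryOptions` setup from the crate's examples, that `CommandInput` implements `FromStr` (as its bare `clap(long)` usage suggests), and a hypothetical `lock-object` script that understands the `%path`/`%until` placeholders; the repository path, password and 30-day duration are placeholders, and none of this is part of the diff itself:

```rust
use anyhow::Result;
use chrono::{Duration, Local};
use rustic_backend::BackendOptions;
use rustic_core::{Repository, RepositoryOptions};

/// Illustrative sketch: open a repository with a lock command configured and
/// lock every file in the storage backend for 30 days.
fn lock_everything() -> Result<()> {
    // Placeholder repository location.
    let backends = BackendOptions::default()
        .repository("/srv/backup/repo")
        .to_backends()?;

    let mut repo_opts = RepositoryOptions::default().password("test");
    // Hypothetical external command; %path and %until are expanded per file.
    repo_opts.lock_command = Some("lock-object %path --until %until".parse()?);

    let repo = Repository::new(&repo_opts, &backends)?.open()?;
    repo.lock_repo(Some(Local::now() + Duration::days(30)))?;
    Ok(())
}
```

On the CLI side this should correspond to something like `--lock-command "lock-object %path --until %until"`, since the field is registered as a global clap option.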
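To make the `LockBackend` substitution rules concrete: every argument of the configured command has `%id`, `%type`, `%path` and `%until` replaced before the process is spawned, where `%path` follows the `data/<first two hex chars>/<id>` layout for pack files, `%type` is the file type's directory name, and `%until` is an RFC 3339 timestamp (empty for `None`). A standalone illustration with made-up values, not part of the diff:

```rust
/// Standalone illustration of the argument expansion performed by
/// `LockBackend::lock` for a pack file. The id and date are made-up examples.
fn main() {
    // Configured as: lock-object %path --until %until  (hypothetical command)
    let command_args = ["%path", "--until", "%until"];

    let hex_id = "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890";
    let until = "2025-06-30T00:00:00+02:00"; // what `DateTime::to_rfc3339()` yields; "" for `None`
    let path = format!("data/{}/{}", &hex_id[0..2], hex_id); // pack files: data/<prefix>/<id>

    let args: Vec<String> = command_args
        .iter()
        .map(|c| {
            c.replace("%id", hex_id)
                .replace("%type", "data") // `FileType::Pack.dirname()` is "data"
                .replace("%path", &path)
                .replace("%until", until)
        })
        .collect();

    // The backend then effectively runs:
    //   lock-object data/ab/abcdef...890 --until 2025-06-30T00:00:00+02:00
    println!("{args:?}");
}
```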