-
Notifications
You must be signed in to change notification settings - Fork 16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add Repository::lock_repo #163
base: main
Are you sure you want to change the base?
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
use std::{process::Command, sync::Arc}; | ||
|
||
use anyhow::Result; | ||
use bytes::Bytes; | ||
use chrono::{DateTime, Local}; | ||
use log::{debug, warn}; | ||
|
||
use crate::{ | ||
backend::{FileType, ReadBackend, WriteBackend}, | ||
id::Id, | ||
CommandInput, | ||
}; | ||
|
||
/// A backend which warms up files by simply accessing them. | ||
#[derive(Clone, Debug)] | ||
pub struct LockBackend { | ||
/// The backend to use. | ||
be: Arc<dyn WriteBackend>, | ||
/// The command to be called to lock files in the backend | ||
command: CommandInput, | ||
} | ||
|
||
impl LockBackend { | ||
/// Creates a new `WarmUpAccessBackend`. | ||
/// | ||
/// # Arguments | ||
/// | ||
/// * `be` - The backend to use. | ||
pub fn new_lock(be: Arc<dyn WriteBackend>, command: CommandInput) -> Arc<dyn WriteBackend> { | ||
Arc::new(Self { be, command }) | ||
} | ||
} | ||
|
||
impl ReadBackend for LockBackend { | ||
fn location(&self) -> String { | ||
self.be.location() | ||
} | ||
|
||
fn list_with_size(&self, tpe: FileType) -> Result<Vec<(Id, u32)>> { | ||
self.be.list_with_size(tpe) | ||
} | ||
|
||
fn read_full(&self, tpe: FileType, id: &Id) -> Result<Bytes> { | ||
self.be.read_full(tpe, id) | ||
} | ||
|
||
fn read_partial( | ||
&self, | ||
tpe: FileType, | ||
id: &Id, | ||
cacheable: bool, | ||
offset: u32, | ||
length: u32, | ||
) -> Result<Bytes> { | ||
self.be.read_partial(tpe, id, cacheable, offset, length) | ||
} | ||
|
||
fn list(&self, tpe: FileType) -> Result<Vec<Id>> { | ||
self.be.list(tpe) | ||
} | ||
|
||
fn needs_warm_up(&self) -> bool { | ||
self.be.needs_warm_up() | ||
} | ||
|
||
fn warm_up(&self, tpe: FileType, id: &Id) -> Result<()> { | ||
self.be.warm_up(tpe, id) | ||
} | ||
} | ||
|
||
fn path(tpe: FileType, id: &Id) -> String { | ||
let hex_id = id.to_hex(); | ||
match tpe { | ||
FileType::Config => "config".into(), | ||
FileType::Pack => format!("data/{}/{}", &hex_id[0..2], &*hex_id), | ||
_ => format!("{}/{}", tpe.dirname(), &*hex_id), | ||
} | ||
} | ||
simonsan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
impl WriteBackend for LockBackend { | ||
fn create(&self) -> Result<()> { | ||
self.be.create() | ||
} | ||
|
||
fn write_bytes(&self, tpe: FileType, id: &Id, cacheable: bool, buf: Bytes) -> Result<()> { | ||
self.be.write_bytes(tpe, id, cacheable, buf) | ||
} | ||
|
||
fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()> { | ||
self.be.remove(tpe, id, cacheable) | ||
} | ||
|
||
fn can_lock(&self) -> bool { | ||
true | ||
} | ||
|
||
fn lock(&self, tpe: FileType, id: &Id, until: Option<DateTime<Local>>) -> Result<()> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A type state design might be better suited here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But we do have a problem with the type state pattern here: The Repository traits need to be object save as we do create dynamic trait objects here. |
||
let until = until.map_or_else(String::new, |u| u.to_rfc3339()); | ||
let path = path(tpe, id); | ||
let args = self.command.args().iter().map(|c| { | ||
c.replace("%id", &id.to_hex()) | ||
.replace("%type", tpe.dirname()) | ||
.replace("%path", &path) | ||
.replace("%until", &until) | ||
}); | ||
debug!("calling {:?}...", self.command); | ||
let status = Command::new(self.command.command()).args(args).status()?; | ||
if !status.success() { | ||
warn!("lock command was not successful for {tpe:?}, id: {id}. {status}"); | ||
} | ||
Ok(()) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
//! `lock` subcommand | ||
|
||
use chrono::{DateTime, Local}; | ||
use log::error; | ||
use rayon::ThreadPoolBuilder; | ||
|
||
use crate::{ | ||
error::{CommandErrorKind, RepositoryErrorKind, RusticResult}, | ||
progress::{Progress, ProgressBars}, | ||
repofile::{configfile::ConfigId, IndexId, KeyId, PackId, RepoId, SnapshotId}, | ||
repository::Repository, | ||
WriteBackend, | ||
}; | ||
|
||
pub(super) mod constants { | ||
/// The maximum number of reader threads to use for locking. | ||
pub(super) const MAX_LOCKER_THREADS_NUM: usize = 20; | ||
} | ||
|
||
pub fn lock_repo<P: ProgressBars, S>( | ||
repo: &Repository<P, S>, | ||
until: Option<DateTime<Local>>, | ||
) -> RusticResult<()> { | ||
lock_all_files::<P, S, ConfigId>(repo, until)?; | ||
lock_all_files::<P, S, KeyId>(repo, until)?; | ||
lock_all_files::<P, S, SnapshotId>(repo, until)?; | ||
lock_all_files::<P, S, IndexId>(repo, until)?; | ||
lock_all_files::<P, S, PackId>(repo, until)?; | ||
Ok(()) | ||
} | ||
|
||
pub fn lock_all_files<P: ProgressBars, S, ID: RepoId + std::fmt::Debug>( | ||
repo: &Repository<P, S>, | ||
until: Option<DateTime<Local>>, | ||
) -> RusticResult<()> { | ||
if !repo.be.can_lock() { | ||
return Err(CommandErrorKind::NoLockingConfigured.into()); | ||
} | ||
|
||
Comment on lines
+36
to
+39
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see we check that here, but I think this should be checked within the |
||
let p = &repo | ||
.pb | ||
.progress_spinner(format!("listing {:?} files..", ID::TYPE)); | ||
let ids: Vec<ID> = repo.list()?.collect(); | ||
p.finish(); | ||
lock_files(repo, &ids, until) | ||
} | ||
|
||
fn lock_files<P: ProgressBars, S, ID: RepoId + std::fmt::Debug>( | ||
repo: &Repository<P, S>, | ||
ids: &[ID], | ||
until: Option<DateTime<Local>>, | ||
) -> RusticResult<()> { | ||
let pool = ThreadPoolBuilder::new() | ||
.num_threads(constants::MAX_LOCKER_THREADS_NUM) | ||
.build() | ||
.map_err(RepositoryErrorKind::FromThreadPoolbilderError)?; | ||
let p = &repo | ||
.pb | ||
.progress_counter(format!("locking {:?} files..", ID::TYPE)); | ||
p.set_length(ids.len().try_into().unwrap()); | ||
let backend = &repo.be; | ||
pool.in_place_scope(|scope| { | ||
for id in ids { | ||
scope.spawn(move |_| { | ||
if let Err(e) = backend.lock(ID::TYPE, id, until) { | ||
// FIXME: Use error handling | ||
error!("lock failed for {:?} {id:?}. {e}", ID::TYPE); | ||
}; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it actually enough to just log an error here? If something like that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the same error handling discussion as in many places: The best would here to log an error, continue and collect the errors and at the end fail with the list of errors... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah :/ But keeping it running and not failing early also opens room up for possible bugs I think that could affect repository integrity? (in general, not regarding the locking discussion). When you say |
||
p.inc(1); | ||
}); | ||
} | ||
}); | ||
p.finish(); | ||
Ok(()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would check here for
::can_lock()
and if that is true and::lock()
is being called in the default implementation, I wouldpanic!()
orunimplemented!("no locking implemented")
as otherwise someone could actually think, they can use this and won't reimplement it. It should definitely not silently continue here.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Especially because it doesn't implement the expected behaviour.
e.g. how
axum
does it: https://github.com/tokio-rs/axum/releases/tag/axum-v0.8.0-alpha.1breaking: Upgrade matchit to 0.8, changing the path parameter syntax from /:single and /*many to /{single} and /{*many}; the old syntax produces a panic to avoid silent change in behavior (#2645)