Skip to content

Commit

Permalink
add PollWatcher scan events
Browse files Browse the repository at this point in the history
  • Loading branch information
0xpr03 committed Jul 15, 2023
1 parent af4c021 commit 55e4f9b
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 9 deletions.
4 changes: 4 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ path = "poll_sysfs.rs"
name = "watcher_kind"
path = "watcher_kind.rs"

[[example]]
name = "pollwatcher_scan"
path = "pollwatcher_scan.rs"

# specifically in its own sub folder
# to prevent cargo audit from complaining
#[[example]]
Expand Down
55 changes: 55 additions & 0 deletions examples/pollwatcher_scan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use notify::{poll::ScanEvent, Config, PollWatcher, RecursiveMode, Watcher};
use std::path::Path;

// Example for the pollwatcher scan callback feature.
// Returns the scanned paths
fn main() {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();

let path = std::env::args()
.nth(1)
.expect("Argument 1 needs to be a path");

log::info!("Watching {path}");

if let Err(error) = watch(path) {
log::error!("Error: {error:?}");
}
}

fn watch<P: AsRef<Path>>(path: P) -> notify::Result<()> {
let (tx, rx) = std::sync::mpsc::channel();

// if you want to use the same channel for both events
// and you need to differentiate between scan and file change events,
// then you will have to use something like this
enum Message {
Event(notify::Result<notify::Event>),
Scan(ScanEvent),
}

let tx_c = tx.clone();
// use the pollwatcher and set a callback for the scanning events
let mut watcher = PollWatcher::with_initial_scan(
move |watch_event| {
tx_c.send(Message::Event(watch_event)).unwrap();
},
Config::default(),
Some(move |scan_event| {
tx.send(Message::Scan(scan_event)).unwrap();
}),
)?;

// Add a path to be watched. All files and directories at that path and
// below will be monitored for changes.
watcher.watch(path.as_ref(), RecursiveMode::Recursive)?;

for res in rx {
match res {
Message::Event(e) => println!("Watch event {e:?}"),
Message::Scan(e) => println!("Scan event {e:?}"),
}
}

Ok(())
}
110 changes: 103 additions & 7 deletions notify/src/poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,55 @@ use std::{
time::Duration,
};

/// Event send for registered handler on initial directory scans
pub type ScanEvent = crate::Result<PathBuf>;

/// Handler trait for receivers of ScanEvent.
/// Very much the same as [EventHandler], but including the Result.
///
/// You can implement both on one struct via
/// ```rust
/// use notify::poll::{ScanEventHandler, ScanEvent};
/// use notify::{Result, Event, EventHandler};
/// struct MyStruct;
/// impl ScanEventHandler for MyStruct {
/// fn handle_event(&mut self, event: ScanEvent) { /* your code */ }
/// }
/// impl EventHandler for MyStruct {
/// fn handle_event(&mut self, event: Result<Event>) { /* your code */ }
/// }
/// ```
pub trait ScanEventHandler: Send + 'static {
/// Handles an event.
fn handle_event(&mut self, event: ScanEvent);
}

impl<F> ScanEventHandler for F
where
F: FnMut(ScanEvent) + Send + 'static,
{
fn handle_event(&mut self, event: ScanEvent) {
(self)(event);
}
}

#[cfg(feature = "crossbeam-channel")]
impl ScanEventHandler for crossbeam_channel::Sender<ScanEvent> {
fn handle_event(&mut self, event: ScanEvent) {
let _ = self.send(event);
}
}

impl ScanEventHandler for std::sync::mpsc::Sender<ScanEvent> {
fn handle_event(&mut self, event: ScanEvent) {
let _ = self.send(event);
}
}

impl ScanEventHandler for () {
fn handle_event(&mut self, _event: ScanEvent) {}
}

use data::{DataBuilder, WatchData};
mod data {
use crate::{
Expand All @@ -34,9 +83,12 @@ mod data {
};
use walkdir::WalkDir;

use super::ScanEventHandler;

/// Builder for [`WatchData`] & [`PathData`].
pub(super) struct DataBuilder {
emitter: EventEmitter,
scan_emitter: Option<Box<RefCell<dyn ScanEventHandler>>>,

// TODO: May allow user setup their custom BuildHasher / BuildHasherDefault
// in future.
Expand All @@ -47,12 +99,27 @@ mod data {
}

impl DataBuilder {
pub(super) fn new<F>(event_handler: F, compare_content: bool) -> Self
pub(super) fn new<F, G>(
event_handler: F,
compare_content: bool,
scan_emitter: Option<G>,
) -> Self
where
F: EventHandler,
G: ScanEventHandler,
{
let scan_emitter = match scan_emitter {
None => None,
Some(v) => {
// workaround for a weird type resolution bug when directly going to dyn Trait
let intermediate: Box<RefCell<dyn ScanEventHandler>> =
Box::new(RefCell::new(v));
Some(intermediate)
}
};
Self {
emitter: EventEmitter::new(event_handler),
scan_emitter,
build_hasher: compare_content.then(RandomState::default),
now: Instant::now(),
}
Expand Down Expand Up @@ -131,7 +198,7 @@ mod data {
}

let all_path_data =
Self::scan_all_path_data(data_builder, root.clone(), is_recursive).collect();
Self::scan_all_path_data(data_builder, root.clone(), is_recursive, true).collect();

Some(Self {
root,
Expand All @@ -148,7 +215,7 @@ mod data {
pub(super) fn rescan(&mut self, data_builder: &mut DataBuilder) {
// scan current filesystem.
for (path, new_path_data) in
Self::scan_all_path_data(data_builder, self.root.clone(), self.is_recursive)
Self::scan_all_path_data(data_builder, self.root.clone(), self.is_recursive, false)
{
let old_path_data = self
.all_path_data
Expand Down Expand Up @@ -191,7 +258,10 @@ mod data {
data_builder: &'_ DataBuilder,
root: PathBuf,
is_recursive: bool,
// whether this is an initial scan, used only for events
is_initial: bool,
) -> impl Iterator<Item = (PathBuf, PathData)> + '_ {
log::trace!("rescanning {root:?}");
// WalkDir return only one entry if root is a file (not a folder),
// so we can use single logic to do the both file & dir's jobs.
//
Expand All @@ -212,11 +282,25 @@ mod data {
// propagate to event handler. It may not consistent.
//
// FIXME: Should we emit all IO error events? Or ignore them all?
.filter_map(|entry| entry.ok())
.filter_map(|entry| match entry.metadata() {
.filter_map(|entry_res| match entry_res {
Ok(entry) => Some(entry),
Err(err) => {
log::warn!("walkdir error scanning {err:?}");
let crate_err =
crate::Error::new(crate::ErrorKind::Generic(err.to_string()));
data_builder.emitter.emit(Err(crate_err));
None
}
})
.filter_map(move |entry| match entry.metadata() {
Ok(metadata) => {
let path = entry.into_path();

if is_initial {
// emit initial scans
if let Some(ref emitter) = data_builder.scan_emitter {
emitter.borrow_mut().handle_event(Ok(path.clone()));
}
}
let meta_path = MetaPath::from_parts_unchecked(path, metadata);
let data_path = data_builder.build_path_data(&meta_path);

Expand Down Expand Up @@ -409,7 +493,19 @@ pub struct PollWatcher {
impl PollWatcher {
/// Create a new [PollWatcher], configured as needed.
pub fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<PollWatcher> {
let data_builder = DataBuilder::new(event_handler, config.compare_contents());
Self::with_initial_scan::<_, ()>(event_handler, config, None)
}

/// Create a new [PollWatcher] with an scan event handler.
///
/// `scan_fallback` is called on the initial scan with all files seen by the pollwatcher.
pub fn with_initial_scan<F: EventHandler, G: ScanEventHandler>(
event_handler: F,
config: Config,
scan_callback: Option<G>,
) -> crate::Result<PollWatcher> {
let data_builder =
DataBuilder::new(event_handler, config.compare_contents(), scan_callback);

let poll_watcher = PollWatcher {
watches: Default::default(),
Expand Down
8 changes: 6 additions & 2 deletions notify/src/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,11 @@ unsafe extern "system" fn handle_event(
};

if !skip {
log::trace!("Event: path = `{}`, action = {:?}", path.display(), (*cur_entry).Action);
log::trace!(
"Event: path = `{}`, action = {:?}",
path.display(),
(*cur_entry).Action
);

let newe = Event::new(EventKind::Any).add_path(path);

Expand Down Expand Up @@ -504,7 +508,7 @@ impl ReadDirectoryChangesWatcher {
}

impl Watcher for ReadDirectoryChangesWatcher {
fn new<F: EventHandler>(event_handler: F, config: Config) -> Result<Self> {
fn new<F: EventHandler>(event_handler: F, _config: Config) -> Result<Self> {
// create dummy channel for meta event
// TODO: determine the original purpose of this - can we remove it?
let (meta_tx, _) = unbounded();
Expand Down

0 comments on commit 55e4f9b

Please sign in to comment.