diff --git a/CHANGELOG.md b/CHANGELOG.md index 935c6f84..de2807b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,9 +3,11 @@ ## 4.0.16 (future) - FIX: Report events promptly on Linux, even when many occur in rapid succession. [#268] +- FIX: Fix leaks on Windows and debounce module. [#288] - FIX: Display proper error message when reaching inotify limits on linux. [#290] [#268]: https://github.com/notify-rs/notify/pull/268 +[#288]: https://github.com/notify-rs/notify/pull/288 [#290]: https://github.com/notify-rs/notify/pull/290 ## 4.0.15 (2020-01-07) diff --git a/src/debounce/timer.rs b/src/debounce/timer.rs index 802ad49a..90ea85fb 100644 --- a/src/debounce/timer.rs +++ b/src/debounce/timer.rs @@ -1,15 +1,12 @@ use super::super::{op, DebouncedEvent}; -use std::collections::VecDeque; use std::ops::DerefMut; use std::path::PathBuf; use std::sync::mpsc; -use std::sync::{ - atomic::{self, AtomicBool}, - Arc, Condvar, Mutex, -}; +use std::sync::{Arc, Condvar, Mutex}; use std::thread; use std::time::{Duration, Instant}; +use std::{collections::VecDeque, sync::MutexGuard}; use debounce::{OperationsBuffer, OperationsBufferInner}; @@ -20,45 +17,53 @@ struct ScheduledEvent { path: PathBuf, } +#[derive(Default)] +struct WorkerSharedState { + is_stopped: bool, + events: VecDeque, +} + struct ScheduleWorker { - new_event_trigger: Arc, - stop_trigger: Arc, - events: Arc>>, + state: Arc<(Mutex, Condvar)>, tx: mpsc::Sender, operations_buffer: OperationsBuffer, - stopped: Arc, } impl ScheduleWorker { - fn fire_due_events(&self, now: Instant) -> Option { + fn fire_due_events<'a>( + &'a self, + now: Instant, + state: MutexGuard<'a, WorkerSharedState>, + ) -> (Option, MutexGuard<'a, WorkerSharedState>) { // simple deadlock avoidance loop. - let (mut events, mut op_buf) = loop { - let events = self.events.lock().unwrap(); + let mut state = Some(state); + let (mut state, mut op_buf) = loop { + let state = state.take().unwrap_or_else(|| self.state.0.lock().unwrap()); // To avoid deadlock, we do a `try_lock`, and on `WouldBlock`, we unlock the // events Mutex, and retry after yielding. match self.operations_buffer.try_lock() { - Ok(op_buf) => break (events, op_buf), - Err(::std::sync::TryLockError::Poisoned { .. }) => return None, + Ok(op_buf) => break (state, op_buf), + Err(::std::sync::TryLockError::Poisoned { .. }) => return (None, state), Err(::std::sync::TryLockError::WouldBlock) => { // drop the lock before yielding to give other threads a chance to complete // their work. - drop(events); + drop(state); ::std::thread::yield_now(); } } }; - while let Some(event) = events.pop_front() { + while let Some(event) = state.events.pop_front() { if event.when <= now { self.fire_event(event, &mut op_buf) } else { // not due yet, put it back let next_when = event.when; - events.push_front(event); - return Some(next_when); + state.events.push_front(event); + return (Some(next_when), state); } } - None + (None, state) } fn fire_event( @@ -97,32 +102,23 @@ impl ScheduleWorker { } fn run(&mut self) { - let m = Mutex::new(()); - - // Unwrapping is safe because the mutex can't be poisoned, - // since we just created it. - let mut g = m.lock().unwrap(); - + let mut state = self.state.0.lock().unwrap(); loop { let now = Instant::now(); - let next_when = self.fire_due_events(now); + let (next_when, state_out) = self.fire_due_events(now, state); + state = state_out; - if self.stopped.load(atomic::Ordering::SeqCst) { + if state.is_stopped { break; } - // Unwrapping is safe because the mutex can't be poisoned, - // since we haven't shared it with another thread. - g = if let Some(next_when) = next_when { + state = if let Some(next_when) = next_when { // wait for stop notification or timeout to send next event - self.stop_trigger - .wait_timeout(g, next_when - now) - .unwrap() - .0 + self.state.1.wait_timeout(state, next_when - now).unwrap().0 } else { // no pending events // wait for new event, to check when it should be send and then wait to send it - self.new_event_trigger.wait(g).unwrap() + self.state.1.wait(state).unwrap() }; } } @@ -130,11 +126,8 @@ impl ScheduleWorker { pub struct WatchTimer { counter: u64, - new_event_trigger: Arc, - stop_trigger: Arc, + state: Arc<(Mutex, Condvar)>, delay: Duration, - events: Arc>>, - stopped: Arc, } impl WatchTimer { @@ -143,64 +136,56 @@ impl WatchTimer { operations_buffer: OperationsBuffer, delay: Duration, ) -> WatchTimer { - let events = Arc::new(Mutex::new(VecDeque::new())); - let new_event_trigger = Arc::new(Condvar::new()); - let stop_trigger = Arc::new(Condvar::new()); - let stopped = Arc::new(AtomicBool::new(false)); - - let worker_new_event_trigger = new_event_trigger.clone(); - let worker_stop_trigger = stop_trigger.clone(); - let worker_events = events.clone(); - let worker_stopped = stopped.clone(); + let state = Arc::new((Mutex::new(WorkerSharedState::default()), Condvar::new())); + + let worker_state = state.clone(); thread::spawn(move || { ScheduleWorker { - new_event_trigger: worker_new_event_trigger, - stop_trigger: worker_stop_trigger, - events: worker_events, + state: worker_state, tx, operations_buffer, - stopped: worker_stopped, } .run(); }); WatchTimer { counter: 0, - new_event_trigger, - stop_trigger, + state, delay, - events, - stopped, } } pub fn schedule(&mut self, path: PathBuf) -> u64 { self.counter = self.counter.wrapping_add(1); - self.events.lock().unwrap().push_back(ScheduledEvent { - id: self.counter, - when: Instant::now() + self.delay, - path, - }); - - self.new_event_trigger.notify_one(); + { + let mut state = self.state.0.lock().unwrap(); + state.events.push_back(ScheduledEvent { + id: self.counter, + when: Instant::now() + self.delay, + path, + }); + } + self.state.1.notify_one(); self.counter } pub fn ignore(&self, id: u64) { - let mut events = self.events.lock().unwrap(); - let index = events.iter().rposition(|e| e.id == id); + let mut state = self.state.0.lock().unwrap(); + let index = state.events.iter().rposition(|e| e.id == id); if let Some(index) = index { - events.remove(index); + state.events.remove(index); } } } impl Drop for WatchTimer { fn drop(&mut self) { - self.stopped.store(true, atomic::Ordering::SeqCst); - self.stop_trigger.notify_one(); - self.new_event_trigger.notify_one(); + { + let mut state = self.state.0.lock().unwrap(); + state.is_stopped = true; + } + self.state.1.notify_one(); } } diff --git a/src/windows.rs b/src/windows.rs index 78245cea..cb02c275 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -235,7 +235,9 @@ fn stop_watch(ws: &WatchState, meta_tx: &Sender) { let ch = handleapi::CloseHandle(ws.dir_handle); // have to wait for it, otherwise we leak the memory allocated for there read request if cio != 0 && ch != 0 { - synchapi::WaitForSingleObjectEx(ws.complete_sem, INFINITE, TRUE); + while synchapi::WaitForSingleObjectEx(ws.complete_sem, INFINITE, TRUE) != WAIT_OBJECT_0 + { + } } handleapi::CloseHandle(ws.complete_sem); }