Skip to content

Commit

Permalink
Pull Request #288: V4 leak fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Awfa authored Mar 25, 2021
2 parents 821331f + fe858f1 commit 54079d9
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 70 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
## 4.0.16 (future)

- FIX: Report events promptly on Linux, even when many occur in rapid succession. [#268]
- FIX: Fix leaks on Windows and debounce module. [#288]
- FIX: Display proper error message when reaching inotify limits on linux. [#290]

[#268]: https://github.com/notify-rs/notify/pull/268
[#288]: https://github.com/notify-rs/notify/pull/288
[#290]: https://github.com/notify-rs/notify/pull/290

## 4.0.15 (2020-01-07)
Expand Down
123 changes: 54 additions & 69 deletions src/debounce/timer.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
use super::super::{op, DebouncedEvent};

use std::collections::VecDeque;
use std::ops::DerefMut;
use std::path::PathBuf;
use std::sync::mpsc;
use std::sync::{
atomic::{self, AtomicBool},
Arc, Condvar, Mutex,
};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use std::{collections::VecDeque, sync::MutexGuard};

use debounce::{OperationsBuffer, OperationsBufferInner};

Expand All @@ -20,45 +17,53 @@ struct ScheduledEvent {
path: PathBuf,
}

#[derive(Default)]
struct WorkerSharedState {
is_stopped: bool,
events: VecDeque<ScheduledEvent>,
}

struct ScheduleWorker {
new_event_trigger: Arc<Condvar>,
stop_trigger: Arc<Condvar>,
events: Arc<Mutex<VecDeque<ScheduledEvent>>>,
state: Arc<(Mutex<WorkerSharedState>, Condvar)>,
tx: mpsc::Sender<DebouncedEvent>,
operations_buffer: OperationsBuffer,
stopped: Arc<AtomicBool>,
}

impl ScheduleWorker {
fn fire_due_events(&self, now: Instant) -> Option<Instant> {
fn fire_due_events<'a>(
&'a self,
now: Instant,
state: MutexGuard<'a, WorkerSharedState>,
) -> (Option<Instant>, MutexGuard<'a, WorkerSharedState>) {
// simple deadlock avoidance loop.
let (mut events, mut op_buf) = loop {
let events = self.events.lock().unwrap();
let mut state = Some(state);
let (mut state, mut op_buf) = loop {
let state = state.take().unwrap_or_else(|| self.state.0.lock().unwrap());

// To avoid deadlock, we do a `try_lock`, and on `WouldBlock`, we unlock the
// events Mutex, and retry after yielding.
match self.operations_buffer.try_lock() {
Ok(op_buf) => break (events, op_buf),
Err(::std::sync::TryLockError::Poisoned { .. }) => return None,
Ok(op_buf) => break (state, op_buf),
Err(::std::sync::TryLockError::Poisoned { .. }) => return (None, state),
Err(::std::sync::TryLockError::WouldBlock) => {
// drop the lock before yielding to give other threads a chance to complete
// their work.
drop(events);
drop(state);
::std::thread::yield_now();
}
}
};
while let Some(event) = events.pop_front() {
while let Some(event) = state.events.pop_front() {
if event.when <= now {
self.fire_event(event, &mut op_buf)
} else {
// not due yet, put it back
let next_when = event.when;
events.push_front(event);
return Some(next_when);
state.events.push_front(event);
return (Some(next_when), state);
}
}
None
(None, state)
}

fn fire_event(
Expand Down Expand Up @@ -97,44 +102,32 @@ impl ScheduleWorker {
}

fn run(&mut self) {
let m = Mutex::new(());

// Unwrapping is safe because the mutex can't be poisoned,
// since we just created it.
let mut g = m.lock().unwrap();

let mut state = self.state.0.lock().unwrap();
loop {
let now = Instant::now();
let next_when = self.fire_due_events(now);
let (next_when, state_out) = self.fire_due_events(now, state);
state = state_out;

if self.stopped.load(atomic::Ordering::SeqCst) {
if state.is_stopped {
break;
}

// Unwrapping is safe because the mutex can't be poisoned,
// since we haven't shared it with another thread.
g = if let Some(next_when) = next_when {
state = if let Some(next_when) = next_when {
// wait for stop notification or timeout to send next event
self.stop_trigger
.wait_timeout(g, next_when - now)
.unwrap()
.0
self.state.1.wait_timeout(state, next_when - now).unwrap().0
} else {
// no pending events
// wait for new event, to check when it should be send and then wait to send it
self.new_event_trigger.wait(g).unwrap()
self.state.1.wait(state).unwrap()
};
}
}
}

pub struct WatchTimer {
counter: u64,
new_event_trigger: Arc<Condvar>,
stop_trigger: Arc<Condvar>,
state: Arc<(Mutex<WorkerSharedState>, Condvar)>,
delay: Duration,
events: Arc<Mutex<VecDeque<ScheduledEvent>>>,
stopped: Arc<AtomicBool>,
}

impl WatchTimer {
Expand All @@ -143,64 +136,56 @@ impl WatchTimer {
operations_buffer: OperationsBuffer,
delay: Duration,
) -> WatchTimer {
let events = Arc::new(Mutex::new(VecDeque::new()));
let new_event_trigger = Arc::new(Condvar::new());
let stop_trigger = Arc::new(Condvar::new());
let stopped = Arc::new(AtomicBool::new(false));

let worker_new_event_trigger = new_event_trigger.clone();
let worker_stop_trigger = stop_trigger.clone();
let worker_events = events.clone();
let worker_stopped = stopped.clone();
let state = Arc::new((Mutex::new(WorkerSharedState::default()), Condvar::new()));

let worker_state = state.clone();
thread::spawn(move || {
ScheduleWorker {
new_event_trigger: worker_new_event_trigger,
stop_trigger: worker_stop_trigger,
events: worker_events,
state: worker_state,
tx,
operations_buffer,
stopped: worker_stopped,
}
.run();
});

WatchTimer {
counter: 0,
new_event_trigger,
stop_trigger,
state,
delay,
events,
stopped,
}
}

pub fn schedule(&mut self, path: PathBuf) -> u64 {
self.counter = self.counter.wrapping_add(1);

self.events.lock().unwrap().push_back(ScheduledEvent {
id: self.counter,
when: Instant::now() + self.delay,
path,
});

self.new_event_trigger.notify_one();
{
let mut state = self.state.0.lock().unwrap();
state.events.push_back(ScheduledEvent {
id: self.counter,
when: Instant::now() + self.delay,
path,
});
}
self.state.1.notify_one();

self.counter
}

pub fn ignore(&self, id: u64) {
let mut events = self.events.lock().unwrap();
let index = events.iter().rposition(|e| e.id == id);
let mut state = self.state.0.lock().unwrap();
let index = state.events.iter().rposition(|e| e.id == id);
if let Some(index) = index {
events.remove(index);
state.events.remove(index);
}
}
}

impl Drop for WatchTimer {
fn drop(&mut self) {
self.stopped.store(true, atomic::Ordering::SeqCst);
self.stop_trigger.notify_one();
self.new_event_trigger.notify_one();
{
let mut state = self.state.0.lock().unwrap();
state.is_stopped = true;
}
self.state.1.notify_one();
}
}
4 changes: 3 additions & 1 deletion src/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,9 @@ fn stop_watch(ws: &WatchState, meta_tx: &Sender<MetaEvent>) {
let ch = handleapi::CloseHandle(ws.dir_handle);
// have to wait for it, otherwise we leak the memory allocated for there read request
if cio != 0 && ch != 0 {
synchapi::WaitForSingleObjectEx(ws.complete_sem, INFINITE, TRUE);
while synchapi::WaitForSingleObjectEx(ws.complete_sem, INFINITE, TRUE) != WAIT_OBJECT_0
{
}
}
handleapi::CloseHandle(ws.complete_sem);
}
Expand Down

0 comments on commit 54079d9

Please sign in to comment.