From 8ff030a16a62104bcbc8c080b27d5ef5de6b6456 Mon Sep 17 00:00:00 2001 From: Anthony Ha Date: Wed, 17 Mar 2021 14:38:38 -0700 Subject: [PATCH 01/10] Fix ScheduleTimer thread leak. See issue #287 on notify --- src/debounce/timer.rs | 125 +++++++++++++++++++----------------------- 1 file changed, 55 insertions(+), 70 deletions(-) diff --git a/src/debounce/timer.rs b/src/debounce/timer.rs index 802ad49a..e3a5a919 100644 --- a/src/debounce/timer.rs +++ b/src/debounce/timer.rs @@ -1,15 +1,12 @@ use super::super::{op, DebouncedEvent}; -use std::collections::VecDeque; -use std::ops::DerefMut; use std::path::PathBuf; use std::sync::mpsc; -use std::sync::{ - atomic::{self, AtomicBool}, - Arc, Condvar, Mutex, -}; +use std::sync::{Arc, Condvar, Mutex}; use std::thread; use std::time::{Duration, Instant}; +use std::{collections::VecDeque, sync::MutexGuard}; +use std::{ops::DerefMut, thread::JoinHandle}; use debounce::{OperationsBuffer, OperationsBufferInner}; @@ -20,45 +17,53 @@ struct ScheduledEvent { path: PathBuf, } +#[derive(Default)] +struct WorkerSharedState { + is_stopped: bool, + events: VecDeque, +} + struct ScheduleWorker { - new_event_trigger: Arc, - stop_trigger: Arc, - events: Arc>>, + state: Arc<(Mutex, Condvar)>, tx: mpsc::Sender, operations_buffer: OperationsBuffer, - stopped: Arc, } impl ScheduleWorker { - fn fire_due_events(&self, now: Instant) -> Option { + fn fire_due_events<'a>( + &'a self, + now: Instant, + state: MutexGuard<'a, WorkerSharedState>, + ) -> (Option, MutexGuard<'a, WorkerSharedState>) { // simple deadlock avoidance loop. - let (mut events, mut op_buf) = loop { - let events = self.events.lock().unwrap(); + let mut state = Some(state); + let (mut state, mut op_buf) = loop { + let state = state.take().unwrap_or_else(|| self.state.0.lock().unwrap()); // To avoid deadlock, we do a `try_lock`, and on `WouldBlock`, we unlock the // events Mutex, and retry after yielding. match self.operations_buffer.try_lock() { - Ok(op_buf) => break (events, op_buf), - Err(::std::sync::TryLockError::Poisoned { .. }) => return None, + Ok(op_buf) => break (state, op_buf), + Err(::std::sync::TryLockError::Poisoned { .. }) => return (None, state), Err(::std::sync::TryLockError::WouldBlock) => { // drop the lock before yielding to give other threads a chance to complete // their work. - drop(events); + drop(state); ::std::thread::yield_now(); } } }; - while let Some(event) = events.pop_front() { + while let Some(event) = state.events.pop_front() { if event.when <= now { self.fire_event(event, &mut op_buf) } else { // not due yet, put it back let next_when = event.when; - events.push_front(event); - return Some(next_when); + state.events.push_front(event); + return (Some(next_when), state); } } - None + (None, state) } fn fire_event( @@ -97,32 +102,23 @@ impl ScheduleWorker { } fn run(&mut self) { - let m = Mutex::new(()); - - // Unwrapping is safe because the mutex can't be poisoned, - // since we just created it. - let mut g = m.lock().unwrap(); - + let mut state = self.state.0.lock().unwrap(); loop { let now = Instant::now(); - let next_when = self.fire_due_events(now); + let (next_when, state_out) = self.fire_due_events(now, state); + state = state_out; - if self.stopped.load(atomic::Ordering::SeqCst) { + if state.is_stopped { break; } - // Unwrapping is safe because the mutex can't be poisoned, - // since we haven't shared it with another thread. - g = if let Some(next_when) = next_when { + state = if let Some(next_when) = next_when { // wait for stop notification or timeout to send next event - self.stop_trigger - .wait_timeout(g, next_when - now) - .unwrap() - .0 + self.state.1.wait_timeout(state, next_when - now).unwrap().0 } else { // no pending events // wait for new event, to check when it should be send and then wait to send it - self.new_event_trigger.wait(g).unwrap() + self.state.1.wait(state).unwrap() }; } } @@ -130,11 +126,8 @@ impl ScheduleWorker { pub struct WatchTimer { counter: u64, - new_event_trigger: Arc, - stop_trigger: Arc, + state: Arc<(Mutex, Condvar)>, delay: Duration, - events: Arc>>, - stopped: Arc, } impl WatchTimer { @@ -143,64 +136,56 @@ impl WatchTimer { operations_buffer: OperationsBuffer, delay: Duration, ) -> WatchTimer { - let events = Arc::new(Mutex::new(VecDeque::new())); - let new_event_trigger = Arc::new(Condvar::new()); - let stop_trigger = Arc::new(Condvar::new()); - let stopped = Arc::new(AtomicBool::new(false)); - - let worker_new_event_trigger = new_event_trigger.clone(); - let worker_stop_trigger = stop_trigger.clone(); - let worker_events = events.clone(); - let worker_stopped = stopped.clone(); + let state = Arc::new((Mutex::new(WorkerSharedState::default()), Condvar::new())); + + let worker_state = state.clone(); thread::spawn(move || { ScheduleWorker { - new_event_trigger: worker_new_event_trigger, - stop_trigger: worker_stop_trigger, - events: worker_events, + state: worker_state, tx, operations_buffer, - stopped: worker_stopped, } .run(); }); WatchTimer { counter: 0, - new_event_trigger, - stop_trigger, + state, delay, - events, - stopped, } } pub fn schedule(&mut self, path: PathBuf) -> u64 { self.counter = self.counter.wrapping_add(1); - self.events.lock().unwrap().push_back(ScheduledEvent { - id: self.counter, - when: Instant::now() + self.delay, - path, - }); - - self.new_event_trigger.notify_one(); + { + let mut state = self.state.0.lock().unwrap(); + state.events.push_back(ScheduledEvent { + id: self.counter, + when: Instant::now() + self.delay, + path, + }); + } + self.state.1.notify_one(); self.counter } pub fn ignore(&self, id: u64) { - let mut events = self.events.lock().unwrap(); - let index = events.iter().rposition(|e| e.id == id); + let mut state = self.state.0.lock().unwrap(); + let index = state.events.iter().rposition(|e| e.id == id); if let Some(index) = index { - events.remove(index); + state.events.remove(index); } } } impl Drop for WatchTimer { fn drop(&mut self) { - self.stopped.store(true, atomic::Ordering::SeqCst); - self.stop_trigger.notify_one(); - self.new_event_trigger.notify_one(); + { + let mut state = self.state.0.lock().unwrap(); + state.is_stopped = true; + } + self.state.1.notify_one(); } } From 361f8a665a6a40b2439448d1fca2ed1956ffc1fe Mon Sep 17 00:00:00 2001 From: Anthony Ha Date: Wed, 17 Mar 2021 14:40:43 -0700 Subject: [PATCH 02/10] Add a stop-gap fix for ReadDirectoryRequest leak from APC queue not being consumed all the way --- src/windows.rs | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/src/windows.rs b/src/windows.rs index 78245cea..753a5adb 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -19,7 +19,6 @@ use winapi::um::winnt::{self, FILE_NOTIFY_INFORMATION, HANDLE}; use super::debounce::{Debounce, EventTx}; use super::{op, DebouncedEvent, Error, Op, RawEvent, RecursiveMode, Result, Watcher}; -use std::collections::HashMap; use std::env; use std::ffi::OsString; use std::mem; @@ -32,6 +31,10 @@ use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; +use std::{ + borrow::{Borrow, BorrowMut}, + collections::HashMap, +}; const BUF_SIZE: u32 = 16384; @@ -68,6 +71,10 @@ struct WatchState { complete_sem: HANDLE, } +thread_local! { + pub static APC_QUEUE_SIZE: std::cell::RefCell = std::cell::RefCell::new(0); +} + struct ReadDirectoryChangesServer { rx: Receiver, event_tx: Arc>, @@ -138,6 +145,12 @@ impl ReadDirectoryChangesServer { } // we have to clean this up, since the watcher may be long gone + while APC_QUEUE_SIZE.with(|x| *x.borrow()) > 0 { + // There is somehow a leak in the APC queue - let the completion routines fire until the APC queue is empty. + unsafe { + synchapi::WaitForSingleObjectEx(self.wakeup_sem, INFINITE, TRUE); + } + } unsafe { handleapi::CloseHandle(self.wakeup_sem); } @@ -294,6 +307,7 @@ fn start_read(rd: &ReadData, event_tx: Arc>, handle: HANDLE) { synchapi::ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut()); } else { // read ok. forget overlapped to let the completion routine handle memory + APC_QUEUE_SIZE.with(|x| *x.borrow_mut() += 1); mem::forget(overlapped); } } @@ -314,6 +328,8 @@ unsafe extern "system" fn handle_event( _bytes_written: u32, overlapped: LPOVERLAPPED, ) { + APC_QUEUE_SIZE.with(|x| *x.borrow_mut() -= 1); + let overlapped: Box = Box::from_raw(overlapped); let request: Box = Box::from_raw(overlapped.hEvent as *mut _); From 680fc7e2a55dd5f519a6dd617d49f82dead441bf Mon Sep 17 00:00:00 2001 From: Anthony Ha Date: Wed, 17 Mar 2021 17:02:37 -0700 Subject: [PATCH 03/10] Make stop_watch really wait for the WatchState's completion semaphore to signal --- src/windows.rs | 22 ++++------------------ 1 file changed, 4 insertions(+), 18 deletions(-) diff --git a/src/windows.rs b/src/windows.rs index 753a5adb..cb02c275 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -19,6 +19,7 @@ use winapi::um::winnt::{self, FILE_NOTIFY_INFORMATION, HANDLE}; use super::debounce::{Debounce, EventTx}; use super::{op, DebouncedEvent, Error, Op, RawEvent, RecursiveMode, Result, Watcher}; +use std::collections::HashMap; use std::env; use std::ffi::OsString; use std::mem; @@ -31,10 +32,6 @@ use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; -use std::{ - borrow::{Borrow, BorrowMut}, - collections::HashMap, -}; const BUF_SIZE: u32 = 16384; @@ -71,10 +68,6 @@ struct WatchState { complete_sem: HANDLE, } -thread_local! { - pub static APC_QUEUE_SIZE: std::cell::RefCell = std::cell::RefCell::new(0); -} - struct ReadDirectoryChangesServer { rx: Receiver, event_tx: Arc>, @@ -145,12 +138,6 @@ impl ReadDirectoryChangesServer { } // we have to clean this up, since the watcher may be long gone - while APC_QUEUE_SIZE.with(|x| *x.borrow()) > 0 { - // There is somehow a leak in the APC queue - let the completion routines fire until the APC queue is empty. - unsafe { - synchapi::WaitForSingleObjectEx(self.wakeup_sem, INFINITE, TRUE); - } - } unsafe { handleapi::CloseHandle(self.wakeup_sem); } @@ -248,7 +235,9 @@ fn stop_watch(ws: &WatchState, meta_tx: &Sender) { let ch = handleapi::CloseHandle(ws.dir_handle); // have to wait for it, otherwise we leak the memory allocated for there read request if cio != 0 && ch != 0 { - synchapi::WaitForSingleObjectEx(ws.complete_sem, INFINITE, TRUE); + while synchapi::WaitForSingleObjectEx(ws.complete_sem, INFINITE, TRUE) != WAIT_OBJECT_0 + { + } } handleapi::CloseHandle(ws.complete_sem); } @@ -307,7 +296,6 @@ fn start_read(rd: &ReadData, event_tx: Arc>, handle: HANDLE) { synchapi::ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut()); } else { // read ok. forget overlapped to let the completion routine handle memory - APC_QUEUE_SIZE.with(|x| *x.borrow_mut() += 1); mem::forget(overlapped); } } @@ -328,8 +316,6 @@ unsafe extern "system" fn handle_event( _bytes_written: u32, overlapped: LPOVERLAPPED, ) { - APC_QUEUE_SIZE.with(|x| *x.borrow_mut() -= 1); - let overlapped: Box = Box::from_raw(overlapped); let request: Box = Box::from_raw(overlapped.hEvent as *mut _); From 4f52bc39b500143438fb2786a88deee040851a91 Mon Sep 17 00:00:00 2001 From: Anthony Ha Date: Wed, 17 Mar 2021 17:34:11 -0700 Subject: [PATCH 04/10] Remove unused import --- src/debounce/timer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/debounce/timer.rs b/src/debounce/timer.rs index e3a5a919..90ea85fb 100644 --- a/src/debounce/timer.rs +++ b/src/debounce/timer.rs @@ -1,12 +1,12 @@ use super::super::{op, DebouncedEvent}; +use std::ops::DerefMut; use std::path::PathBuf; use std::sync::mpsc; use std::sync::{Arc, Condvar, Mutex}; use std::thread; use std::time::{Duration, Instant}; use std::{collections::VecDeque, sync::MutexGuard}; -use std::{ops::DerefMut, thread::JoinHandle}; use debounce::{OperationsBuffer, OperationsBufferInner}; From 00b33f9bc8603d700434779cd5f5969efb1c68f2 Mon Sep 17 00:00:00 2001 From: Anthony Ha Date: Wed, 17 Mar 2021 14:38:38 -0700 Subject: [PATCH 05/10] Fix ScheduleTimer thread leak. See issue #287 on notify --- src/debounce/timer.rs | 125 +++++++++++++++++++----------------------- 1 file changed, 55 insertions(+), 70 deletions(-) diff --git a/src/debounce/timer.rs b/src/debounce/timer.rs index 802ad49a..e3a5a919 100644 --- a/src/debounce/timer.rs +++ b/src/debounce/timer.rs @@ -1,15 +1,12 @@ use super::super::{op, DebouncedEvent}; -use std::collections::VecDeque; -use std::ops::DerefMut; use std::path::PathBuf; use std::sync::mpsc; -use std::sync::{ - atomic::{self, AtomicBool}, - Arc, Condvar, Mutex, -}; +use std::sync::{Arc, Condvar, Mutex}; use std::thread; use std::time::{Duration, Instant}; +use std::{collections::VecDeque, sync::MutexGuard}; +use std::{ops::DerefMut, thread::JoinHandle}; use debounce::{OperationsBuffer, OperationsBufferInner}; @@ -20,45 +17,53 @@ struct ScheduledEvent { path: PathBuf, } +#[derive(Default)] +struct WorkerSharedState { + is_stopped: bool, + events: VecDeque, +} + struct ScheduleWorker { - new_event_trigger: Arc, - stop_trigger: Arc, - events: Arc>>, + state: Arc<(Mutex, Condvar)>, tx: mpsc::Sender, operations_buffer: OperationsBuffer, - stopped: Arc, } impl ScheduleWorker { - fn fire_due_events(&self, now: Instant) -> Option { + fn fire_due_events<'a>( + &'a self, + now: Instant, + state: MutexGuard<'a, WorkerSharedState>, + ) -> (Option, MutexGuard<'a, WorkerSharedState>) { // simple deadlock avoidance loop. - let (mut events, mut op_buf) = loop { - let events = self.events.lock().unwrap(); + let mut state = Some(state); + let (mut state, mut op_buf) = loop { + let state = state.take().unwrap_or_else(|| self.state.0.lock().unwrap()); // To avoid deadlock, we do a `try_lock`, and on `WouldBlock`, we unlock the // events Mutex, and retry after yielding. match self.operations_buffer.try_lock() { - Ok(op_buf) => break (events, op_buf), - Err(::std::sync::TryLockError::Poisoned { .. }) => return None, + Ok(op_buf) => break (state, op_buf), + Err(::std::sync::TryLockError::Poisoned { .. }) => return (None, state), Err(::std::sync::TryLockError::WouldBlock) => { // drop the lock before yielding to give other threads a chance to complete // their work. - drop(events); + drop(state); ::std::thread::yield_now(); } } }; - while let Some(event) = events.pop_front() { + while let Some(event) = state.events.pop_front() { if event.when <= now { self.fire_event(event, &mut op_buf) } else { // not due yet, put it back let next_when = event.when; - events.push_front(event); - return Some(next_when); + state.events.push_front(event); + return (Some(next_when), state); } } - None + (None, state) } fn fire_event( @@ -97,32 +102,23 @@ impl ScheduleWorker { } fn run(&mut self) { - let m = Mutex::new(()); - - // Unwrapping is safe because the mutex can't be poisoned, - // since we just created it. - let mut g = m.lock().unwrap(); - + let mut state = self.state.0.lock().unwrap(); loop { let now = Instant::now(); - let next_when = self.fire_due_events(now); + let (next_when, state_out) = self.fire_due_events(now, state); + state = state_out; - if self.stopped.load(atomic::Ordering::SeqCst) { + if state.is_stopped { break; } - // Unwrapping is safe because the mutex can't be poisoned, - // since we haven't shared it with another thread. - g = if let Some(next_when) = next_when { + state = if let Some(next_when) = next_when { // wait for stop notification or timeout to send next event - self.stop_trigger - .wait_timeout(g, next_when - now) - .unwrap() - .0 + self.state.1.wait_timeout(state, next_when - now).unwrap().0 } else { // no pending events // wait for new event, to check when it should be send and then wait to send it - self.new_event_trigger.wait(g).unwrap() + self.state.1.wait(state).unwrap() }; } } @@ -130,11 +126,8 @@ impl ScheduleWorker { pub struct WatchTimer { counter: u64, - new_event_trigger: Arc, - stop_trigger: Arc, + state: Arc<(Mutex, Condvar)>, delay: Duration, - events: Arc>>, - stopped: Arc, } impl WatchTimer { @@ -143,64 +136,56 @@ impl WatchTimer { operations_buffer: OperationsBuffer, delay: Duration, ) -> WatchTimer { - let events = Arc::new(Mutex::new(VecDeque::new())); - let new_event_trigger = Arc::new(Condvar::new()); - let stop_trigger = Arc::new(Condvar::new()); - let stopped = Arc::new(AtomicBool::new(false)); - - let worker_new_event_trigger = new_event_trigger.clone(); - let worker_stop_trigger = stop_trigger.clone(); - let worker_events = events.clone(); - let worker_stopped = stopped.clone(); + let state = Arc::new((Mutex::new(WorkerSharedState::default()), Condvar::new())); + + let worker_state = state.clone(); thread::spawn(move || { ScheduleWorker { - new_event_trigger: worker_new_event_trigger, - stop_trigger: worker_stop_trigger, - events: worker_events, + state: worker_state, tx, operations_buffer, - stopped: worker_stopped, } .run(); }); WatchTimer { counter: 0, - new_event_trigger, - stop_trigger, + state, delay, - events, - stopped, } } pub fn schedule(&mut self, path: PathBuf) -> u64 { self.counter = self.counter.wrapping_add(1); - self.events.lock().unwrap().push_back(ScheduledEvent { - id: self.counter, - when: Instant::now() + self.delay, - path, - }); - - self.new_event_trigger.notify_one(); + { + let mut state = self.state.0.lock().unwrap(); + state.events.push_back(ScheduledEvent { + id: self.counter, + when: Instant::now() + self.delay, + path, + }); + } + self.state.1.notify_one(); self.counter } pub fn ignore(&self, id: u64) { - let mut events = self.events.lock().unwrap(); - let index = events.iter().rposition(|e| e.id == id); + let mut state = self.state.0.lock().unwrap(); + let index = state.events.iter().rposition(|e| e.id == id); if let Some(index) = index { - events.remove(index); + state.events.remove(index); } } } impl Drop for WatchTimer { fn drop(&mut self) { - self.stopped.store(true, atomic::Ordering::SeqCst); - self.stop_trigger.notify_one(); - self.new_event_trigger.notify_one(); + { + let mut state = self.state.0.lock().unwrap(); + state.is_stopped = true; + } + self.state.1.notify_one(); } } From e0fc7a2f8f4d63ecbe55605ad78aeb694f24295f Mon Sep 17 00:00:00 2001 From: Anthony Ha Date: Wed, 17 Mar 2021 14:40:43 -0700 Subject: [PATCH 06/10] Add a stop-gap fix for ReadDirectoryRequest leak from APC queue not being consumed all the way --- src/windows.rs | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/src/windows.rs b/src/windows.rs index 78245cea..753a5adb 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -19,7 +19,6 @@ use winapi::um::winnt::{self, FILE_NOTIFY_INFORMATION, HANDLE}; use super::debounce::{Debounce, EventTx}; use super::{op, DebouncedEvent, Error, Op, RawEvent, RecursiveMode, Result, Watcher}; -use std::collections::HashMap; use std::env; use std::ffi::OsString; use std::mem; @@ -32,6 +31,10 @@ use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; +use std::{ + borrow::{Borrow, BorrowMut}, + collections::HashMap, +}; const BUF_SIZE: u32 = 16384; @@ -68,6 +71,10 @@ struct WatchState { complete_sem: HANDLE, } +thread_local! { + pub static APC_QUEUE_SIZE: std::cell::RefCell = std::cell::RefCell::new(0); +} + struct ReadDirectoryChangesServer { rx: Receiver, event_tx: Arc>, @@ -138,6 +145,12 @@ impl ReadDirectoryChangesServer { } // we have to clean this up, since the watcher may be long gone + while APC_QUEUE_SIZE.with(|x| *x.borrow()) > 0 { + // There is somehow a leak in the APC queue - let the completion routines fire until the APC queue is empty. + unsafe { + synchapi::WaitForSingleObjectEx(self.wakeup_sem, INFINITE, TRUE); + } + } unsafe { handleapi::CloseHandle(self.wakeup_sem); } @@ -294,6 +307,7 @@ fn start_read(rd: &ReadData, event_tx: Arc>, handle: HANDLE) { synchapi::ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut()); } else { // read ok. forget overlapped to let the completion routine handle memory + APC_QUEUE_SIZE.with(|x| *x.borrow_mut() += 1); mem::forget(overlapped); } } @@ -314,6 +328,8 @@ unsafe extern "system" fn handle_event( _bytes_written: u32, overlapped: LPOVERLAPPED, ) { + APC_QUEUE_SIZE.with(|x| *x.borrow_mut() -= 1); + let overlapped: Box = Box::from_raw(overlapped); let request: Box = Box::from_raw(overlapped.hEvent as *mut _); From 5dc245a12ab559e8761e4809e95bce5922a4658e Mon Sep 17 00:00:00 2001 From: Anthony Ha Date: Wed, 17 Mar 2021 17:02:37 -0700 Subject: [PATCH 07/10] Make stop_watch really wait for the WatchState's completion semaphore to signal --- src/windows.rs | 22 ++++------------------ 1 file changed, 4 insertions(+), 18 deletions(-) diff --git a/src/windows.rs b/src/windows.rs index 753a5adb..cb02c275 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -19,6 +19,7 @@ use winapi::um::winnt::{self, FILE_NOTIFY_INFORMATION, HANDLE}; use super::debounce::{Debounce, EventTx}; use super::{op, DebouncedEvent, Error, Op, RawEvent, RecursiveMode, Result, Watcher}; +use std::collections::HashMap; use std::env; use std::ffi::OsString; use std::mem; @@ -31,10 +32,6 @@ use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; -use std::{ - borrow::{Borrow, BorrowMut}, - collections::HashMap, -}; const BUF_SIZE: u32 = 16384; @@ -71,10 +68,6 @@ struct WatchState { complete_sem: HANDLE, } -thread_local! { - pub static APC_QUEUE_SIZE: std::cell::RefCell = std::cell::RefCell::new(0); -} - struct ReadDirectoryChangesServer { rx: Receiver, event_tx: Arc>, @@ -145,12 +138,6 @@ impl ReadDirectoryChangesServer { } // we have to clean this up, since the watcher may be long gone - while APC_QUEUE_SIZE.with(|x| *x.borrow()) > 0 { - // There is somehow a leak in the APC queue - let the completion routines fire until the APC queue is empty. - unsafe { - synchapi::WaitForSingleObjectEx(self.wakeup_sem, INFINITE, TRUE); - } - } unsafe { handleapi::CloseHandle(self.wakeup_sem); } @@ -248,7 +235,9 @@ fn stop_watch(ws: &WatchState, meta_tx: &Sender) { let ch = handleapi::CloseHandle(ws.dir_handle); // have to wait for it, otherwise we leak the memory allocated for there read request if cio != 0 && ch != 0 { - synchapi::WaitForSingleObjectEx(ws.complete_sem, INFINITE, TRUE); + while synchapi::WaitForSingleObjectEx(ws.complete_sem, INFINITE, TRUE) != WAIT_OBJECT_0 + { + } } handleapi::CloseHandle(ws.complete_sem); } @@ -307,7 +296,6 @@ fn start_read(rd: &ReadData, event_tx: Arc>, handle: HANDLE) { synchapi::ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut()); } else { // read ok. forget overlapped to let the completion routine handle memory - APC_QUEUE_SIZE.with(|x| *x.borrow_mut() += 1); mem::forget(overlapped); } } @@ -328,8 +316,6 @@ unsafe extern "system" fn handle_event( _bytes_written: u32, overlapped: LPOVERLAPPED, ) { - APC_QUEUE_SIZE.with(|x| *x.borrow_mut() -= 1); - let overlapped: Box = Box::from_raw(overlapped); let request: Box = Box::from_raw(overlapped.hEvent as *mut _); From 7e7d0590826775254143f2c88775d33c4b9e086a Mon Sep 17 00:00:00 2001 From: Anthony Ha Date: Wed, 17 Mar 2021 17:34:11 -0700 Subject: [PATCH 08/10] Remove unused import --- src/debounce/timer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/debounce/timer.rs b/src/debounce/timer.rs index e3a5a919..90ea85fb 100644 --- a/src/debounce/timer.rs +++ b/src/debounce/timer.rs @@ -1,12 +1,12 @@ use super::super::{op, DebouncedEvent}; +use std::ops::DerefMut; use std::path::PathBuf; use std::sync::mpsc; use std::sync::{Arc, Condvar, Mutex}; use std::thread; use std::time::{Duration, Instant}; use std::{collections::VecDeque, sync::MutexGuard}; -use std::{ops::DerefMut, thread::JoinHandle}; use debounce::{OperationsBuffer, OperationsBufferInner}; From 4084cb21a7c9147034519b6168cd1c6558e1d0ed Mon Sep 17 00:00:00 2001 From: Anthony Ha Date: Thu, 18 Mar 2021 12:36:05 -0700 Subject: [PATCH 09/10] Update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 68c005f4..8b4fe7f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## 4.0.16 (future) - FIX: Report events promptly on Linux, even when many occur in rapid succession. [#268] +- FIX: Fix leaks on Windows and debounce module. [#287] [#268]: https://github.com/notify-rs/notify/pull/268 From 1bba5051f131b7ab2f692c4a3fe97f7bc09075c7 Mon Sep 17 00:00:00 2001 From: Anthony Ha Date: Wed, 24 Mar 2021 22:23:05 -0700 Subject: [PATCH 10/10] Update CHANGELOG.md Co-authored-by: Yuki Okushi --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8b4fe7f8..659d046d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,7 @@ ## 4.0.16 (future) - FIX: Report events promptly on Linux, even when many occur in rapid succession. [#268] -- FIX: Fix leaks on Windows and debounce module. [#287] +- FIX: Fix leaks on Windows and debounce module. [#288] [#268]: https://github.com/notify-rs/notify/pull/268