Skip to content

Commit

Permalink
Finish windows impl.
Browse files Browse the repository at this point in the history
  • Loading branch information
dvc94ch committed Nov 11, 2020
1 parent 736ce84 commit e253d0c
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 1,336 deletions.
13 changes: 12 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,16 @@ keywords = ["asynchronous", "routing"]
[dependencies]
libc = "0.2.66"

[target.'cfg(unix)'.dependencies]
async-io = "1.2.0"

[target.'cfg(windows)'.dependencies]
winapi = { version = "0.3.8", features = ["netioapi", "ntdef", "ws2def"] }
futures-lite = "1.11.2"
if-addrs = "0.6.5"
winapi = { version = "0.3.8", features = ["netioapi", "ntdef", "winerror", "ws2def"] }

[target.'cfg(windows)'.dev-dependencies]
winapi = { version = "0.3.8", features = ["processthreadsapi", "synchapi"] }

[dev-dependencies]
futures-lite = "1.11.2"
31 changes: 0 additions & 31 deletions bindgen.sh

This file was deleted.

121 changes: 86 additions & 35 deletions src/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,68 +2,117 @@
#[cfg(not(windows))]
compile_error!("this module only supports Windows!");

#[allow(
nonstandard_style,
trivial_numeric_casts,
trivial_casts,
unsafe_code,
unused,
unreachable_pub,
missing_docs
)]
mod bindings;
use bindings::*;
use crate::Event;
use futures_lite::future::poll_fn;
use std::{collections::{HashSet, VecDeque},
net::IpAddr,
sync::{atomic::{AtomicBool, Ordering},
Arc},
task::{Poll, Waker}};
use winapi::shared::{netioapi::{CancelMibChangeNotify2, NotifyIpInterfaceChange,
MIB_IPINTERFACE_ROW, MIB_NOTIFICATION_TYPE},
ntdef::{HANDLE, PVOID},
winerror::NO_ERROR,
ws2def::AF_UNSPEC};

#[link(name = "iphlpapi")]
extern "C" {}
/// An address set/watcher
#[derive(Debug)]
pub struct AddrSet {
addrs: HashSet<IpAddr>,
queue: VecDeque<Event>,
waker: Option<Waker>,
notif: RouteChangeNotification,
resync: Arc<AtomicBool>,
}

impl AddrSet {
/// Create a watcher
pub async fn new() -> std::io::Result<Self> {
let resync = Arc::new(AtomicBool::new(true));
Ok(Self {
addrs: Default::default(),
queue: Default::default(),
waker: Default::default(),
resync: resync.clone(),
notif: RouteChangeNotification::new(Box::new(move |_, _| {
resync.store(true, Ordering::SeqCst);
}))?,
})
}

fn resync(&mut self) -> std::io::Result<()> {
let addrs = if_addrs::get_if_addrs()?;
for old_addr in self.addrs.clone() {
if addrs.iter().find(|addr| addr.ip() == old_addr).is_none() {
self.addrs.remove(&old_addr);
self.queue.push_back(Event::Delete(old_addr));
}
}
for new_addr in addrs {
let ip = new_addr.ip();
if self.addrs.insert(ip) {
self.queue.push_back(Event::New(ip));
}
}
if let Some(waker) = self.waker.take() {
waker.wake();
}
Ok(())
}

/// Returns a future for the next event.
pub async fn next(&mut self) -> std::io::Result<Event> {
poll_fn(|cx| {
self.waker = Some(cx.waker().clone());
if self.resync.load(Ordering::SeqCst) {
if let Err(error) = self.resync() {
return Poll::Ready(Err(error))
}
}
if let Some(event) = self.queue.pop_front() {
Poll::Ready(Ok(event))
} else {
Poll::Pending
}
})
.await
}
}

/// Route change notifications
#[allow(missing_debug_implementations)]
pub struct RouteChangeNotification {
#[derive(Debug)]
struct RouteChangeNotification {
handle: HANDLE,
callback: *mut RouteChangeCallback,
// actual callback follows
}

#[cfg(any())]
fn align_to<T, U>() -> Option<(Layout, usize)> {
let layout_t = Layout::new::<T>();
let layout_u = Layout::new::<U>();
assert!(layout_t.size() >= layout_t.align());
let align = std::cmp::max(layout_t.size(), layout_u.align());
if (usize::MAX >> 1).checked_sub(layout_u.size())? < align {
None
} else {
unsafe { Layout::from_size_align_unchecked(align + layout_u.size(), align) }
}
}

/// The type of route change callbacks
pub type RouteChangeCallback = Box<dyn FnMut(&MIB_IPFORWARD_ROW2, MIB_NOTIFICATION_TYPE) + Send>;
type RouteChangeCallback = Box<dyn FnMut(&MIB_IPINTERFACE_ROW, MIB_NOTIFICATION_TYPE) + Send>;
impl RouteChangeNotification {
/// Register for route change notifications
pub fn new(cb: RouteChangeCallback) -> Result<Self, ()> {
fn new(cb: RouteChangeCallback) -> std::io::Result<Self> {
#[allow(non_snake_case)]
unsafe extern "stdcall" fn global_callback(
unsafe extern "system" fn global_callback(
CallerContext: PVOID,
Row: PMIB_IPFORWARD_ROW2,
Row: *mut MIB_IPINTERFACE_ROW,
NotificationType: MIB_NOTIFICATION_TYPE,
) {
(**(CallerContext as *mut RouteChangeCallback))(&*Row, NotificationType)
}
let mut handle = core::ptr::null_mut();
let callback = Box::into_raw(Box::new(cb));
if unsafe {
NotifyRouteChange2(
NotifyIpInterfaceChange(
AF_UNSPEC as _,
Some(global_callback),
callback as _,
0,
&mut handle,
)
} != NO_ERROR as _
} != NO_ERROR
{
Err(())
Err(std::io::Error::last_os_error())
} else {
Ok(Self { callback, handle })
}
Expand All @@ -82,6 +131,8 @@ impl Drop for RouteChangeNotification {
#[cfg(test)]
mod tests {
use super::*;
use winapi::{shared::minwindef::DWORD,
um::{processthreadsapi::GetCurrentThreadId, synchapi::SleepEx}};

fn get_current_thread_id() -> DWORD { unsafe { GetCurrentThreadId() } }
#[test]
Expand Down
Loading

0 comments on commit e253d0c

Please sign in to comment.