Skip to content

Commit

Permalink
Make unix implementation async.
Browse files Browse the repository at this point in the history
  • Loading branch information
dvc94ch committed Nov 11, 2020
1 parent 565610a commit 736ce84
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 159 deletions.
27 changes: 19 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//! limited to synchronous operation.
#![deny(
exceeding_bitshifts,
arithmetic_overflow,
invalid_type_param_default,
missing_fragment_specifier,
mutable_transmutes,
Expand Down Expand Up @@ -72,15 +72,26 @@ pub enum Event {
Delete(std::net::IpAddr),
}

#[cfg(all(test, not(windows)))]
#[cfg(test)]
mod tests {
use super::*;
use futures_lite::future::poll_fn;
use std::{future::Future, pin::Pin, task::Poll};

#[test]
fn it_works() {
let set = AddrSet::new();
println!("Got event {:?}", set);
for i in set.unwrap() {
println!("Got event {:?}", i.unwrap())
}
fn test_ip_watch() {
futures_lite::future::block_on(async {
let mut set = AddrSet::new().await.unwrap();
poll_fn(|cx| loop {
let next = set.next();
futures_lite::pin!(next);
if let Poll::Ready(Ok(ev)) = Pin::new(&mut next).poll(cx) {
println!("Got event {:?}", ev);
continue
}
return Poll::Ready(())
})
.await;
});
}
}
21 changes: 9 additions & 12 deletions src/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,48 +44,45 @@ pub struct AddrSet {

impl AddrSet {
/// Create a watcher
pub fn new() -> std::io::Result<Self> {
pub async fn new() -> std::io::Result<Self> {
let mut hash = HashSet::new();
let mut watcher = Watcher::new()?;
let mut buf = Vec::with_capacity(1 << 16);
let mut queue = VecDeque::new();
watcher.resync(&mut buf, &mut queue, &mut hash)?;
watcher.resync(&mut buf, &mut queue, &mut hash).await?;
Ok(Self {
hash,
watcher,
buf,
queue,
})
}
}

impl Iterator for AddrSet {
type Item = std::io::Result<Event>;

fn next(&mut self) -> Option<Self::Item> {
/// Returns a future for the next event.
pub async fn next(&mut self) -> std::io::Result<Event> {
let Self {
watcher,
buf,
hash,
queue,
} = self;
if let Some(event) = queue.pop_front() {
return Some(Ok(event))
return Ok(event)
}
loop {
match watcher.next(buf, queue, hash) {
Status::IO(e) => return Some(Err(e)),
match watcher.next(buf, queue, hash).await {
Status::IO(e) => return Err(e),
Status::Desync => {
if buf.capacity() < 1 << 19 {
buf.reserve(buf.capacity() * 2);
}
if watcher.resync(buf, queue, hash).is_err() {
if watcher.resync(buf, queue, hash).await.is_err() {
continue
}
}
Status::Data(()) => {
if let Some(event) = queue.pop_front() {
return Some(Ok(event))
return Ok(event)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/unix/fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl Fd {
#[cfg(target_os = "linux")]
let fd = unsafe { socket(libc::PF_NETLINK, FLAGS, libc::NETLINK_ROUTE) };
#[cfg(not(target_os = "linux"))]
let fd = unsafe { socket(libc::PF_ROUTE, FLAGS | libc::SOCK_NONBLOCK, libc::AF_UNSPEC) };
let fd = unsafe { socket(libc::PF_ROUTE, FLAGS, libc::AF_UNSPEC) };
if fd < 0 {
Err(std::io::Error::last_os_error())
} else {
Expand Down
Loading

0 comments on commit 736ce84

Please sign in to comment.