Skip to content

Commit

Permalink
*: return queued events before polling for new ones
Browse files Browse the repository at this point in the history
  • Loading branch information
elenaf9 committed Aug 5, 2022
1 parent 466bd81 commit 9082c0e
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 26 deletions.
13 changes: 7 additions & 6 deletions src/apple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,17 @@ impl IfWatcher {
}

pub fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<IfEvent>> {
while let Poll::Ready(_) = Pin::new(&mut self.rx).poll_next(cx) {
loop {
if let Some(event) = self.queue.pop_front() {
return Poll::Ready(Ok(event));
}
if Pin::new(&mut self.rx).poll_next(cx).is_pending() {
return Poll::Pending;
}
if let Err(error) = self.resync() {
return Poll::Ready(Err(error));
}
}
if let Some(event) = self.queue.pop_front() {
Poll::Ready(Ok(event))
} else {
Poll::Pending
}
}
}

Expand Down
27 changes: 14 additions & 13 deletions src/linux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{IfEvent, IpNet, Ipv4Net, Ipv6Net};
use fnv::FnvHashSet;
use futures::channel::mpsc::UnboundedReceiver;
use futures::future::Either;
use futures::ready;
use futures::stream::{Stream, TryStreamExt};
use rtnetlink::constants::{RTMGRP_IPV4_IFADDR, RTMGRP_IPV6_IFADDR};
use rtnetlink::packet::address::nlas::Nla;
Expand All @@ -12,7 +13,6 @@ use std::collections::VecDeque;
use std::future::Future;
use std::io::{Error, ErrorKind, Result};
use std::net::{Ipv4Addr, Ipv6Addr};
use std::ops::DerefMut;
use std::pin::Pin;
use std::task::{Context, Poll};

Expand Down Expand Up @@ -95,14 +95,15 @@ impl IfWatcher {
}

pub fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<IfEvent>> {
log::trace!("polling IfWatcher {:p}", self.deref_mut());
if Pin::new(&mut self.conn).poll(cx).is_ready() {
return Poll::Ready(Err(std::io::Error::new(
ErrorKind::BrokenPipe,
"rtnetlink socket closed",
)));
}
while let Poll::Ready(Some((message, _))) = Pin::new(&mut self.messages).poll_next(cx) {
loop {
if let Some(event) = self.queue.pop_front() {
return Poll::Ready(Ok(event));
}
if Pin::new(&mut self.conn).poll(cx).is_ready() {
return Poll::Ready(Err(socket_err()));
}
let (message, _) =
ready!(Pin::new(&mut self.messages).poll_next(cx)).ok_or_else(socket_err)?;
match message.payload {
NetlinkPayload::Error(err) => return Poll::Ready(Err(err.to_io())),
NetlinkPayload::InnerMessage(msg) => match msg {
Expand All @@ -113,13 +114,13 @@ impl IfWatcher {
_ => {}
}
}
if let Some(event) = self.queue.pop_front() {
return Poll::Ready(Ok(event));
}
Poll::Pending
}
}

fn socket_err() -> std::io::Error {
std::io::Error::new(ErrorKind::BrokenPipe, "rtnetlink socket closed")
}

fn iter_nets(msg: AddressMessage) -> impl Iterator<Item = IpNet> {
let prefix = msg.header.prefix_len;
let family = msg.header.family;
Expand Down
15 changes: 8 additions & 7 deletions src/win.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,18 @@ impl IfWatcher {
}

pub fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<IfEvent>> {
self.waker.register(cx.waker());
if self.resync.swap(false, Ordering::Relaxed) {
loop {
if let Some(event) = self.queue.pop_front() {
Poll::Ready(Ok(event))
}
if !self.resync.swap(false, Ordering::Relaxed) {
self.waker.register(cx.waker());
return Poll::Pending;
}
if let Err(error) = self.resync() {
return Poll::Ready(Err(error));
}
}
if let Some(event) = self.queue.pop_front() {
Poll::Ready(Ok(event))
} else {
Poll::Pending
}
}
}

Expand Down

0 comments on commit 9082c0e

Please sign in to comment.