diff --git a/protocols/autonat/src/behaviour.rs b/protocols/autonat/src/behaviour.rs index d9980f16dbb..17ee817b4a6 100644 --- a/protocols/autonat/src/behaviour.rs +++ b/protocols/autonat/src/behaviour.rs @@ -209,7 +209,12 @@ pub struct Behaviour { last_probe: Option, - pending_out_events: VecDeque<::OutEvent>, + pending_actions: VecDeque< + NetworkBehaviourAction< + ::OutEvent, + ::ConnectionHandler, + >, + >, probe_id: ProbeId, @@ -237,7 +242,7 @@ impl Behaviour { throttled_servers: Vec::new(), throttled_clients: Vec::new(), last_probe: None, - pending_out_events: VecDeque::new(), + pending_actions: VecDeque::new(), probe_id: ProbeId(0), listen_addresses: Default::default(), external_addresses: Default::default(), @@ -334,8 +339,10 @@ impl Behaviour { role_override: Endpoint::Dialer, } => { if let Some(event) = self.as_server().on_outbound_connection(&peer, address) { - self.pending_out_events - .push_back(Event::InboundProbe(event)); + self.pending_actions + .push_back(NetworkBehaviourAction::GenerateEvent(Event::InboundProbe( + event, + ))); } } ConnectedPoint::Dialer { @@ -395,8 +402,10 @@ impl Behaviour { error, })); if let Some(event) = self.as_server().on_outbound_dial_error(peer_id, error) { - self.pending_out_events - .push_back(Event::InboundProbe(event)); + self.pending_actions + .push_back(NetworkBehaviourAction::GenerateEvent(Event::InboundProbe( + event, + ))); } } @@ -431,14 +440,13 @@ impl NetworkBehaviour for Behaviour { fn poll(&mut self, cx: &mut Context<'_>, params: &mut impl PollParameters) -> Poll { loop { - if let Some(event) = self.pending_out_events.pop_front() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + if let Some(event) = self.pending_actions.pop_front() { + return Poll::Ready(event); } - let mut is_inner_pending = false; match self.inner.poll(cx, params) { Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => { - let (mut events, action) = match event { + let actions = match event { request_response::Event::Message { message: request_response::Message::Response { .. }, .. @@ -453,24 +461,32 @@ impl NetworkBehaviour for Behaviour { | request_response::Event::InboundFailure { .. } => { self.as_server().handle_event(params, event) } - request_response::Event::ResponseSent { .. } => (VecDeque::new(), None), + request_response::Event::ResponseSent { .. } => VecDeque::new(), }; - self.pending_out_events.append(&mut events); - if let Some(action) = action { - return Poll::Ready(action); - } + + self.pending_actions.extend(actions); + continue; + } + Poll::Ready(action) => { + self.pending_actions + .push_back(action.map_out(|_| unreachable!())); + continue; } - Poll::Ready(action) => return Poll::Ready(action.map_out(|_| unreachable!())), - Poll::Pending => is_inner_pending = true, + Poll::Pending => {} } match self.as_client().poll_auto_probe(cx) { - Poll::Ready(event) => self - .pending_out_events - .push_back(Event::OutboundProbe(event)), - Poll::Pending if is_inner_pending => return Poll::Pending, + Poll::Ready(event) => { + self.pending_actions + .push_back(NetworkBehaviourAction::GenerateEvent(Event::OutboundProbe( + event, + ))); + continue; + } Poll::Pending => {} } + + return Poll::Pending; } } @@ -558,7 +574,7 @@ trait HandleInnerEvent { &mut self, params: &mut impl PollParameters, event: request_response::Event, - ) -> (VecDeque, Option); + ) -> VecDeque; } trait GlobalIp { diff --git a/protocols/autonat/src/behaviour/as_client.rs b/protocols/autonat/src/behaviour/as_client.rs index 76ddebce2dc..51527162326 100644 --- a/protocols/autonat/src/behaviour/as_client.rs +++ b/protocols/autonat/src/behaviour/as_client.rs @@ -109,9 +109,7 @@ impl<'a> HandleInnerEvent for AsClient<'a> { &mut self, params: &mut impl PollParameters, event: request_response::Event, - ) -> (VecDeque, Option) { - let mut events = VecDeque::new(); - let mut action = None; + ) -> VecDeque { match event { request_response::Event::Message { peer, @@ -140,13 +138,20 @@ impl<'a> HandleInnerEvent for AsClient<'a> { error: OutboundProbeError::Response(e), }, }; - events.push_back(Event::OutboundProbe(event)); + + let mut actions = VecDeque::with_capacity(3); + + actions.push_back(NetworkBehaviourAction::GenerateEvent(Event::OutboundProbe( + event, + ))); if let Some(old) = self.handle_reported_status(response.result.clone().into()) { - events.push_back(Event::StatusChanged { - old, - new: self.nat_status.clone(), - }); + actions.push_back(NetworkBehaviourAction::GenerateEvent( + Event::StatusChanged { + old, + new: self.nat_status.clone(), + }, + )); } if let Ok(address) = response.result { @@ -158,12 +163,14 @@ impl<'a> HandleInnerEvent for AsClient<'a> { .find_map(|r| (r.addr == address).then_some(r.score)) .unwrap_or(AddressScore::Finite(0)); if let AddressScore::Finite(finite_score) = score { - action = Some(NetworkBehaviourAction::ReportObservedAddr { + actions.push_back(NetworkBehaviourAction::ReportObservedAddr { address, score: AddressScore::Finite(finite_score + 1), }); } } + + actions } request_response::Event::OutboundFailure { peer, @@ -180,17 +187,18 @@ impl<'a> HandleInnerEvent for AsClient<'a> { .remove(&request_id) .unwrap_or_else(|| self.probe_id.next()); - events.push_back(Event::OutboundProbe(OutboundProbeEvent::Error { - probe_id, - peer: Some(peer), - error: OutboundProbeError::OutboundRequest(error), - })); - self.schedule_probe.reset(Duration::ZERO); + + VecDeque::from([NetworkBehaviourAction::GenerateEvent(Event::OutboundProbe( + OutboundProbeEvent::Error { + probe_id, + peer: Some(peer), + error: OutboundProbeError::OutboundRequest(error), + }, + ))]) } - _ => {} + _ => VecDeque::default(), } - (events, action) } } diff --git a/protocols/autonat/src/behaviour/as_server.rs b/protocols/autonat/src/behaviour/as_server.rs index 455ac3d16b3..8d08f7ed8d0 100644 --- a/protocols/autonat/src/behaviour/as_server.rs +++ b/protocols/autonat/src/behaviour/as_server.rs @@ -98,9 +98,7 @@ impl<'a> HandleInnerEvent for AsServer<'a> { &mut self, _params: &mut impl PollParameters, event: request_response::Event, - ) -> (VecDeque, Option) { - let mut events = VecDeque::new(); - let mut action = None; + ) -> VecDeque { match event { request_response::Event::Message { peer, @@ -124,20 +122,25 @@ impl<'a> HandleInnerEvent for AsServer<'a> { .insert(peer, (probe_id, request_id, addrs.clone(), channel)); self.throttled_clients.push((peer, Instant::now())); - events.push_back(Event::InboundProbe(InboundProbeEvent::Request { - probe_id, - peer, - addresses: addrs.clone(), - })); - - action = Some(NetworkBehaviourAction::Dial { - opts: DialOpts::peer_id(peer) - .condition(PeerCondition::Always) - .override_dial_concurrency_factor(NonZeroU8::new(1).expect("1 > 0")) - .addresses(addrs) - .build(), - handler: self.inner.new_handler(), - }); + VecDeque::from([ + NetworkBehaviourAction::GenerateEvent(Event::InboundProbe( + InboundProbeEvent::Request { + probe_id, + peer, + addresses: addrs.clone(), + }, + )), + NetworkBehaviourAction::Dial { + opts: DialOpts::peer_id(peer) + .condition(PeerCondition::Always) + .override_dial_concurrency_factor( + NonZeroU8::new(1).expect("1 > 0"), + ) + .addresses(addrs) + .build(), + handler: self.inner.new_handler(), + }, + ]) } Err((status_text, error)) => { log::debug!( @@ -152,11 +155,13 @@ impl<'a> HandleInnerEvent for AsServer<'a> { }; let _ = self.inner.send_response(channel, response); - events.push_back(Event::InboundProbe(InboundProbeEvent::Error { - probe_id, - peer, - error: InboundProbeError::Response(error), - })); + VecDeque::from([NetworkBehaviourAction::GenerateEvent( + Event::InboundProbe(InboundProbeEvent::Error { + probe_id, + peer, + error: InboundProbeError::Response(error), + }), + )]) } } } @@ -178,15 +183,16 @@ impl<'a> HandleInnerEvent for AsServer<'a> { _ => self.probe_id.next(), }; - events.push_back(Event::InboundProbe(InboundProbeEvent::Error { - probe_id, - peer, - error: InboundProbeError::InboundRequest(error), - })); + VecDeque::from([NetworkBehaviourAction::GenerateEvent(Event::InboundProbe( + InboundProbeEvent::Error { + probe_id, + peer, + error: InboundProbeError::InboundRequest(error), + }, + ))]) } - _ => {} + _ => VecDeque::new(), } - (events, action) } }