Skip to content

Commit

Permalink
feat(swarm): rename Custom variant to NotifyBehaviour
Browse files Browse the repository at this point in the history
Rename `ConnectionHandlerEvent::Custom` to `ConnectionHandlerEvent::NotifyBehaviour`.

Related #3848.

Pull-Request: #3955.
  • Loading branch information
tcoratger authored May 16, 2023
1 parent fbd471c commit 9f3c851
Show file tree
Hide file tree
Showing 18 changed files with 221 additions and 175 deletions.
2 changes: 1 addition & 1 deletion protocols/dcutr/src/handler/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl ConnectionHandler for Handler {
> {
if !self.reported {
self.reported = true;
return Poll::Ready(ConnectionHandlerEvent::Custom(
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::DirectConnectionEstablished,
));
}
Expand Down
48 changes: 26 additions & 22 deletions protocols/dcutr/src/handler/relayed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,13 @@ impl Handler {
ConnectedPoint::Dialer { address, role_override: _ } => address.clone(),
ConnectedPoint::Listener { ..} => unreachable!("`<Handler as ConnectionHandler>::listen_protocol` denies all incoming substreams as a listener."),
};
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::InboundConnectRequest {
inbound_connect: Box::new(inbound_connect),
remote_addr,
},
));
self.queued_events
.push_back(ConnectionHandlerEvent::NotifyBehaviour(
Event::InboundConnectRequest {
inbound_connect: Box::new(inbound_connect),
remote_addr,
},
));
}
// A connection listener denies all incoming substreams, thus none can ever be fully negotiated.
future::Either::Right(output) => void::unreachable(output),
Expand All @@ -197,11 +198,12 @@ impl Handler {
self.endpoint.is_listener(),
"A connection dialer never initiates a connection upgrade."
);
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::OutboundConnectNegotiated {
remote_addrs: obs_addrs,
},
));
self.queued_events
.push_back(ConnectionHandlerEvent::NotifyBehaviour(
Event::OutboundConnectNegotiated {
remote_addrs: obs_addrs,
},
));
}

fn on_listen_upgrade_error(
Expand All @@ -228,21 +230,23 @@ impl Handler {

match error {
StreamUpgradeError::Timeout => {
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::OutboundNegotiationFailed {
error: StreamUpgradeError::Timeout,
},
));
self.queued_events
.push_back(ConnectionHandlerEvent::NotifyBehaviour(
Event::OutboundNegotiationFailed {
error: StreamUpgradeError::Timeout,
},
));
}
StreamUpgradeError::NegotiationFailed => {
// The remote merely doesn't support the DCUtR protocol.
// This is no reason to close the connection, which may
// successfully communicate with other protocols already.
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Event::OutboundNegotiationFailed {
error: StreamUpgradeError::NegotiationFailed,
},
));
self.queued_events
.push_back(ConnectionHandlerEvent::NotifyBehaviour(
Event::OutboundNegotiationFailed {
error: StreamUpgradeError::NegotiationFailed,
},
));
}
_ => {
// Anything else is considered a fatal error or misbehaviour of
Expand Down Expand Up @@ -342,7 +346,7 @@ impl ConnectionHandler for Handler {
self.inbound_connect = None;
match result {
Ok(addresses) => {
return Poll::Ready(ConnectionHandlerEvent::Custom(
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::InboundConnectNegotiated(addresses),
));
}
Expand Down
14 changes: 7 additions & 7 deletions protocols/gossipsub/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,9 @@ impl EnabledHandler {
if !self.peer_kind_sent {
if let Some(peer_kind) = self.peer_kind.as_ref() {
self.peer_kind_sent = true;
return Poll::Ready(ConnectionHandlerEvent::Custom(HandlerEvent::PeerKind(
peer_kind.clone(),
)));
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::PeerKind(peer_kind.clone()),
));
}
}

Expand All @@ -261,7 +261,7 @@ impl EnabledHandler {
self.last_io_activity = Instant::now();
self.inbound_substream =
Some(InboundSubstreamState::WaitingInput(substream));
return Poll::Ready(ConnectionHandlerEvent::Custom(message));
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(message));
}
Poll::Ready(Some(Err(error))) => {
log::debug!("Failed to read from inbound stream: {error}");
Expand Down Expand Up @@ -466,9 +466,9 @@ impl ConnectionHandler for Handler {
Handler::Disabled(DisabledHandler::ProtocolUnsupported { peer_kind_sent }) => {
if !*peer_kind_sent {
*peer_kind_sent = true;
return Poll::Ready(ConnectionHandlerEvent::Custom(HandlerEvent::PeerKind(
PeerKind::NotSupported,
)));
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::PeerKind(PeerKind::NotSupported),
));
}

Poll::Pending
Expand Down
21 changes: 11 additions & 10 deletions protocols/identify/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,13 @@ impl Handler {
future::Either::Left(remote_info) => {
self.update_supported_protocols_for_remote(&remote_info);
self.events
.push(ConnectionHandlerEvent::Custom(Event::Identified(
.push(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified(
remote_info,
)));
}
future::Either::Right(()) => self
.events
.push(ConnectionHandlerEvent::Custom(Event::IdentificationPushed)),
future::Either::Right(()) => self.events.push(ConnectionHandlerEvent::NotifyBehaviour(
Event::IdentificationPushed,
)),
}
}

Expand All @@ -192,10 +192,9 @@ impl Handler {
>,
) {
let err = err.map_upgrade_err(|e| e.into_inner());
self.events
.push(ConnectionHandlerEvent::Custom(Event::IdentificationError(
err,
)));
self.events.push(ConnectionHandlerEvent::NotifyBehaviour(
Event::IdentificationError(err),
));
self.trigger_next_identify.reset(self.interval);
}

Expand Down Expand Up @@ -309,7 +308,9 @@ impl ConnectionHandler for Handler {

if let Ok(info) = res {
self.update_supported_protocols_for_remote(&info);
return Poll::Ready(ConnectionHandlerEvent::Custom(Event::Identified(info)));
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified(
info,
)));
}
}

Expand All @@ -319,7 +320,7 @@ impl ConnectionHandler for Handler {
.map(|()| Event::Identification)
.unwrap_or_else(|err| Event::IdentificationError(StreamUpgradeError::Apply(err)));

return Poll::Ready(ConnectionHandlerEvent::Custom(event));
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
}

Poll::Pending
Expand Down
48 changes: 29 additions & 19 deletions protocols/kad/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ where
> {
if let ProtocolStatus::Confirmed = self.protocol_status {
self.protocol_status = ProtocolStatus::Reported;
return Poll::Ready(ConnectionHandlerEvent::Custom(
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::ProtocolConfirmed {
endpoint: self.endpoint.clone(),
},
Expand Down Expand Up @@ -826,7 +826,7 @@ where
Err(error) => {
*this = OutboundSubstreamState::Done;
let event = user_data.map(|user_data| {
ConnectionHandlerEvent::Custom(
ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::QueryError {
error: KademliaHandlerQueryErr::Io(error),
user_data,
Expand All @@ -844,10 +844,12 @@ where
Poll::Ready(Err(error)) => {
*this = OutboundSubstreamState::Done;
let event = user_data.map(|user_data| {
ConnectionHandlerEvent::Custom(KademliaHandlerEvent::QueryError {
error: KademliaHandlerQueryErr::Io(error),
user_data,
})
ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::QueryError {
error: KademliaHandlerQueryErr::Io(error),
user_data,
},
)
});

return Poll::Ready(event);
Expand All @@ -870,10 +872,12 @@ where
Poll::Ready(Err(error)) => {
*this = OutboundSubstreamState::Done;
let event = user_data.map(|user_data| {
ConnectionHandlerEvent::Custom(KademliaHandlerEvent::QueryError {
error: KademliaHandlerQueryErr::Io(error),
user_data,
})
ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::QueryError {
error: KademliaHandlerQueryErr::Io(error),
user_data,
},
)
});

return Poll::Ready(event);
Expand All @@ -886,7 +890,9 @@ where
*this = OutboundSubstreamState::Closing(substream);
let event = process_kad_response(msg, user_data);

return Poll::Ready(Some(ConnectionHandlerEvent::Custom(event)));
return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(
event,
)));
}
Poll::Pending => {
*this = OutboundSubstreamState::WaitingAnswer(substream, user_data);
Expand All @@ -899,7 +905,9 @@ where
user_data,
};

return Poll::Ready(Some(ConnectionHandlerEvent::Custom(event)));
return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(
event,
)));
}
Poll::Ready(None) => {
*this = OutboundSubstreamState::Done;
Expand All @@ -910,15 +918,17 @@ where
user_data,
};

return Poll::Ready(Some(ConnectionHandlerEvent::Custom(event)));
return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(
event,
)));
}
}
}
OutboundSubstreamState::ReportError(error, user_data) => {
*this = OutboundSubstreamState::Done;
let event = KademliaHandlerEvent::QueryError { error, user_data };

return Poll::Ready(Some(ConnectionHandlerEvent::Custom(event)));
return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(event)));
}
OutboundSubstreamState::Closing(mut stream) => match stream.poll_close_unpin(cx) {
Poll::Ready(Ok(())) | Poll::Ready(Err(_)) => return Poll::Ready(None),
Expand Down Expand Up @@ -971,7 +981,7 @@ where
Poll::Ready(Some(Ok(KadRequestMsg::FindNode { key }))) => {
*this =
InboundSubstreamState::WaitingBehaviour(connection_id, substream, None);
return Poll::Ready(Some(ConnectionHandlerEvent::Custom(
return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::FindNodeReq {
key,
request_id: KademliaRequestId {
Expand All @@ -983,7 +993,7 @@ where
Poll::Ready(Some(Ok(KadRequestMsg::GetProviders { key }))) => {
*this =
InboundSubstreamState::WaitingBehaviour(connection_id, substream, None);
return Poll::Ready(Some(ConnectionHandlerEvent::Custom(
return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::GetProvidersReq {
key,
request_id: KademliaRequestId {
Expand All @@ -998,14 +1008,14 @@ where
connection_id,
substream,
};
return Poll::Ready(Some(ConnectionHandlerEvent::Custom(
return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::AddProvider { key, provider },
)));
}
Poll::Ready(Some(Ok(KadRequestMsg::GetValue { key }))) => {
*this =
InboundSubstreamState::WaitingBehaviour(connection_id, substream, None);
return Poll::Ready(Some(ConnectionHandlerEvent::Custom(
return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::GetRecord {
key,
request_id: KademliaRequestId {
Expand All @@ -1017,7 +1027,7 @@ where
Poll::Ready(Some(Ok(KadRequestMsg::PutValue { record }))) => {
*this =
InboundSubstreamState::WaitingBehaviour(connection_id, substream, None);
return Poll::Ready(Some(ConnectionHandlerEvent::Custom(
return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(
KademliaHandlerEvent::PutRecord {
record,
request_id: KademliaRequestId {
Expand Down
4 changes: 2 additions & 2 deletions protocols/perf/src/client/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl ConnectionHandler for Handler {
.pop_front()
.expect("requested stream without pending command");
self.queued_events
.push_back(ConnectionHandlerEvent::Custom(Event {
.push_back(ConnectionHandlerEvent::NotifyBehaviour(Event {
id,
result: Err(error),
}));
Expand Down Expand Up @@ -179,7 +179,7 @@ impl ConnectionHandler for Handler {

while let Poll::Ready(Some(result)) = self.outbound.poll_next_unpin(cx) {
match result {
Ok(event) => return Poll::Ready(ConnectionHandlerEvent::Custom(event)),
Ok(event) => return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)),
Err(e) => {
panic!("{e:?}")
}
Expand Down
4 changes: 3 additions & 1 deletion protocols/perf/src/server/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ impl ConnectionHandler for Handler {
> {
while let Poll::Ready(Some(result)) = self.inbound.poll_next_unpin(cx) {
match result {
Ok(stats) => return Poll::Ready(ConnectionHandlerEvent::Custom(Event { stats })),
Ok(stats) => {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event { stats }))
}
Err(e) => {
error!("{e:?}")
}
Expand Down
14 changes: 8 additions & 6 deletions protocols/ping/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,9 @@ impl ConnectionHandler for Handler {
}
State::Inactive { reported: false } => {
self.state = State::Inactive { reported: true };
return Poll::Ready(ConnectionHandlerEvent::Custom(Err(Failure::Unsupported)));
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err(
Failure::Unsupported,
)));
}
State::Active => {}
}
Expand All @@ -274,7 +276,7 @@ impl ConnectionHandler for Handler {
Poll::Ready(Ok(stream)) => {
// A ping from a remote peer has been answered, wait for the next.
self.inbound = Some(protocol::recv_ping(stream).boxed());
return Poll::Ready(ConnectionHandlerEvent::Custom(Ok(Success::Pong)));
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Ok(Success::Pong)));
}
}
}
Expand All @@ -299,7 +301,7 @@ impl ConnectionHandler for Handler {
return Poll::Ready(ConnectionHandlerEvent::Close(error));
}

return Poll::Ready(ConnectionHandlerEvent::Custom(Err(error)));
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err(error)));
}
}

Expand All @@ -318,9 +320,9 @@ impl ConnectionHandler for Handler {
self.failures = 0;
self.timer.reset(self.config.interval);
self.outbound = Some(OutboundState::Idle(stream));
return Poll::Ready(ConnectionHandlerEvent::Custom(Ok(Success::Ping {
rtt,
})));
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Ok(
Success::Ping { rtt },
)));
}
Poll::Ready(Err(e)) => {
self.pending_errors
Expand Down
Loading

0 comments on commit 9f3c851

Please sign in to comment.