Skip to content

Commit

Permalink
transport_service: Improve connection stability by downgrading connec…
Browse files Browse the repository at this point in the history
…tions on substream inactivity (#260)

This PR advances the keep-alive timeout of the transport service.

Previously, the keep-alive timeout was triggered 5 seconds after the
connection was reported to the transport service regardless of substream
activity.

- (0secs) T0: connection established; keep-alive timeout set to 5seconds
in the future
- (4secs) T1: substream A, B, C opened
- (5secs) T2: keep-alive timeout triggered and the connection is
downgraded. T1 was not taken into account, otherwise, the keep-alive
timeout should be triggered at second 9 (T1 at 4 seconds + keepalive 5
seconds)
- (6secs) T3: substreams A, B, C closed -> connection closes
- (7secs) T4: cannot open new substreams even if we expected the
connection to be kept alive for longer

In this PR:
- `KeepAliveTracker` structure to forward the keep-alive timeout of
connections.
- Connection ID is forwarded to `SubstreamOpened` events to identify
properly substream Ids. This is needed because the `ConnectionContext`
contains up to two connections (primary and secondary)

### Testing Done
- test to ensure keepalive downgrades the connection after 5 seconds
- test to ensure keepalive is forwarded on substream activity
- test to ensure a downgraded connection with dropped substreams is
closed


Closes #253.

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: Dmitry Markin <dmitry@markin.tech>
  • Loading branch information
lexnv and dmitry-markin authored Oct 31, 2024
1 parent c0fef8d commit 314a2e9
Show file tree
Hide file tree
Showing 9 changed files with 790 additions and 38 deletions.
17 changes: 16 additions & 1 deletion src/protocol/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ impl ConnectionHandle {
}
}

/// Try to upgrade the connection to active state.
pub fn try_upgrade(&mut self) {
if let ConnectionType::Inactive(inactive) = &self.connection {
if let Some(active) = inactive.upgrade() {
self.connection = ConnectionType::Active(active);
}
}
}

/// Attempt to acquire permit which will keep the connection open for indefinite time.
pub fn try_get_permit(&self) -> Option<Permit> {
match &self.connection {
Expand All @@ -120,6 +129,7 @@ impl ConnectionHandle {
protocol: protocol.clone(),
fallback_names,
substream_id,
connection_id: self.connection_id.clone(),
permit,
})
.map_err(|error| match error {
Expand All @@ -141,10 +151,15 @@ impl ConnectionHandle {
TrySendError::Closed(_) => Error::ConnectionClosed,
})
}

/// Check if the connection is active.
pub fn is_active(&self) -> bool {
matches!(self.connection, ConnectionType::Active(_))
}
}

/// Type which allows the connection to be kept open.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Permit {
/// Active connection.
_connection: Sender<ProtocolCommand>,
Expand Down
2 changes: 2 additions & 0 deletions src/protocol/notification/tests/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ async fn remote_opens_multiple_inbound_substreams() {
SubstreamId::from(0usize),
Box::new(DummySubstream::new()),
),
connection_id: ConnectionId::from(0usize),
})
.await
.unwrap();
Expand Down Expand Up @@ -511,6 +512,7 @@ async fn remote_opens_multiple_inbound_substreams() {
SubstreamId::from(0usize),
Box::new(substream),
),
connection_id: ConnectionId::from(0usize),
})
.await
.unwrap();
Expand Down
10 changes: 9 additions & 1 deletion src/protocol/protocol_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ pub enum InnerTransportEvent {
/// distinguish between different outbound substreams.
direction: Direction,

/// Connection ID.
connection_id: ConnectionId,

/// Substream.
substream: Substream,
},
Expand Down Expand Up @@ -149,6 +152,7 @@ impl From<InnerTransportEvent> for TransportEvent {
fallback,
direction,
substream,
..
} => TransportEvent::SubstreamOpened {
peer,
protocol,
Expand All @@ -164,7 +168,7 @@ impl From<InnerTransportEvent> for TransportEvent {
}

/// Events emitted by the installed protocols to transport.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum ProtocolCommand {
/// Open substream.
OpenSubstream {
Expand Down Expand Up @@ -192,6 +196,9 @@ pub enum ProtocolCommand {
/// and associate incoming substreams with whatever logic it has.
substream_id: SubstreamId,

/// Connection ID.
connection_id: ConnectionId,

/// Connection permit.
///
/// `Permit` allows the connection to be kept open while the permit is held and it is given
Expand Down Expand Up @@ -300,6 +307,7 @@ impl ProtocolSet {
fallback,
direction,
substream,
connection_id: self.connection.connection_id().clone(),
};

protocol_context
Expand Down
Loading

0 comments on commit 314a2e9

Please sign in to comment.