Skip to content

Commit

Permalink
feat: integrate multiplexing (#5559)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse authored Dec 12, 2023
1 parent 701e378 commit 5062b7e
Show file tree
Hide file tree
Showing 16 changed files with 1,176 additions and 134 deletions.
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/net/eth-wire/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ secp256k1 = { workspace = true, features = ["global-context", "rand-std", "recov
arbitrary = { workspace = true, features = ["derive"] }
proptest.workspace = true
proptest-derive.workspace = true
async-stream = "0.3"

[features]
default = ["serde"]
Expand Down
24 changes: 22 additions & 2 deletions crates/net/eth-wire/src/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,14 @@ impl SharedCapability {
}
}

/// Returns the eth version if it's the `eth` capability.
pub fn eth_version(&self) -> Option<EthVersion> {
match self {
SharedCapability::Eth { version, .. } => Some(*version),
_ => None,
}
}

/// Returns the message ID offset of the current capability.
///
/// This represents the message ID offset for the first message of the eth capability in the
Expand Down Expand Up @@ -375,8 +383,8 @@ impl SharedCapabilities {

/// Returns the negotiated eth version if it is shared.
#[inline]
pub fn eth_version(&self) -> Result<u8, P2PStreamError> {
self.eth().map(|cap| cap.version())
pub fn eth_version(&self) -> Result<EthVersion, P2PStreamError> {
self.eth().map(|cap| cap.eth_version().expect("is eth; qed"))
}

/// Returns true if the shared capabilities contain the given capability.
Expand Down Expand Up @@ -438,6 +446,18 @@ impl SharedCapabilities {
) -> Result<&SharedCapability, UnsupportedCapabilityError> {
self.find(cap).ok_or_else(|| UnsupportedCapabilityError { capability: cap.clone() })
}

/// Returns the number of shared capabilities.
#[inline]
pub fn len(&self) -> usize {
self.0.len()
}

/// Returns true if there are no shared capabilities.
#[inline]
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
}

/// Determines the offsets for each shared capability between the input list of peer
Expand Down
6 changes: 6 additions & 0 deletions crates/net/eth-wire/src/ethstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ where
#[pin_project]
#[derive(Debug)]
pub struct EthStream<S> {
/// Negotiated eth version.
version: EthVersion,
#[pin]
inner: S,
Expand All @@ -174,26 +175,31 @@ pub struct EthStream<S> {
impl<S> EthStream<S> {
/// Creates a new unauthed [`EthStream`] from a provided stream. You will need
/// to manually handshake a peer.
#[inline]
pub fn new(version: EthVersion, inner: S) -> Self {
Self { version, inner }
}

/// Returns the eth version.
#[inline]
pub fn version(&self) -> EthVersion {
self.version
}

/// Returns the underlying stream.
#[inline]
pub fn inner(&self) -> &S {
&self.inner
}

/// Returns mutable access to the underlying stream.
#[inline]
pub fn inner_mut(&mut self) -> &mut S {
&mut self.inner
}

/// Consumes this type and returns the wrapped stream.
#[inline]
pub fn into_inner(self) -> S {
self.inner
}
Expand Down
20 changes: 20 additions & 0 deletions crates/net/eth-wire/src/hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ impl HelloMessageWithProtocols {
}

/// Returns the raw [HelloMessage] without the additional protocol information.
#[inline]
pub fn message(&self) -> HelloMessage {
HelloMessage {
protocol_version: self.protocol_version,
Expand All @@ -69,6 +70,25 @@ impl HelloMessageWithProtocols {
id: self.id,
}
}

/// Returns true if the set of protocols contains the given protocol.
#[inline]
pub fn contains_protocol(&self, protocol: &Protocol) -> bool {
self.protocols.iter().any(|p| p.cap == protocol.cap)
}

/// Adds a new protocol to the set.
///
/// Returns an error if the protocol already exists.
#[inline]
pub fn try_add_protocol(&mut self, protocol: Protocol) -> Result<(), Protocol> {
if self.contains_protocol(&protocol) {
Err(protocol)
} else {
self.protocols.push(protocol);
Ok(())
}
}
}

// TODO: determine if we should allow for the extra fields at the end like EIP-706 suggests
Expand Down
Loading

0 comments on commit 5062b7e

Please sign in to comment.