Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transport: Poll Transport directly, remove ListenersStream #2652

Merged
merged 56 commits into from
Jul 4, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
f134c8b
core/transport: remove Transport::Listener
elenaf9 May 15, 2022
aac9e1c
swarm: remove ListenerStream, poll Transport
elenaf9 May 15, 2022
68663fd
transports/tcp: handle transport changes
elenaf9 May 15, 2022
72c76f0
*: adapt majority of other transports
elenaf9 May 16, 2022
945c4a0
transports/tcp: rename *TcpConfig to *TcpTransport
elenaf9 May 21, 2022
d6f0e75
core/transport: adapt remaining transports
elenaf9 May 22, 2022
858590f
core/transports: unify imports, clean code
elenaf9 May 22, 2022
45f9c96
core/transport: split TransportEvent generics
elenaf9 May 22, 2022
bfd5fb0
core/transport: impl Stream for transport::Boxed
elenaf9 May 22, 2022
90721b9
transports/tcp: impl Stream for GenTcpTransport
elenaf9 May 22, 2022
80c3da1
*: format
elenaf9 May 26, 2022
bcb71a1
transports/dns: adapt dns transport
elenaf9 May 26, 2022
38a2b78
transports/dns: adapt websocket transport
elenaf9 May 26, 2022
607412d
Merge branch 'master' of github.com:libp2p/rust-libp2p into refactor-…
elenaf9 May 29, 2022
d7f5019
Remove various `Sync` bounds
thomaseizinger May 19, 2022
b4164e8
transports/tcp: revert Stream impl for GenTcpTransport
elenaf9 May 29, 2022
a7766cd
core/transport/memory: fix listener polling
elenaf9 May 29, 2022
9824acd
transports/uds: adapt uds transport
elenaf9 May 29, 2022
a2988b5
transports/tcp: impl Default for GenTcpTransport
elenaf9 May 29, 2022
2edb0cd
core/transport/memory: add MemoryTransport::new
elenaf9 May 29, 2022
907a5ab
protocols/*: adapt network behaviour protocols
elenaf9 May 29, 2022
1924c12
muxers/mplex: adapt tests and benches
elenaf9 May 29, 2022
8ba3c20
transports/*: adapt tests in upgrade transports
elenaf9 May 29, 2022
96ae97b
src/lib: adapt development transports
elenaf9 May 29, 2022
c686b5a
examples: adapt exmaples to tcp transport changes
elenaf9 May 29, 2022
354d2f0
*: add Transport::remove_listener
elenaf9 May 29, 2022
2226092
core/transport: create ListenerId within Transport
elenaf9 May 29, 2022
8f608cd
Merge branch 'master' of github.com:libp2p/rust-libp2p into refactor-…
elenaf9 May 29, 2022
5f9ebb7
*: fix CI
elenaf9 May 29, 2022
199ce12
Merge branch 'master' of github.com:libp2p/rust-libp2p into refactor-…
elenaf9 Jun 11, 2022
27ee9ca
transports/tcp: fix port-reuse tests
elenaf9 Jun 11, 2022
08f4f80
*: fix intra-doc links
elenaf9 Jun 11, 2022
f92c2a4
transports/tcp: rm unneeded trait-bounds in tests
elenaf9 Jun 11, 2022
a357d71
*: use random ListenerIds instead of namespaced
elenaf9 Jun 20, 2022
a4a745e
core/transport: remove (Partial)Ord for ListenerId
elenaf9 Jun 20, 2022
81c945c
Merge branch 'master' of github.com:libp2p/rust-libp2p into refactor-…
elenaf9 Jun 20, 2022
1c2b9e5
transports/relay: adapt ClientTransport
elenaf9 Jun 26, 2022
b19e11a
*: apply comments from review
elenaf9 Jun 26, 2022
c95c97c
transport/wasm-ext: adapt wasm-ext transport
elenaf9 Jun 26, 2022
eb0397b
transports/wasm-ext: fix clippy
elenaf9 Jun 27, 2022
610f7ae
Merge branch 'master' of github.com:libp2p/rust-libp2p into refactor-…
elenaf9 Jun 27, 2022
f28cdb1
*: use intra-doc links
elenaf9 Jun 27, 2022
4841d81
core/transport: remove unnecessary trait bounds
elenaf9 Jun 27, 2022
4e74407
*: rename TransportEvent::Error -> ::ListenerError
elenaf9 Jun 27, 2022
b410724
transport/upgrade: remove unecessary Option
elenaf9 Jun 27, 2022
71885ad
*: clean code, fix docs
elenaf9 Jun 27, 2022
2b3402c
transports/wasm-ext: rm leftover Self: Sized bound
elenaf9 Jun 27, 2022
469515e
*: fix missing renames
elenaf9 Jun 27, 2022
83a13dd
transports/tcp: rm unneeded dependencies, fix docs
elenaf9 Jun 28, 2022
3d4e0aa
transports/tcp: remove oudated comment
elenaf9 Jun 28, 2022
8600be7
*: add changelog entries
elenaf9 Jun 28, 2022
1a16312
Merge branch 'master' of github.com:libp2p/rust-libp2p into refactor-…
elenaf9 Jun 28, 2022
d4b8ba5
Merge branch 'master' of github.com:libp2p/rust-libp2p into refactor-…
elenaf9 Jul 1, 2022
03c5170
transports/tcp/CHANGELOG: remove wrong PR ref
elenaf9 Jul 1, 2022
5f5eb08
Merge branch 'master' into refactor-transport-trait
elenaf9 Jul 1, 2022
c2bc9e8
Merge branch 'master' into refactor-transport-trait
mxinden Jul 4, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
transport/wasm-ext: adapt wasm-ext transport
  • Loading branch information
elenaf9 committed Jun 26, 2022
commit c95c97cd27d9be29df4a75852007746d5b84b7fe
10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ default = [
"secp256k1",
"tcp-async-io",
"uds",
# "wasm-ext",
"wasm-ext",
"websocket",
"yamux",
]
Expand Down Expand Up @@ -59,8 +59,8 @@ tcp-async-io = ["dep:libp2p-tcp", "libp2p-tcp?/async-io"]
tcp-tokio = ["dep:libp2p-tcp", "libp2p-tcp?/tokio"]
uds = ["dep:libp2p-uds"]
wasm-bindgen = ["futures-timer/wasm-bindgen", "instant/wasm-bindgen", "getrandom/js", "rand/wasm-bindgen"]
# wasm-ext = ["dep:libp2p-wasm-ext"]
# wasm-ext-websocket = ["wasm-ext", "libp2p-wasm-ext?/websocket"]
wasm-ext = ["dep:libp2p-wasm-ext"]
wasm-ext-websocket = ["wasm-ext", "libp2p-wasm-ext?/websocket"]
websocket = ["dep:libp2p-websocket"]
yamux = ["dep:libp2p-yamux"]
secp256k1 = ["libp2p-core/secp256k1"]
Expand Down Expand Up @@ -95,7 +95,7 @@ libp2p-request-response = { version = "0.18.0", path = "protocols/request-respon
libp2p-swarm = { version = "0.36.1", path = "swarm" }
libp2p-swarm-derive = { version = "0.27.0", path = "swarm-derive" }
libp2p-uds = { version = "0.32.0", path = "transports/uds", optional = true }
# libp2p-wasm-ext = { version = "0.33.0", path = "transports/wasm-ext", default-features = false, optional = true }
libp2p-wasm-ext = { version = "0.33.0", path = "transports/wasm-ext", default-features = false, optional = true }
libp2p-yamux = { version = "0.37.0", path = "muxers/yamux", optional = true }
multiaddr = { version = "0.14.0" }
parking_lot = "0.12.0"
Expand Down Expand Up @@ -151,7 +151,7 @@ members = [
"transports/tcp",
"transports/uds",
"transports/websocket",
# "transports/wasm-ext"
"transports/wasm-ext"
]

[[example]]
Expand Down
134 changes: 100 additions & 34 deletions transports/wasm-ext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@
//! module.
//!

use futures::{future::Ready, prelude::*};
use futures::{future::Ready, prelude::*, ready, stream::SelectAll};
use libp2p_core::{
connection::Endpoint,
transport::{ListenerEvent, TransportError},
transport::{ListenerId, TransportError, TransportEvent},
Multiaddr, Transport,
};
use parity_send_wrapper::SendWrapper;
Expand Down Expand Up @@ -147,15 +147,18 @@ pub mod ffi {
/// Implementation of `Transport` whose implementation is handled by some FFI.
pub struct ExtTransport {
inner: SendWrapper<ffi::Transport>,
listeners: SelectAll<Listen>,
}

impl ExtTransport {
/// Creates a new `ExtTransport` that uses the given external `Transport`.
pub fn new(transport: ffi::Transport) -> Self {
ExtTransport {
inner: SendWrapper::new(transport),
listeners: SelectAll::new(),
}
}

fn do_dial(
&mut self,
addr: Multiaddr,
Expand Down Expand Up @@ -187,38 +190,40 @@ impl fmt::Debug for ExtTransport {
}
}

impl Clone for ExtTransport {
fn clone(&self) -> Self {
ExtTransport {
inner: SendWrapper::new(self.inner.clone().into()),
}
}
}

impl Transport for ExtTransport {
type Output = Connection;
type Error = JsErr;
type Listener = Listen;
type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
type Dial = Dial;

fn listen_on(
&mut self,
addr: Multiaddr,
) -> Result<Self::Listener, TransportError<Self::Error>> {
fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
let iter = self.inner.listen_on(&addr.to_string()).map_err(|err| {
if is_not_supported_error(&err) {
TransportError::MultiaddrNotSupported(addr)
} else {
TransportError::Other(JsErr::from(err))
}
})?;

Ok(Listen {
let listener_id = ListenerId::new();
let listen = Listen {
listener_id,
iterator: SendWrapper::new(iter),
next_event: None,
pending_events: VecDeque::new(),
})
is_closed: false,
};
self.listeners.push(listen);
Ok(listener_id)
}

fn remove_listener(&mut self, id: ListenerId) -> bool {
match self.listeners.iter_mut().find(|l| l.listener_id == id) {
Some(listener) => {
listener.close(Ok(()));
true
}
None => false,
}
}

fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>>
Expand All @@ -241,6 +246,16 @@ impl Transport for ExtTransport {
fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option<Multiaddr> {
None
}

fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
match ready!(self.listeners.poll_next_unpin(cx)) {
Some(event) => return Poll::Ready(event),
None => Poll::Pending,
}
}
}

/// Future that dial a remote through an external transport.
Expand Down Expand Up @@ -271,27 +286,47 @@ impl Future for Dial {
/// Stream that listens for incoming connections through an external transport.
#[must_use = "futures do nothing unless polled"]
pub struct Listen {
listener_id: ListenerId,
/// Iterator of `ListenEvent`s.
iterator: SendWrapper<js_sys::Iterator>,
/// Promise that will yield the next `ListenEvent`.
next_event: Option<SendWrapper<JsFuture>>,
/// List of events that we are waiting to propagate.
pending_events: VecDeque<ListenerEvent<Ready<Result<Connection, JsErr>>, JsErr>>,
pending_events: VecDeque<<Self as Stream>::Item>,
/// If the iterator is done close the listener.
is_closed: bool,
}

impl Listen {
/// Report the listener as closed as terminate its stream.
fn close(&mut self, reason: Result<(), JsErr>) {
self.pending_events
.push_back(TransportEvent::ListenerClosed {
listener_id: self.listener_id,
reason,
});
self.is_closed = true;
}
}

impl fmt::Debug for Listen {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("Listen").finish()
f.debug_tuple("Listen").field(&self.listener_id).finish()
}
}

impl Stream for Listen {
type Item = Result<ListenerEvent<Ready<Result<Connection, JsErr>>, JsErr>, JsErr>;
type Item = TransportEvent<<ExtTransport as Transport>::ListenerUpgrade, JsErr>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if let Some(ev) = self.pending_events.pop_front() {
return Poll::Ready(Some(Ok(ev)));
return Poll::Ready(Some(ev));
}

if self.is_closed {
// Terminate the stream if the listener closed and all remaining events have been reported.
return Poll::Ready(None);
}

// Try to fill `self.next_event` if necessary and possible. If we fail, then
Expand All @@ -309,30 +344,55 @@ impl Stream for Listen {
let e = match Future::poll(Pin::new(&mut **next_event), cx) {
Poll::Ready(Ok(ev)) => ffi::ListenEvent::from(ev),
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err.into()))),
Poll::Ready(Err(err)) => {
self.close(Err(err.into()));
continue;
}
};
self.next_event = None;
e
} else {
return Poll::Ready(None);
self.close(Ok(()));
continue;
};

let listener_id = self.listener_id;

if let Some(addrs) = event.new_addrs() {
for addr in addrs.iter() {
let addr = js_value_to_addr(addr)?;
self.pending_events
.push_back(ListenerEvent::NewAddress(addr));
match js_value_to_addr(addr) {
Ok(addr) => self.pending_events.push_back(TransportEvent::NewAddress {
listener_id,
listen_addr: addr,
}),
Err(err) => self.pending_events.push_back(TransportEvent::Error {
listener_id,
error: err,
}),
};
}
}

if let Some(upgrades) = event.new_connections() {
for upgrade in upgrades.iter().cloned() {
let upgrade: ffi::ConnectionEvent = upgrade.into();
self.pending_events.push_back(ListenerEvent::Upgrade {
local_addr: upgrade.local_addr().parse()?,
remote_addr: upgrade.observed_addr().parse()?,
upgrade: futures::future::ok(Connection::new(upgrade.connection())),
});
match upgrade.local_addr().parse().and_then(|local| {
let observed = upgrade.observed_addr().parse()?;
Ok((local, observed))
}) {
Ok((local_addr, send_back_addr)) => {
self.pending_events.push_back(TransportEvent::Incoming {
listener_id,
local_addr,
send_back_addr,
upgrade: futures::future::ok(Connection::new(upgrade.connection())),
})
}
Err(err) => self.pending_events.push_back(TransportEvent::Error {
listener_id,
error: err.into(),
}),
}
}
}

Expand All @@ -341,8 +401,14 @@ impl Stream for Listen {
match js_value_to_addr(addr) {
Ok(addr) => self
.pending_events
.push_back(ListenerEvent::NewAddress(addr)),
Err(err) => self.pending_events.push_back(ListenerEvent::Error(err)),
.push_back(TransportEvent::AddressExpired {
listener_id,
listen_addr: addr,
}),
Err(err) => self.pending_events.push_back(TransportEvent::Error {
listener_id,
error: err,
}),
}
}
}
Expand Down