Skip to content

Commit

Permalink
Merge pull request #5 from cyclefortytwo/stream-trait
Browse files Browse the repository at this point in the history
Represent Stream as a trait
  • Loading branch information
hashmap authored Jul 12, 2019
2 parents 98d5ffb + b053e43 commit f627e8e
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 88 deletions.
10 changes: 6 additions & 4 deletions p2p/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,13 +261,14 @@ impl Tracker {
/// Start listening on the provided connection and wraps it. Does not hang
/// the current thread, instead just returns a future and the Connection
/// itself.
pub fn listen<H>(
stream: Stream,
pub fn listen<S, H>(
stream: S,
version: ProtocolVersion,
tracker: Arc<Tracker>,
handler: H,
) -> io::Result<(ConnHandle, StopHandle)>
where
S: Stream + 'static,
H: MessageHandler,
{
let (send_tx, send_rx) = mpsc::sync_channel(SEND_CHANNEL_CAP);
Expand All @@ -289,15 +290,16 @@ where
))
}

fn poll<H>(
conn: Stream,
fn poll<S, H>(
conn: S,
version: ProtocolVersion,
handler: H,
send_rx: mpsc::Receiver<Vec<u8>>,
close_rx: mpsc::Receiver<()>,
tracker: Arc<Tracker>,
) -> io::Result<JoinHandle<()>>
where
S: Stream + 'static,
H: MessageHandler,
{
// Split out tcp stream out into separate reader/writer halves.
Expand Down
16 changes: 8 additions & 8 deletions p2p/src/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ impl Handshake {
}
}

pub fn initiate(
pub fn initiate<S: Stream>(
&self,
capabilities: Capabilities,
total_difficulty: Difficulty,
self_addr: PeerAddr,
conn: &mut Stream,
conn: &mut S,
) -> Result<PeerInfo, Error> {
// prepare the first part of the handshake
let nonce = self.next_nonce();
Expand Down Expand Up @@ -124,16 +124,16 @@ impl Handshake {
Ok(peer_info)
}

pub fn accept(
pub fn accept<S: Stream>(
&self,
capab: Capabilities,
total_difficulty: Difficulty,
conn: &mut Stream,
mut conn: S,
) -> Result<PeerInfo, Error> {
// Note: We read the Hand message *before* we know which protocol version
// is supported by our peer (it is in the Hand message).
let version = ProtocolVersion::default();
let hand: Hand = read_message(conn, version, Type::Hand)?;
let hand: Hand = read_message(&mut conn, version, Type::Hand)?;

// all the reasons we could refuse this connection for
if hand.genesis != self.genesis {
Expand All @@ -144,7 +144,7 @@ impl Handshake {
} else {
// check the nonce to see if we are trying to connect to ourselves
let nonces = self.nonces.read();
let addr = resolve_peer_addr(hand.sender_addr.clone(), &conn);
let addr = resolve_peer_addr(hand.sender_addr.clone(), &mut conn);
if nonces.contains(&hand.nonce) {
// save ip addresses of ourselves
let mut addrs = self.addrs.write();
Expand Down Expand Up @@ -183,7 +183,7 @@ impl Handshake {
user_agent: USER_AGENT.to_string(),
};

write_message(conn, shake, Type::Shake)?;
write_message(&mut conn, shake, Type::Shake)?;
trace!("Success handshake with {}.", peer_info.addr);

// when more than one protocol version is supported, choosing should go here
Expand All @@ -204,7 +204,7 @@ impl Handshake {
}

/// Resolve the correct peer_addr based on the connection and the advertised port.
fn resolve_peer_addr(advertised: PeerAddr, conn: &Stream) -> PeerAddr {
fn resolve_peer_addr<S: Stream>(advertised: PeerAddr, conn: &S) -> PeerAddr {
match advertised {
PeerAddr::Socket(addr) => {
let port = addr.port();
Expand Down
22 changes: 13 additions & 9 deletions p2p/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ impl fmt::Debug for Peer {

impl Peer {
// Only accept and connect can be externally used to build a peer
fn new(info: PeerInfo, conn: Stream, adapter: Arc<dyn NetAdapter>) -> std::io::Result<Peer> {
fn new<S: Stream + 'static>(
info: PeerInfo,
conn: S,
adapter: Arc<dyn NetAdapter>,
) -> std::io::Result<Peer> {
let state = Arc::new(RwLock::new(State::Connected));
let tracking_adapter = TrackingAdapter::new(adapter);
let handler = Protocol::new(Arc::new(tracking_adapter.clone()), info.clone());
Expand All @@ -88,22 +92,22 @@ impl Peer {
})
}

pub fn accept(
mut conn: Stream,
pub fn accept<S: Stream + 'static>(
mut conn: S,
capab: Capabilities,
total_difficulty: Difficulty,
hs: &Handshake,
adapter: Arc<dyn NetAdapter>,
) -> Result<Peer, Error> {
debug!("accept: handshaking from {:?}", conn.peer_addr());
let addr = conn.peer_addr();
debug!("accept: handshaking from {:?}", addr);
let info = hs.accept(capab, total_difficulty, &mut conn);
match info {
Ok(info) => Ok(Peer::new(info, conn, adapter)?),
Err(e) => {
debug!(
"accept: handshaking from {:?} failed with error: {:?}",
conn.peer_addr(),
e
addr, e
);
if let Err(e) = conn.shutdown(Shutdown::Both) {
debug!("Error shutting down conn: {:?}", e);
Expand All @@ -113,8 +117,8 @@ impl Peer {
}
}

pub fn connect(
mut conn: Stream,
pub fn connect<S: Stream + 'static>(
mut conn: S,
capab: Capabilities,
total_difficulty: Difficulty,
self_addr: PeerAddr,
Expand All @@ -141,7 +145,7 @@ impl Peer {

/// Main peer loop listening for messages and forwarding to the rest of the
/// system.
pub fn start(&mut self, conn: Stream) -> Result<(), Error> {
pub fn start<S: Stream + 'static>(&mut self, conn: S) -> Result<(), Error> {
let adapter = Arc::new(self.tracking_adapter.clone());
let handler = Protocol::new(adapter, self.info.clone());
let (sendh, stoph) = conn::listen(
Expand Down
26 changes: 14 additions & 12 deletions p2p/src/serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ impl Server {
match listener.accept() {
Ok((stream, peer_addr)) => {
let peer_addr = PeerAddr::Socket(peer_addr);
let stream = Stream::Tcp(stream);

if self.check_undesirable(&stream) {
continue;
Expand Down Expand Up @@ -134,7 +133,6 @@ impl Server {
match stream {
Ok(stream) => {
let peer_addr = PeerAddr::I2p(stream.local_addr()?);
let stream = Stream::I2p(stream);

if self.check_undesirable(&stream) {
continue;
Expand Down Expand Up @@ -205,23 +203,27 @@ impl Server {
self.config.port,
addr
);
let (stream, self_addr) = match addr {
match addr {
PeerAddr::Socket(addr) => {
let tcp_stream = TcpStream::connect_timeout(&addr, Duration::from_secs(10))
let stream = TcpStream::connect_timeout(&addr, Duration::from_secs(10))
.map_err(|e| Error::Connection(e))?;
let addr = PeerAddr::Socket(SocketAddr::new(self.config.host, self.config.port));
let stream = Stream::Tcp(tcp_stream);
(stream, addr)
self.new_connected_peer(stream, addr)
}
PeerAddr::I2p(i2p_addr) => {
let session = self.i2p_session.as_ref().ok_or(Error::Internal)?;
let stream =
Stream::I2p(I2pStream::connect_with_session(&session, i2p_addr.clone())?);
let stream = I2pStream::connect_with_session(&session, i2p_addr.clone())?;
let addr = stream.local_addr()?;
(stream, addr)
self.new_connected_peer(stream, PeerAddr::I2p(addr))
}
};
}
}

fn new_connected_peer<S: Stream + 'static>(
&self,
stream: S,
self_addr: PeerAddr,
) -> Result<Arc<Peer>, Error> {
let total_diff = self.peers.total_difficulty()?;
let peer = Peer::connect(
stream,
Expand All @@ -236,7 +238,7 @@ impl Server {
Ok(peer)
}

fn handle_new_peer(&self, stream: Stream) -> Result<(), Error> {
fn handle_new_peer<S: Stream + 'static>(&self, stream: S) -> Result<(), Error> {
let total_diff = self.peers.total_difficulty()?;

// accept the peer and add it to the server map
Expand All @@ -260,7 +262,7 @@ impl Server {
/// addresses (NAT), network distribution is improved if they choose
/// different sets of peers themselves. In addition, it prevent potential
/// duplicate connections, malicious or not.
fn check_undesirable(&self, stream: &Stream) -> bool {
fn check_undesirable<S: Stream>(&self, stream: &S) -> bool {
if let Ok(peer_addr) = stream.peer_addr() {
if self.peers.is_banned(&peer_addr) {
debug!("Peer {} banned, refusing connection.", peer_addr);
Expand Down
115 changes: 62 additions & 53 deletions p2p/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,75 +325,84 @@ impl PeerAddr {
}
}

/// Conveninence type that wraps I2pStream and TcpStream and allows us to
/// handle either without duplicating code just because their types differ.
/// Mostly a bunch of boilerplace directly delegating to the concrete stream
/// type.
pub enum Stream {
Tcp(TcpStream),
I2p(I2pStream),
/// Representation of network stream, currently implemented for I2pStream and TcpStream
pub trait Stream: Read + Write + Send + Sync {
/// Peer address this stream is connected to
fn peer_addr(&self) -> Result<PeerAddr, Error>;
/// Our network address
fn local_addr(&self) -> Result<PeerAddr, Error>;
/// Shutdown connection
fn shutdown(&self, how: Shutdown) -> Result<(), Error>;
/// Enable non-blocking IO
fn set_nonblocking(&self, nonblocking: bool) -> Result<(), Error>;
/// Try clone stream
fn try_clone(&self) -> Result<Box<Stream>, Error>;
}

impl Stream {
/// Peer address this stream is connected to
pub fn peer_addr(&self) -> Result<PeerAddr, Error> {
match self {
Stream::Tcp(s) => Ok(PeerAddr::Socket(s.peer_addr()?)),
Stream::I2p(s) => Ok(PeerAddr::I2p(s.peer_addr()?)),
}
impl<'a, S: Stream> Stream for &'a mut S {
fn peer_addr(&self) -> Result<PeerAddr, Error> {
S::peer_addr(self)
}

/// Our network address
pub fn local_addr(&self) -> Result<PeerAddr, Error> {
match self {
Stream::Tcp(s) => Ok(PeerAddr::Socket(s.local_addr()?)),
Stream::I2p(s) => Ok(PeerAddr::I2p(s.local_addr()?)),
}
fn local_addr(&self) -> Result<PeerAddr, Error> {
S::local_addr(self)
}

pub fn shutdown(&self, how: Shutdown) -> Result<(), Error> {
match self {
Stream::Tcp(s) => Ok(s.shutdown(how)?),
Stream::I2p(s) => Ok(s.shutdown(how)?),
}
fn shutdown(&self, how: Shutdown) -> Result<(), Error> {
S::shutdown(self, how)
}

pub fn set_nonblocking(&self, nonblocking: bool) -> Result<(), Error> {
match self {
Stream::Tcp(s) => Ok(s.set_nonblocking(nonblocking)?),
Stream::I2p(s) => Ok(s.set_nonblocking(nonblocking)?),
}
fn set_nonblocking(&self, nonblocking: bool) -> Result<(), Error> {
S::set_nonblocking(self, nonblocking)
}

pub fn try_clone(&self) -> Result<Stream, Error> {
match self {
Stream::Tcp(s) => Ok(Stream::Tcp(s.try_clone()?)),
Stream::I2p(s) => Ok(Stream::I2p(s.try_clone()?)),
}
fn try_clone(&self) -> Result<Box<Stream>, Error> {
S::try_clone(self)
}
}

impl Read for Stream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self {
Stream::Tcp(s) => s.read(buf),
Stream::I2p(s) => s.read(buf),
}
impl Stream for TcpStream {
fn peer_addr(&self) -> Result<PeerAddr, Error> {
Ok(PeerAddr::Socket(self.peer_addr()?))
}

fn local_addr(&self) -> Result<PeerAddr, Error> {
Ok(PeerAddr::Socket(self.local_addr()?))
}

fn shutdown(&self, how: Shutdown) -> Result<(), Error> {
self.shutdown(how).map_err(|e| e.into())
}

fn set_nonblocking(&self, nonblocking: bool) -> Result<(), Error> {
self.set_nonblocking(nonblocking).map_err(|e| e.into())
}

fn try_clone(&self) -> Result<Box<Stream>, Error> {
let s = self.try_clone()?;
Ok(Box::new(s))
}
}

impl Write for Stream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match self {
Stream::Tcp(s) => s.write(buf),
Stream::I2p(s) => s.write(buf),
}
impl Stream for I2pStream {
fn peer_addr(&self) -> Result<PeerAddr, Error> {
Ok(PeerAddr::I2p(self.peer_addr()?))
}
fn flush(&mut self) -> io::Result<()> {
match self {
Stream::Tcp(s) => s.flush(),
Stream::I2p(s) => s.flush(),
}

fn local_addr(&self) -> Result<PeerAddr, Error> {
Ok(PeerAddr::I2p(self.local_addr()?))
}

fn shutdown(&self, how: Shutdown) -> Result<(), Error> {
self.shutdown(how).map_err(|e| e.into())
}

fn set_nonblocking(&self, nonblocking: bool) -> Result<(), Error> {
self.set_nonblocking(nonblocking).map_err(|e| e.into())
}

fn try_clone(&self) -> Result<Box<Stream>, Error> {
let s = self.try_clone()?;
Ok(Box::new(s))
}
}

Expand Down
4 changes: 2 additions & 2 deletions p2p/tests/peer_handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::{thread, time};

use crate::core::core::hash::Hash;
use crate::core::pow::Difficulty;
use crate::p2p::types::{PeerAddr, Stream};
use crate::p2p::types::PeerAddr;
use crate::p2p::Peer;

fn open_port() -> u16 {
Expand Down Expand Up @@ -68,7 +68,7 @@ fn peer_handshake() {
thread::sleep(time::Duration::from_secs(1));

let addr = SocketAddr::new(p2p_config.host, p2p_config.port);
let socket = Stream::Tcp(TcpStream::connect_timeout(&addr, time::Duration::from_secs(10)).unwrap());
let socket = TcpStream::connect_timeout(&addr, time::Duration::from_secs(10)).unwrap();

let my_addr = PeerAddr::from_socket_addr("127.0.0.1:5000".parse().unwrap());
let peer = Peer::connect(
Expand Down

0 comments on commit f627e8e

Please sign in to comment.