Skip to content

Commit

Permalink
Merge 4f172ab into 21b0360
Browse files Browse the repository at this point in the history
  • Loading branch information
teor2345 authored Jan 29, 2021
2 parents 21b0360 + 4f172ab commit 1edb379
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 90 deletions.
1 change: 1 addition & 0 deletions zebra-network/src/peer/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,7 @@ where

if self.svc.ready_and().await.is_err() {
// Treat all service readiness errors as Overloaded
// TODO: treat `TryRecvError::Closed` in `Inbound::poll_ready` as a fatal error (#1655)
self.fail_with(PeerError::Overloaded);
return;
}
Expand Down
253 changes: 163 additions & 90 deletions zebrad/src/components/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use std::{

use futures::{
future::{FutureExt, TryFutureExt},
stream::{Stream, TryStreamExt},
stream::Stream,
};
use oneshot::error::TryRecvError;
use tokio::sync::oneshot;
use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service, ServiceExt};

Expand All @@ -28,8 +29,61 @@ use downloads::Downloads;
type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
type Verifier = Buffer<BoxService<Arc<Block>, block::Hash, VerifyChainError>, Arc<Block>>;
type InboundDownloads = Downloads<Timeout<Outbound>, Timeout<Verifier>, State>;

pub type SetupData = (Outbound, Arc<Mutex<AddressBook>>);
pub type NetworkSetupData = (Outbound, Arc<Mutex<AddressBook>>);

/// Tracks the internal state of the [`Inbound`] service during network setup.
///
/// The service starts in `AwaitingNetwork`, and `poll_ready` moves it to
/// `Initialized` or `FailedRecv` depending on what the setup channel yields.
pub enum Setup {
/// Waiting for network setup to complete.
///
/// Requests that depend on Zebra's internal network setup are ignored.
/// Other requests are answered.
AwaitingNetwork {
/// A oneshot channel used to receive the address_book and outbound services
/// after the network is set up.
network_setup: oneshot::Receiver<NetworkSetupData>,

/// A service that verifies downloaded blocks. Given to `downloads`
/// after the network is set up.
verifier: Verifier,
},

/// Network setup is complete.
///
/// All requests are answered.
Initialized {
/// A shared list of peer addresses.
address_book: Arc<Mutex<zn::AddressBook>>,

/// A `futures::Stream` that downloads and verifies gossiped blocks.
downloads: Pin<Box<InboundDownloads>>,
},

/// Temporary placeholder state used in the service's internal network
/// initialization code: `take_setup` leaves it in the field while
/// `poll_ready` works out the next state.
///
/// If this state occurs outside the service initialization code, the
/// service panics.
FailedInit,

/// Network setup failed, because the setup channel permanently failed.
/// The service keeps returning readiness errors for every request.
///
/// We keep a cloneable copy of the channel's receive error (not the closed
/// oneshot itself), so every later `poll_ready` call can return a fresh
/// clone of the same error.
FailedRecv { error: SharedRecvError },
}

/// A cloneable wrapper around the setup channel's [`TryRecvError`].
///
/// The inner error is held behind an [`Arc`], so cloning only bumps a
/// reference count. Display and `source` are forwarded to the wrapped error
/// by `#[error(transparent)]`.
#[derive(thiserror::Error, Debug, Clone)]
#[error(transparent)]
pub struct SharedRecvError(Arc<TryRecvError>);

impl From<TryRecvError> for SharedRecvError {
    /// Moves `source` into a shared allocation and wraps it.
    fn from(source: TryRecvError) -> Self {
        let shared = Arc::new(source);
        SharedRecvError(shared)
    }
}

/// Uses the node state to respond to inbound peer requests.
///
Expand All @@ -53,54 +107,35 @@ pub type SetupData = (Outbound, Arc<Mutex<AddressBook>>);
/// responding to block gossip by attempting to download and validate advertised
/// blocks.
pub struct Inbound {
// invariant: address_book, outbound, downloads are Some if network_setup is None
//
// why not use an enum for the inbound state? because it would mean
// match-wrapping the body of Service::call rather than just expect()ing
// some Options.

// Setup
/// A oneshot channel used to receive the address_book and outbound services
/// after the network is set up.
network_setup: Option<oneshot::Receiver<SetupData>>,

// Services
/// A list of peer addresses.
address_book: Option<Arc<Mutex<zn::AddressBook>>>,

/// A service that downloads and verifies gossipped blocks.
downloads: Option<Pin<Box<Downloads<Timeout<Outbound>, Timeout<Verifier>, State>>>>,

/// A service that forwards requests to connected peers, and returns their
/// responses.
/// Provides network-dependent services, if they are available.
///
/// Only used for readiness checks, and via `downloads`.
outbound: Option<Outbound>,
/// Some services are unavailable until Zebra has completed network setup.
network_setup: Setup,

/// A service that manages cached blockchain state.
state: State,

/// A service that verifies downloaded blocks.
///
/// Only used for readiness checks, and via `downloads`.
verifier: Verifier,
}

impl Inbound {
pub fn new(
network_setup: oneshot::Receiver<SetupData>,
network_setup: oneshot::Receiver<NetworkSetupData>,
state: State,
verifier: Verifier,
) -> Self {
Self {
network_setup: Some(network_setup),
address_book: None,
downloads: None,
outbound: None,
network_setup: Setup::AwaitingNetwork {
network_setup,
verifier,
},
state,
verifier,
}
}

/// Moves the current setup state out of `self`, leaving the
/// `Setup::FailedInit` placeholder behind.
///
/// The caller is expected to store a valid state back into
/// `self.network_setup` before the placeholder can be observed.
fn take_setup(&mut self) -> Setup {
    std::mem::replace(&mut self.network_setup, Setup::FailedInit)
}
}

impl Service<zn::Request> for Inbound {
Expand All @@ -115,85 +150,118 @@ impl Service<zn::Request> for Inbound {
// and reporting unreadiness might cause unwanted load-shedding, since
// the load-shed middleware is unable to distinguish being unready due
// to load from being unready while waiting on setup.
if let Some(mut rx) = self.network_setup.take() {
use oneshot::error::TryRecvError;
match rx.try_recv() {

// Every network_setup state handler must provide a result
let result;

self.network_setup = match self.take_setup() {
Setup::AwaitingNetwork {
mut network_setup,
verifier,
} => match network_setup.try_recv() {
Ok((outbound, address_book)) => {
self.outbound = Some(outbound.clone());
self.address_book = Some(address_book);
self.network_setup = None;
self.downloads = Some(Box::pin(Downloads::new(
let downloads = Box::pin(Downloads::new(
Timeout::new(outbound, BLOCK_DOWNLOAD_TIMEOUT),
Timeout::new(self.verifier.clone(), BLOCK_VERIFY_TIMEOUT),
Timeout::new(verifier, BLOCK_VERIFY_TIMEOUT),
self.state.clone(),
)));
));
result = Ok(());
Setup::Initialized {
address_book,
downloads,
}
}
Err(TryRecvError::Empty) => {
self.network_setup = Some(rx);
// There's no setup data yet, so keep waiting for it
result = Ok(());
Setup::AwaitingNetwork {
network_setup,
verifier,
}
}
Err(e @ TryRecvError::Closed) => {
// In this case, report that the service failed, and put the
// failed oneshot back so we'll fail again in case
// poll_ready is called after failure.
self.network_setup = Some(rx);
return Poll::Ready(Err(e.into()));
Err(error @ TryRecvError::Closed) => {
// Mark the service as failed, because network setup failed
error!(?error, "inbound network setup failed");
let error: SharedRecvError = error.into();
result = Err(error.clone().into());
Setup::FailedRecv { error }
}
};
}
},
// Make sure previous network setups were left in a valid state
Setup::FailedInit => unreachable!("incomplete previous Inbound initialization"),
// If network setup failed, report service failure
Setup::FailedRecv { error } => {
result = Err(error.clone().into());
Setup::FailedRecv { error }
}
// Clean up completed download tasks, ignoring their results
Setup::Initialized {
address_book,
mut downloads,
} => {
while let Poll::Ready(Some(_)) = downloads.as_mut().poll_next(cx) {}

// Clean up completed download tasks
if let Some(downloads) = self.downloads.as_mut() {
while let Poll::Ready(Some(_)) = downloads.as_mut().poll_next(cx) {}
}
result = Ok(());
Setup::Initialized {
address_book,
downloads,
}
}
};

// Now report readiness based on readiness of the inner services, if they're available.
// XXX do we want to propagate backpressure from the network here?
match (
self.state.poll_ready(cx),
self.outbound
.as_mut()
.map(|svc| svc.poll_ready(cx))
.unwrap_or(Poll::Ready(Ok(()))),
) {
(Poll::Ready(Err(e)), _) | (_, Poll::Ready(Err(e))) => Poll::Ready(Err(e)),
(Poll::Pending, _) | (_, Poll::Pending) => Poll::Pending,
(Poll::Ready(Ok(())), Poll::Ready(Ok(()))) => Poll::Ready(Ok(())),
// Make sure we're leaving the network setup in a valid state
if matches!(self.network_setup, Setup::FailedInit) {
unreachable!("incomplete Inbound initialization after poll_ready state handling");
}

// TODO:
// * do we want to propagate backpressure from the download queue or its outbound network?
// currently, the download queue waits for the outbound network in the download future,
// and drops new requests after it reaches a hard-coded limit. This is the
// "load shed directly" pattern from #1618.
// * currently, the state service is always ready, unless its buffer is full.
// So we might also want to propagate backpressure from its buffer.
// * if we want to propagate backpressure, add a ReadyCache for each service, to ensure
// that each poll_ready has a matching call. See #1593 for details.
Poll::Ready(result)
}

#[instrument(name = "inbound", skip(self, req))]
fn call(&mut self, req: zn::Request) -> Self::Future {
match req {
zn::Request::Peers => match self.address_book.as_ref() {
Some(addrs) => {
zn::Request::Peers => {
if let Setup::Initialized { address_book, .. } = &self.network_setup {
// We could truncate the list to try to not reveal our entire
// peer set. But because we don't monitor repeated requests,
// this wouldn't actually achieve anything, because a crawler
// could just repeatedly query it.
let mut peers = addrs.lock().unwrap().sanitized();
let mut peers = address_book.lock().unwrap().sanitized();
const MAX_ADDR: usize = 1000; // bitcoin protocol constant
peers.truncate(MAX_ADDR);
async { Ok(zn::Response::Peers(peers)) }.boxed()
} else {
info!("ignoring `Peers` request from remote peer during network setup");
async { Ok(zn::Response::Nil) }.boxed()
}
None => async { Err("not ready to serve addresses".into()) }.boxed(),
},
}
zn::Request::BlocksByHash(hashes) => {
let state = self.state.clone();
let requests = futures::stream::iter(
hashes
.into_iter()
.map(|hash| zs::Request::Block(hash.into())),
);

state
.call_all(requests)
.try_filter_map(|rsp| {
futures::future::ready(match rsp {
zs::Response::Block(Some(block)) => Ok(Some(block)),
// Correctness:
//
// We can't use `call_all` here, because it leaks buffer slots:
// https://github.com/tower-rs/tower/blob/master/tower/src/util/call_all/common.rs#L112
use futures::stream::TryStreamExt;
hashes
.into_iter()
.map(|hash| zs::Request::Block(hash.into()))
.map(|request| self.state.clone().oneshot(request))
.collect::<futures::stream::FuturesOrdered<_>>()
.try_filter_map(|response| async move {
Ok(match response {
zs::Response::Block(Some(block)) => Some(block),
// `zcashd` ignores missing blocks in GetData responses,
// rather than including them in a trailing `NotFound`
// message
zs::Response::Block(None) => Ok(None),
zs::Response::Block(None) => None,
_ => unreachable!("wrong response from state"),
})
})
Expand All @@ -214,7 +282,7 @@ impl Service<zn::Request> for Inbound {
}
zn::Request::FindBlocks { known_blocks, stop } => {
let request = zs::Request::FindBlockHashes { known_blocks, stop };
self.state.call(request).map_ok(|resp| match resp {
self.state.clone().oneshot(request).map_ok(|resp| match resp {
zs::Response::BlockHashes(hashes) if hashes.is_empty() => zn::Response::Nil,
zs::Response::BlockHashes(hashes) => zn::Response::BlockHashes(hashes),
_ => unreachable!("zebra-state should always respond to a `FindBlockHashes` request with a `BlockHashes` response"),
Expand All @@ -223,7 +291,7 @@ impl Service<zn::Request> for Inbound {
}
zn::Request::FindHeaders { known_blocks, stop } => {
let request = zs::Request::FindBlockHeaders { known_blocks, stop };
self.state.call(request).map_ok(|resp| match resp {
self.state.clone().oneshot(request).map_ok(|resp| match resp {
zs::Response::BlockHeaders(headers) if headers.is_empty() => zn::Response::Nil,
zs::Response::BlockHeaders(headers) => zn::Response::BlockHeaders(headers),
_ => unreachable!("zebra-state should always respond to a `FindBlockHeaders` request with a `BlockHeaders` response"),
Expand All @@ -239,8 +307,13 @@ impl Service<zn::Request> for Inbound {
async { Ok(zn::Response::Nil) }.boxed()
}
zn::Request::AdvertiseBlock(hash) => {
if let Some(downloads) = self.downloads.as_mut() {
if let Setup::Initialized { downloads, .. } = &mut self.network_setup {
downloads.download_and_verify(hash);
} else {
info!(
?hash,
"ignoring `AdvertiseBlock` request from remote peer during network setup"
);
}
async { Ok(zn::Response::Nil) }.boxed()
}
Expand Down

0 comments on commit 1edb379

Please sign in to comment.