Skip to content

Commit

Permalink
Reject connections from outdated peers (#2519)
Browse files Browse the repository at this point in the history
* Simplify state service initialization in test

Use the test helper function to remove redundant code.

* Create `BestTipHeight` helper type

This type abstracts away the calculation of the best tip height based on
the finalized block height and the best non-finalized chain's tip.

* Add `best_tip_height` field to `StateService`

The receiver endpoint is currently ignored.

* Return receiver endpoint from service constructor

Make it available so that the best tip height can be watched.

* Update finalized height after finalizing blocks

After blocks from the queue are finalized and committed to disk, update
the finalized block height.

* Update best non-finalized height after validation

Update the value of the best non-finalized chain tip block height after
a new block is committed to the non-finalized state.

* Update finalized height after loading from disk

When `FinalizedState` is first created, it loads the state from
persistent storage, and the finalized tip height is updated. Therefore,
the `best_tip_height` must be notified of the initial value.

* Update the finalized height on checkpoint commit

When a checkpointed block is commited, it bypasses the non-finalized
state, so there's an extra place where the finalized height has to be
updated.

* Add `best_tip_height` to `Handshake` service

It can be configured using the `Builder::with_best_tip_height`. It's
currently not used, but it will be used to determine if a connection to
a remote peer should be rejected or not based on that peer's protocol
version.

* Require best tip height to init. `zebra_network`

Without it the handshake service can't properly enforce the minimum
network protocol version from peers. Zebrad obtains the best tip height
endpoint from `zebra_state`, and the test vectors simply use a dummy
endpoint that's fixed at the genesis height.

* Pass `best_tip_height` to proto. ver. negotiation

The protocol version negotiation code will reject connections to peers
if they are using an old protocol version. An old version is determined
based on the current known best chain tip height.

* Handle an optional height in `Version`

Fallback to the genesis height in `None` is specified.

* Reject connections to peers on old proto. versions

Avoid connecting to peers that are on protocol versions that don't
recognize a network update.

* Document why peers on old versions are rejected

Describe why it's a security issue above the check.

* Test if `BestTipHeight` starts with `None`

Check if initially there is no best tip height.

* Test if best tip height is max. of latest values

After applying a list of random updates where each one either sets the
finalized height or the non-finalized height, check that the best tip
height is the maximum of the most recently set finalized height and the
most recently set non-finalized height.

* Add `queue_and_commit_finalized` method

A small refactor to make testing easier. The handling of requests for
committing non-finalized and finalized blocks is now more consistent.

* Add `assert_block_can_be_validated` helper

Refactor to move into a separate method some assertions that are done
before a block is validated. This is to allow moving these assertions
more easily to simplify testing.

* Remove redundant PoW block assertion

It's also checked in
`zebra_state::service::check::block_is_contextually_valid`, and it was
getting in the way of tests that received a gossiped block before
finalizing enough blocks.

* Create a test strategy for test vector chain

Splits a chain loaded from the test vectors in two parts, containing the
blocks to finalize and the blocks to keep in the non-finalized state.

* Test committing blocks update best tip height

Create a mock blockchain state, with a chain of finalized blocks and a
chain of non-finalized blocks. Commit all the blocks appropriately, and
verify that the best tip height is updated.

Co-authored-by: teor <teor@riseup.net>
  • Loading branch information
jvff and teor2345 authored Aug 8, 2021
1 parent 751185d commit 4c4dbfe
Show file tree
Hide file tree
Showing 13 changed files with 331 additions and 67 deletions.
43 changes: 31 additions & 12 deletions zebra-network/src/peer/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ use futures::{
channel::{mpsc, oneshot},
future, FutureExt, SinkExt, StreamExt,
};
use tokio::{net::TcpStream, sync::broadcast, task::JoinError, time::timeout};
use tokio::{
net::TcpStream,
sync::{broadcast, watch},
task::JoinError,
time::timeout,
};
use tokio_util::codec::Framed;
use tower::Service;
use tracing::{span, Level, Span};
Expand Down Expand Up @@ -53,6 +58,7 @@ pub struct Handshake<S> {
our_services: PeerServices,
relay: bool,
parent_span: Span,
best_tip_height: Option<watch::Receiver<Option<block::Height>>>,
}

/// The peer address that we are handshaking with.
Expand Down Expand Up @@ -302,6 +308,7 @@ pub struct Builder<S> {
user_agent: Option<String>,
relay: Option<bool>,
inv_collector: Option<broadcast::Sender<(InventoryHash, SocketAddr)>>,
best_tip_height: Option<watch::Receiver<Option<block::Height>>>,
}

impl<S> Builder<S>
Expand Down Expand Up @@ -361,6 +368,18 @@ where
self
}

/// Provide a realtime endpoint to obtain the current best chain tip block height. Optional.
///
/// If this is unset, the minimum accepted protocol version for peer connections is kept
/// constant over network upgrade activations.
pub fn with_best_tip_height(
mut self,
best_tip_height: Option<watch::Receiver<Option<block::Height>>>,
) -> Self {
self.best_tip_height = best_tip_height;
self
}

/// Whether to request that peers relay transactions to our node. Optional.
///
/// If this is unset, the node will not request transactions.
Expand Down Expand Up @@ -402,6 +421,7 @@ where
our_services,
relay,
parent_span: Span::current(),
best_tip_height: self.best_tip_height,
})
}
}
Expand All @@ -424,6 +444,7 @@ where
our_services: None,
relay: None,
inv_collector: None,
best_tip_height: None,
}
}
}
Expand All @@ -433,6 +454,7 @@ where
///
/// We split `Handshake` into its components before calling this function,
/// to avoid infectious `Sync` bounds on the returned future.
#[allow(clippy::too_many_arguments)]
pub async fn negotiate_version(
peer_conn: &mut Framed<TcpStream, Codec>,
connected_addr: &ConnectedAddr,
Expand All @@ -441,6 +463,7 @@ pub async fn negotiate_version(
user_agent: String,
our_services: PeerServices,
relay: bool,
best_tip_height: Option<watch::Receiver<Option<block::Height>>>,
) -> Result<(Version, PeerServices, SocketAddr), HandshakeError> {
// Create a random nonce for this connection
let local_nonce = Nonce::default();
Expand Down Expand Up @@ -552,17 +575,11 @@ pub async fn negotiate_version(
Err(HandshakeError::NonceReuse)?;
}

// TODO: Reject connections with nodes that don't know about the current network upgrade (#1334)
// Use the latest non-finalized block height, rather than the minimum
if remote_version
< Version::min_remote_for_height(
config.network,
// This code will be replaced in #1334
constants::INITIAL_MIN_NETWORK_PROTOCOL_VERSION
.activation_height(config.network)
.expect("minimum network protocol network upgrade has an activation height"),
)
{
// SECURITY: Reject connections to peers on old versions, because they might not know about all
// network upgrades and could lead to chain forks or slower block propagation.
let height = best_tip_height.and_then(|height| *height.borrow());
let min_version = Version::min_remote_for_height(config.network, height);
if remote_version < min_version {
// Disconnect if peer is using an obsolete version.
Err(HandshakeError::ObsoleteVersion(remote_version))?;
}
Expand Down Expand Up @@ -617,6 +634,7 @@ where
let user_agent = self.user_agent.clone();
let our_services = self.our_services;
let relay = self.relay;
let best_tip_height = self.best_tip_height.clone();

let fut = async move {
debug!(
Expand Down Expand Up @@ -647,6 +665,7 @@ where
user_agent,
our_services,
relay,
best_tip_height,
),
)
.await??;
Expand Down
10 changes: 8 additions & 2 deletions zebra-network/src/peer_set/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ use futures::{
stream::{FuturesUnordered, StreamExt},
TryFutureExt,
};
use tokio::{net::TcpListener, sync::broadcast, time::Instant};
use tokio::{
net::TcpListener,
sync::{broadcast, watch},
time::Instant,
};
use tower::{
buffer::Buffer, discover::Change, layer::Layer, load::peak_ewma::PeakEwmaDiscover,
util::BoxService, Service, ServiceExt,
Expand All @@ -25,7 +29,7 @@ use crate::{
BoxError, Config, Request, Response,
};

use zebra_chain::parameters::Network;
use zebra_chain::{block, parameters::Network};

use super::CandidateSet;
use super::PeerSet;
Expand Down Expand Up @@ -59,6 +63,7 @@ type PeerChange = Result<Change<SocketAddr, peer::Client>, BoxError>;
pub async fn init<S>(
config: Config,
inbound_service: S,
best_tip_height: Option<watch::Receiver<Option<block::Height>>>,
) -> (
Buffer<BoxService<Request, Response, BoxError>, Request>,
Arc<std::sync::Mutex<AddressBook>>,
Expand Down Expand Up @@ -87,6 +92,7 @@ where
.with_timestamp_collector(timestamp_collector)
.with_advertised_services(PeerServices::NODE_NETWORK)
.with_user_agent(crate::constants::USER_AGENT.to_string())
.with_best_tip_height(best_tip_height)
.want_transactions(true)
.finish()
.expect("configured all required parameters");
Expand Down
2 changes: 1 addition & 1 deletion zebra-network/src/peer_set/initialize/tests/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ async fn local_listener_port_with(listen_addr: SocketAddr, network: Network) {
let inbound_service =
service_fn(|_| async { unreachable!("inbound service should never be called") });

let (_peer_service, address_book) = init(config, inbound_service).await;
let (_peer_service, address_book) = init(config, inbound_service, None).await;
let local_listener = address_book.lock().unwrap().local_listener_meta_addr();

if listen_addr.port() == 0 {
Expand Down
6 changes: 5 additions & 1 deletion zebra-network/src/protocol/external/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ impl Version {
/// # Panics
///
/// If we are incompatible with our own minimum remote protocol version.
pub fn min_remote_for_height(network: Network, height: block::Height) -> Version {
pub fn min_remote_for_height(
network: Network,
height: impl Into<Option<block::Height>>,
) -> Version {
let height = height.into().unwrap_or(block::Height(0));
let min_spec = Version::min_specified_for_height(network, height);

// shut down if our own version is too old
Expand Down
95 changes: 64 additions & 31 deletions zebra-state/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,28 @@ use std::{
time::{Duration, Instant},
};

use check::difficulty::POW_MEDIAN_BLOCK_SPAN;
use futures::future::FutureExt;
use non_finalized_state::{NonFinalizedState, QueuedBlocks};
use tokio::sync::oneshot;
use tokio::sync::{oneshot, watch};
#[cfg(any(test, feature = "proptest-impl"))]
use tower::buffer::Buffer;
use tower::{util::BoxService, Service};
use tracing::instrument;
use zebra_chain::{
block::{self, Block},
parameters::POW_AVERAGING_WINDOW,
parameters::{Network, NetworkUpgrade},
transaction,
transaction::Transaction,
transparent,
};

use self::best_tip_height::BestTipHeight;
use crate::{
constants, request::HashOrHeight, BoxError, CloneError, CommitBlockError, Config,
FinalizedBlock, PreparedBlock, Request, Response, ValidateContextError,
};

mod best_tip_height;
pub(crate) mod check;
mod finalized_state;
mod non_finalized_state;
Expand Down Expand Up @@ -63,14 +63,21 @@ pub(crate) struct StateService {
network: Network,
/// Instant tracking the last time `pending_utxos` was pruned
last_prune: Instant,
/// The current best chain tip height.
best_tip_height: BestTipHeight,
}

impl StateService {
const PRUNE_INTERVAL: Duration = Duration::from_secs(30);

pub fn new(config: Config, network: Network) -> Self {
pub fn new(config: Config, network: Network) -> (Self, watch::Receiver<Option<block::Height>>) {
let (mut best_tip_height, best_tip_height_receiver) = BestTipHeight::new();
let disk = FinalizedState::new(&config, network);

if let Some(finalized_height) = disk.finalized_tip_height() {
best_tip_height.set_finalized_height(finalized_height);
}

let mem = NonFinalizedState::new(network);
let queued_blocks = QueuedBlocks::default();
let pending_utxos = PendingUtxos::default();
Expand All @@ -82,6 +89,7 @@ impl StateService {
pending_utxos,
network,
last_prune: Instant::now(),
best_tip_height,
};

tracing::info!("starting legacy chain check");
Expand All @@ -108,7 +116,23 @@ impl StateService {
}
tracing::info!("no legacy chain found");

state
(state, best_tip_height_receiver)
}

/// Queue a finalized block for verification and storage in the finalized state.
fn queue_and_commit_finalized(
&mut self,
finalized: FinalizedBlock,
) -> oneshot::Receiver<Result<block::Hash, BoxError>> {
let (rsp_tx, rsp_rx) = oneshot::channel();

self.disk.queue_and_commit_finalized((finalized, rsp_tx));

if let Some(finalized_height) = self.disk.finalized_tip_height() {
self.best_tip_height.set_finalized_height(finalized_height);
}

rsp_rx
}

/// Queue a non finalized block for verification and check if any queued
Expand Down Expand Up @@ -165,10 +189,17 @@ impl StateService {
);
}

self.queued_blocks
.prune_by_height(self.disk.finalized_tip_height().expect(
let finalized_tip_height = self.disk.finalized_tip_height().expect(
"Finalized state must have at least one block before committing non-finalized state",
));
);
let non_finalized_tip_height = self.mem.best_tip().map(|(height, _hash)| height);

self.queued_blocks.prune_by_height(finalized_tip_height);

self.best_tip_height
.set_finalized_height(finalized_tip_height);
self.best_tip_height
.set_best_non_finalized_height(non_finalized_tip_height);

tracing::trace!("finished processing queued block");
rsp_rx
Expand Down Expand Up @@ -204,23 +235,6 @@ impl StateService {
let queued_children = self.queued_blocks.dequeue_children(parent_hash);

for (child, rsp_tx) in queued_children {
// required by validate_and_commit, moved here to make testing easier
assert!(
child.height > self.network.mandatory_checkpoint_height(),
"invalid non-finalized block height: the canopy checkpoint is mandatory, \
pre-canopy blocks, and the canopy activation block, \
must be committed to the state as finalized blocks"
);

// required by check_contextual_validity, moved here to make testing easier
let relevant_chain =
self.any_ancestor_blocks(child.block.header.previous_block_hash);
assert!(
relevant_chain.len() >= POW_AVERAGING_WINDOW + POW_MEDIAN_BLOCK_SPAN,
"contextual validation requires at least \
28 (POW_AVERAGING_WINDOW + POW_MEDIAN_BLOCK_SPAN) blocks"
);

let child_hash = child.hash;
let result;

Expand Down Expand Up @@ -504,6 +518,17 @@ impl StateService {
let intersection = self.find_best_chain_intersection(known_blocks);
self.collect_best_chain_hashes(intersection, stop, max_len)
}

/// Assert some assumptions about the prepared `block` before it is validated.
fn assert_block_can_be_validated(&self, block: &PreparedBlock) {
// required by validate_and_commit, moved here to make testing easier
assert!(
block.height > self.network.mandatory_checkpoint_height(),
"invalid non-finalized block height: the canopy checkpoint is mandatory, pre-canopy \
blocks, and the canopy activation block, must be committed to the state as finalized \
blocks"
);
}
}

pub(crate) struct Iter<'a> {
Expand Down Expand Up @@ -640,6 +665,8 @@ impl Service<Request> for StateService {
Request::CommitBlock(prepared) => {
metrics::counter!("state.requests", 1, "type" => "commit_block");

self.assert_block_can_be_validated(&prepared);

self.pending_utxos
.check_against_ordered(&prepared.new_outputs);
let rsp_rx = self.queue_and_commit_non_finalized(prepared);
Expand All @@ -656,10 +683,8 @@ impl Service<Request> for StateService {
Request::CommitFinalizedBlock(finalized) => {
metrics::counter!("state.requests", 1, "type" => "commit_finalized_block");

let (rsp_tx, rsp_rx) = oneshot::channel();

self.pending_utxos.check_against(&finalized.new_outputs);
self.disk.queue_and_commit_finalized((finalized, rsp_tx));
let rsp_rx = self.queue_and_commit_finalized(finalized);

async move {
rsp_rx
Expand Down Expand Up @@ -748,16 +773,24 @@ impl Service<Request> for StateService {
/// possible to construct multiple state services in the same application (as
/// long as they, e.g., use different storage locations), but doing so is
/// probably not what you want.
pub fn init(config: Config, network: Network) -> BoxService<Request, Response, BoxError> {
BoxService::new(StateService::new(config, network))
pub fn init(
config: Config,
network: Network,
) -> (
BoxService<Request, Response, BoxError>,
watch::Receiver<Option<block::Height>>,
) {
let (state_service, best_tip_height) = StateService::new(config, network);

(BoxService::new(state_service), best_tip_height)
}

/// Initialize a state service with an ephemeral [`Config`] and a buffer with a single slot.
///
/// This can be used to create a state service for testing. See also [`init`].
#[cfg(any(test, feature = "proptest-impl"))]
pub fn init_test(network: Network) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
let state_service = StateService::new(Config::ephemeral(), network);
let (state_service, _) = StateService::new(Config::ephemeral(), network);

Buffer::new(BoxService::new(state_service), 1)
}
Expand Down
Loading

0 comments on commit 4c4dbfe

Please sign in to comment.