Skip to content

Commit

Permalink
Refactor MetaAddr to enable security fixes
Browse files Browse the repository at this point in the history
Track multiple last used times for each peer:
- Add separate untrusted_last_seen, attempt, success, and failed time fields (#1868, #1876, #1848)
- Add the new fields to the peer states, so they only appear in states where they are valid
- Insert initial seed peers in the AddressBook in the correct states

Create a new MetaAddrChange type for AddressBook changes:
- Ignore invalid state changes
    - Ignore updates to the untrusted last seen time (but update the services field)
    - If we get a gossiped or alternate change for a seed peer, use the last seen and services info
    - Once a peer has responded, don't go back to the NeverResponded... states
- Update the address book metrics

- Optimise getting the next connection address from the address book
  • Loading branch information
teor2345 committed May 21, 2021
1 parent fec19b4 commit 6b37568
Show file tree
Hide file tree
Showing 10 changed files with 1,449 additions and 482 deletions.
366 changes: 229 additions & 137 deletions zebra-network/src/address_book.rs

Large diffs are not rendered by default.

1,089 changes: 932 additions & 157 deletions zebra-network/src/meta_addr.rs

Large diffs are not rendered by default.

31 changes: 0 additions & 31 deletions zebra-network/src/meta_addr/arbitrary.rs

This file was deleted.

62 changes: 37 additions & 25 deletions zebra-network/src/meta_addr/tests/check.rs
Original file line number Diff line number Diff line change
@@ -1,45 +1,57 @@
//! Shared test checks for MetaAddr
use super::super::MetaAddr;
use super::super::{MetaAddr, PeerAddrState::NeverAttemptedGossiped};

use crate::constants::TIMESTAMP_TRUNCATION_SECONDS;

/// Make sure that the sanitize function reduces time and state metadata
/// leaks.
pub(crate) fn sanitize_avoids_leaks(entry: &MetaAddr) {
let sanitized = entry.sanitize();
let sanitized = match entry.sanitize() {
Some(sanitized) => sanitized,
// Skip addresses that will never be sent to peers
None => {
return;
}
};

// We want the sanitized timestamp to:
// - be a multiple of the truncation interval,
// - have a zero nanoseconds component, and
// - be within the truncation interval of the original timestamp.
assert_eq!(
sanitized.get_last_seen().timestamp() % TIMESTAMP_TRUNCATION_SECONDS,
0
);
assert_eq!(sanitized.get_last_seen().timestamp_subsec_nanos(), 0);
// handle underflow and overflow by skipping the check
// the other check will ensure correctness
let lowest_time = entry
.get_last_seen()
.timestamp()
.checked_sub(TIMESTAMP_TRUNCATION_SECONDS);
let highest_time = entry
.get_last_seen()
.timestamp()
.checked_add(TIMESTAMP_TRUNCATION_SECONDS);
if let Some(lowest_time) = lowest_time {
assert!(sanitized.get_last_seen().timestamp() > lowest_time);
}
if let Some(highest_time) = highest_time {
assert!(sanitized.get_last_seen().timestamp() < highest_time);
if let Some(sanitized_last) = sanitized.get_last_success_or_untrusted() {
assert_eq!(sanitized_last.timestamp() % TIMESTAMP_TRUNCATION_SECONDS, 0);
assert_eq!(sanitized_last.timestamp_subsec_nanos(), 0);
// handle underflow and overflow by skipping the check
// the other check will ensure correctness
let lowest_time = entry
.get_last_success_or_untrusted()
.map(|t| t.timestamp().checked_sub(TIMESTAMP_TRUNCATION_SECONDS))
.flatten();
let highest_time = entry
.get_last_success_or_untrusted()
.map(|t| t.timestamp().checked_add(TIMESTAMP_TRUNCATION_SECONDS))
.flatten();
if let Some(lowest_time) = lowest_time {
assert!(sanitized_last.timestamp() > lowest_time);
}
if let Some(highest_time) = highest_time {
assert!(sanitized_last.timestamp() < highest_time);
}
}

// Sanitize to the the default state, even though it's not serialized
assert_eq!(sanitized.last_connection_state, Default::default());
// Sanitize to the the gossiped state, even though it's not serialized
assert!(matches!(
sanitized.last_connection_state,
NeverAttemptedGossiped { .. }
));

// We want the other fields to be unmodified
assert_eq!(sanitized.addr, entry.addr);
// Services are sanitized during parsing, so we don't need to make
// any changes in sanitize()
assert_eq!(sanitized.services, entry.services);
assert_eq!(
sanitized.get_untrusted_services(),
entry.get_untrusted_services()
);
}
8 changes: 7 additions & 1 deletion zebra-network/src/meta_addr/tests/preallocate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ use std::convert::TryInto;

proptest! {
/// Confirm that each MetaAddr takes exactly META_ADDR_SIZE bytes when serialized.
/// This verifies that our calculated `TrustedPreallocate::max_allocation()` is indeed an upper bound.
/// This verifies that our calculated [`TrustedPreallocate::max_allocation`] is indeed an upper bound.
#[test]
fn meta_addr_size_is_correct(addr in MetaAddr::arbitrary()) {
// TODO: make a strategy that has no None fields
prop_assume!(addr.sanitize().is_some());

let serialized = addr
.zcash_serialize_to_vec()
.expect("Serialization to vec must succeed");
Expand All @@ -23,6 +26,9 @@ proptest! {
/// 2. The largest allowed vector is small enough to fit in a legal Zcash message
#[test]
fn meta_addr_max_allocation_is_correct(addr in MetaAddr::arbitrary()) {
// TODO: make a strategy that has no None fields
prop_assume!(addr.sanitize().is_some());

let max_allocation: usize = MetaAddr::max_allocation().try_into().unwrap();
let mut smallest_disallowed_vec = Vec::with_capacity(max_allocation + 1);
for _ in 0..(MetaAddr::max_allocation() + 1) {
Expand Down
53 changes: 42 additions & 11 deletions zebra-network/src/meta_addr/tests/vectors.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,59 @@
//! Test vectors for MetaAddr.
use super::{super::MetaAddr, check};
use super::{
super::{MetaAddr, PeerAddrState},
check,
};

use chrono::{MAX_DATETIME, MIN_DATETIME};

/// Make sure that the sanitize function handles minimum and maximum times.
#[test]
fn sanitize_extremes() {
use PeerAddrState::*;

zebra_test::init();

let min_time_entry = MetaAddr {
let min_time_untrusted = MetaAddr {
addr: "127.0.0.1:8233".parse().unwrap(),
last_connection_state: NeverAttemptedGossiped {
untrusted_last_seen: MIN_DATETIME,
untrusted_services: Default::default(),
},
};

let min_time_local = MetaAddr {
addr: "127.0.0.1:8233".parse().unwrap(),
last_connection_state: Responded {
last_attempt: MIN_DATETIME,
last_success: MIN_DATETIME,
last_failed: Some(MIN_DATETIME),
untrusted_last_seen: Some(MIN_DATETIME),
services: Default::default(),
},
};

let max_time_untrusted = MetaAddr {
addr: "127.0.0.1:8233".parse().unwrap(),
services: Default::default(),
last_seen: MIN_DATETIME,
last_connection_state: Default::default(),
last_connection_state: NeverAttemptedGossiped {
untrusted_last_seen: MAX_DATETIME,
untrusted_services: Default::default(),
},
};

let max_time_entry = MetaAddr {
let max_time_local = MetaAddr {
addr: "127.0.0.1:8233".parse().unwrap(),
services: Default::default(),
last_seen: MAX_DATETIME,
last_connection_state: Default::default(),
last_connection_state: Responded {
last_attempt: MAX_DATETIME,
last_success: MAX_DATETIME,
last_failed: Some(MAX_DATETIME),
untrusted_last_seen: Some(MAX_DATETIME),
services: Default::default(),
},
};

check::sanitize_avoids_leaks(&min_time_entry);
check::sanitize_avoids_leaks(&max_time_entry);
check::sanitize_avoids_leaks(&min_time_untrusted);
check::sanitize_avoids_leaks(&min_time_local);
check::sanitize_avoids_leaks(&max_time_untrusted);
check::sanitize_avoids_leaks(&max_time_local);
}
40 changes: 26 additions & 14 deletions zebra-network/src/peer/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ use zebra_chain::{block, parameters::Network};

use crate::{
constants,
meta_addr::{MetaAddr, MetaAddrChange},
protocol::{
external::{types::*, Codec, InventoryHash, Message},
internal::{Request, Response},
},
types::MetaAddr,
BoxError, Config,
};

Expand All @@ -45,7 +45,7 @@ use super::{Client, ClientRequest, Connection, ErrorSlot, HandshakeError, PeerEr
pub struct Handshake<S> {
config: Config,
inbound_service: S,
timestamp_collector: mpsc::Sender<MetaAddr>,
timestamp_collector: mpsc::Sender<MetaAddrChange>,
inv_collector: broadcast::Sender<(InventoryHash, SocketAddr)>,
nonces: Arc<futures::lock::Mutex<HashSet<Nonce>>>,
user_agent: String,
Expand Down Expand Up @@ -296,7 +296,7 @@ impl fmt::Debug for ConnectedAddr {
pub struct Builder<S> {
config: Option<Config>,
inbound_service: Option<S>,
timestamp_collector: Option<mpsc::Sender<MetaAddr>>,
timestamp_collector: Option<mpsc::Sender<MetaAddrChange>>,
our_services: Option<PeerServices>,
user_agent: Option<String>,
relay: Option<bool>,
Expand Down Expand Up @@ -334,9 +334,12 @@ where

/// Provide a hook for timestamp collection. Optional.
///
/// This channel takes `MetaAddr`s, permanent addresses which can be used to
/// make outbound connections to peers.
pub fn with_timestamp_collector(mut self, timestamp_collector: mpsc::Sender<MetaAddr>) -> Self {
/// This channel takes [`MetaAddrChange`]s, which contain permanent addresses
/// that be used to make outbound connections to peers.
pub fn with_timestamp_collector(
mut self,
timestamp_collector: mpsc::Sender<MetaAddrChange>,
) -> Self {
self.timestamp_collector = Some(timestamp_collector);
self
}
Expand Down Expand Up @@ -670,8 +673,8 @@ where
// `Version` messages.
let alternate_addrs = connected_addr.get_alternate_addrs(remote_canonical_addr);
for alt_addr in alternate_addrs {
let alt_addr = MetaAddr::new_alternate(&alt_addr, &remote_services);
if alt_addr.is_valid_for_outbound() {
let alt_addr = MetaAddr::new_alternate(alt_addr, remote_services);
if alt_addr.is_valid_for_outbound(None) {
tracing::info!(
?alt_addr,
"sending valid alternate peer address to the address book"
Expand Down Expand Up @@ -758,7 +761,10 @@ where
// the collector doesn't depend on network activity,
// so this await should not hang
let _ = inbound_ts_collector
.send(MetaAddr::new_responded(&book_addr, &remote_services))
.send(MetaAddr::update_responded(
book_addr,
remote_services,
))
.await;
}
}
Expand All @@ -772,7 +778,10 @@ where

if let Some(book_addr) = connected_addr.get_address_book_addr() {
let _ = inbound_ts_collector
.send(MetaAddr::new_errored(&book_addr, &remote_services))
.send(MetaAddr::update_failed(
book_addr,
Some(remote_services),
))
.await;
}
}
Expand Down Expand Up @@ -882,7 +891,10 @@ where
if let Some(book_addr) = connected_addr.get_address_book_addr() {
// awaiting a local task won't hang
let _ = timestamp_collector
.send(MetaAddr::new_shutdown(&book_addr, &remote_services))
.send(MetaAddr::update_shutdown(
book_addr,
Some(remote_services),
))
.await;
}
return;
Expand Down Expand Up @@ -970,7 +982,7 @@ async fn send_one_heartbeat(server_tx: &mut mpsc::Sender<ClientRequest>) -> Resu
/// `handle_heartbeat_error`.
async fn heartbeat_timeout<F, T>(
fut: F,
timestamp_collector: &mut mpsc::Sender<MetaAddr>,
timestamp_collector: &mut mpsc::Sender<MetaAddrChange>,
connected_addr: &ConnectedAddr,
remote_services: &PeerServices,
) -> Result<T, BoxError>
Expand Down Expand Up @@ -1004,7 +1016,7 @@ where
/// If `result.is_err()`, mark `connected_addr` as failed using `timestamp_collector`.
async fn handle_heartbeat_error<T, E>(
result: Result<T, E>,
timestamp_collector: &mut mpsc::Sender<MetaAddr>,
timestamp_collector: &mut mpsc::Sender<MetaAddrChange>,
connected_addr: &ConnectedAddr,
remote_services: &PeerServices,
) -> Result<T, E>
Expand All @@ -1018,7 +1030,7 @@ where

if let Some(book_addr) = connected_addr.get_address_book_addr() {
let _ = timestamp_collector
.send(MetaAddr::new_errored(&book_addr, &remote_services))
.send(MetaAddr::update_failed(book_addr, Some(*remote_services)))
.await;
}
Err(err)
Expand Down
Loading

0 comments on commit 6b37568

Please sign in to comment.