diff --git a/chain/network/src/network_protocol/borsh_conv.rs b/chain/network/src/network_protocol/borsh_conv.rs index 37eba6b95c8..3e84d46a6d1 100644 --- a/chain/network/src/network_protocol/borsh_conv.rs +++ b/chain/network/src/network_protocol/borsh_conv.rs @@ -14,6 +14,7 @@ impl From<&net::Handshake> for mem::Handshake { sender_chain_info: x.sender_chain_info.clone(), partial_edge_info: x.partial_edge_info.clone(), owned_account: None, + signed_ip_address: None, // borsh isn't backwards compatible. Hence, no support for signing ip address } } } diff --git a/chain/network/src/network_protocol/mod.rs b/chain/network/src/network_protocol/mod.rs index 4063c40f192..ce543da61cc 100644 --- a/chain/network/src/network_protocol/mod.rs +++ b/chain/network/src/network_protocol/mod.rs @@ -90,6 +90,36 @@ impl std::str::FromStr for PeerAddr { } } +/// Proof that a given peer_id owns an ip address, included in Handshake message +#[derive(Clone, PartialEq, Eq, Debug, Hash)] +pub struct SignedIpAddress { + pub ip_address: std::net::IpAddr, + pub(crate) signature: near_crypto::Signature, // signature for signed ip_address +} + +impl SignedIpAddress { + pub fn new(ip_address: std::net::IpAddr, secret_key: &near_crypto::SecretKey) -> Self { + let signature = secret_key.sign(&SignedIpAddress::ip_bytes_helper(&ip_address)); + Self { ip_address: ip_address, signature: signature } + } + + pub fn verify(&self, public_key: &PublicKey) -> bool { + self.signature.verify(&self.ip_bytes(), &public_key) + } + + fn ip_bytes_helper(ip_address: &std::net::IpAddr) -> Vec { + let ip_bytes: Vec = match ip_address { + std::net::IpAddr::V4(ip) => ip.octets().to_vec(), + std::net::IpAddr::V6(ip) => ip.octets().to_vec(), + }; + return ip_bytes; + } + + fn ip_bytes(&self) -> Vec { + return SignedIpAddress::ip_bytes_helper(&self.ip_address); + } +} + /// AccountData is a piece of global state that a validator /// signs and broadcasts to the network. It is essentially /// the data that a validator wants to share with the network. @@ -307,6 +337,8 @@ pub struct Handshake { pub(crate) partial_edge_info: PartialEdgeInfo, /// Account owned by the sender. pub(crate) owned_account: Option, + /// Signed Ip Address of the sender + pub(crate) signed_ip_address: Option, } #[derive(PartialEq, Eq, Clone, Debug, strum::IntoStaticStr)] diff --git a/chain/network/src/network_protocol/network.proto b/chain/network/src/network_protocol/network.proto index c42a5cb39c5..ffa3055cfe8 100644 --- a/chain/network/src/network_protocol/network.proto +++ b/chain/network/src/network_protocol/network.proto @@ -168,6 +168,8 @@ message Handshake { PartialEdgeInfo partial_edge_info = 7; // See description of OwnedAccount. AccountKeySignedPayload owned_account = 8; // optional + // Sender's signature to verify ip address of the sender + SignedIpAddr signed_ip_addr = 9; // required, but marked as optional for protocol buffer backward compatibility } // Response to Handshake, in case the Handshake was rejected. @@ -199,6 +201,16 @@ message LastEdge { Edge edge = 1; } +message IpAddr { + // IPv4 (4 bytes) or IPv6 (16 bytes) in network byte order. + bytes ip = 1; +} + +message SignedIpAddr { + IpAddr ip_addr = 1; + Signature signature = 2; +} + message SocketAddr { // IPv4 (4 bytes) or IPv6 (16 bytes) in network byte order. bytes ip = 1; diff --git a/chain/network/src/network_protocol/proto_conv/handshake.rs b/chain/network/src/network_protocol/proto_conv/handshake.rs index 11003597f14..fb462bf2e58 100644 --- a/chain/network/src/network_protocol/proto_conv/handshake.rs +++ b/chain/network/src/network_protocol/proto_conv/handshake.rs @@ -77,6 +77,8 @@ pub enum ParseHandshakeError { PartialEdgeInfo(ParseRequiredError), #[error("owned_account {0}")] OwnedAccount(ParseSignedOwnedAccountError), + #[error("signed_ip_address {0}")] + SignedIpAddress(ParseSignedIpAddrError), } impl From<&Handshake> for proto::Handshake { @@ -90,6 +92,7 @@ impl From<&Handshake> for proto::Handshake { sender_chain_info: MF::some((&x.sender_chain_info).into()), partial_edge_info: MF::some((&x.partial_edge_info).into()), owned_account: x.owned_account.as_ref().map(Into::into).into(), + signed_ip_addr: x.signed_ip_address.as_ref().map(Into::into).into(), ..Self::default() } } @@ -120,6 +123,8 @@ impl TryFrom<&proto::Handshake> for Handshake { .map_err(Self::Error::PartialEdgeInfo)?, owned_account: try_from_optional(&p.owned_account) .map_err(Self::Error::OwnedAccount)?, + signed_ip_address: try_from_optional(&p.signed_ip_addr) + .map_err(Self::Error::SignedIpAddress)?, }) } } diff --git a/chain/network/src/network_protocol/proto_conv/net.rs b/chain/network/src/network_protocol/proto_conv/net.rs index 8fc16aef16b..344e98a853e 100644 --- a/chain/network/src/network_protocol/proto_conv/net.rs +++ b/chain/network/src/network_protocol/proto_conv/net.rs @@ -3,12 +3,76 @@ use super::*; use crate::network_protocol::proto; use crate::network_protocol::PeerAddr; -use crate::network_protocol::{Edge, PartialEdgeInfo, PeerInfo}; +use crate::network_protocol::{Edge, PartialEdgeInfo, PeerInfo, SignedIpAddress}; use borsh::{BorshDeserialize as _, BorshSerialize as _}; use near_primitives::network::AnnounceAccount; use protobuf::MessageField as MF; use std::net::{IpAddr, SocketAddr}; +//////////////////////////////////////// +// Parse std::net::IpAddr to Protocol Buffer and back +#[derive(thiserror::Error, Debug)] +pub enum ParseIpAddrError { + #[error("invalid IP")] + InvalidIP, +} + +impl From<&std::net::IpAddr> for proto::IpAddr { + fn from(x: &std::net::IpAddr) -> Self { + Self { + ip: match x { + std::net::IpAddr::V4(ip) => ip.octets().to_vec(), + std::net::IpAddr::V6(ip) => ip.octets().to_vec(), + }, + ..Self::default() + } + } +} + +impl TryFrom<&proto::IpAddr> for std::net::IpAddr { + type Error = ParseIpAddrError; + fn try_from(x: &proto::IpAddr) -> Result { + let ip = match x.ip.len() { + 4 => IpAddr::from(<[u8; 4]>::try_from(&x.ip[..]).unwrap()), + 16 => IpAddr::from(<[u8; 16]>::try_from(&x.ip[..]).unwrap()), + _ => return Err(Self::Error::InvalidIP), + }; + Ok(ip) + } +} + +//////////////////////////////////////// +// Parse SignedIpAddr to Protocol Buffer and back +#[derive(thiserror::Error, Debug)] +pub enum ParseSignedIpAddrError { + #[error("ip_addr: {0}")] + IpAddr(ParseRequiredError), + #[error("signed_owned_ip_address: {0}")] + Signature(ParseRequiredError), +} + +impl From<&SignedIpAddress> for proto::SignedIpAddr { + fn from(x: &SignedIpAddress) -> Self { + Self { + ip_addr: MF::some((&x.ip_address).into()), + signature: MF::some((&x.signature).into()), + ..Self::default() + } + } +} + +impl TryFrom<&proto::SignedIpAddr> for SignedIpAddress { + type Error = ParseSignedIpAddrError; + fn try_from(p: &proto::SignedIpAddr) -> Result { + Ok(Self { + ip_address: try_from_required(&p.ip_addr).map_err(Self::Error::IpAddr)?, + signature: try_from_required(&p.signature).map_err(Self::Error::Signature)?, + }) + } +} + +//////////////////////////////////////// + #[derive(thiserror::Error, Debug)] pub enum ParseSocketAddrError { #[error("invalid IP")] diff --git a/chain/network/src/network_protocol/testonly.rs b/chain/network/src/network_protocol/testonly.rs index 11e4560058d..6bca12596c7 100644 --- a/chain/network/src/network_protocol/testonly.rs +++ b/chain/network/src/network_protocol/testonly.rs @@ -332,23 +332,44 @@ impl Chain { } } -pub fn make_handshake(rng: &mut R, chain: &Chain) -> Handshake { - let a = make_signer(rng); - let b = make_signer(rng); - let a_id = PeerId::new(a.public_key); - let b_id = PeerId::new(b.public_key); +pub fn make_signed_ip_addr( + ip_addr: &std::net::IpAddr, + secret_key: &near_crypto::SecretKey, +) -> SignedIpAddress { + let signed_ip_address: SignedIpAddress = SignedIpAddress::new(*ip_addr, secret_key); + return signed_ip_address; +} + +pub fn make_handshake_with_ip( + rng: &mut R, + chain: &Chain, + ip_addr: Option, +) -> Handshake { + let sender = make_signer(rng); + let target = make_signer(rng); + let sender_id = PeerId::new(sender.public_key); + let target_id = PeerId::new(target.public_key); + let signed_ip_address = match ip_addr { + Some(ip) => Some(make_signed_ip_addr(&ip, &sender.secret_key)), + _ => None, + }; Handshake { protocol_version: version::PROTOCOL_VERSION, oldest_supported_version: version::PEER_MIN_ALLOWED_PROTOCOL_VERSION, - sender_peer_id: a_id, - target_peer_id: b_id, + sender_peer_id: sender_id, + target_peer_id: target_id, sender_listen_port: Some(rng.gen()), sender_chain_info: chain.get_peer_chain_info(), partial_edge_info: make_partial_edge(rng), owned_account: None, + signed_ip_address: signed_ip_address, } } +pub fn make_handshake(rng: &mut R, chain: &Chain) -> Handshake { + make_handshake_with_ip(rng, chain, None) +} + pub fn make_routed_message(rng: &mut R, body: RoutedMessageBody) -> RoutedMessageV2 { let signer = make_signer(rng); let peer_id = PeerId::new(signer.public_key); diff --git a/chain/network/src/network_protocol/tests.rs b/chain/network/src/network_protocol/tests.rs index 81e54691787..48dfaa48712 100644 --- a/chain/network/src/network_protocol/tests.rs +++ b/chain/network/src/network_protocol/tests.rs @@ -7,7 +7,10 @@ use crate::types::{PartialEncodedChunkRequestMsg, PartialEncodedChunkResponseMsg use anyhow::{bail, Context as _}; use itertools::Itertools as _; use near_async::time; +use near_crypto::{KeyType, SecretKey}; +use near_primitives::network::PeerId; use rand::Rng as _; +use std::net; #[test] fn deduplicate_edges() { @@ -61,8 +64,11 @@ fn serialize_deserialize_protobuf_only() { let mut rng = make_rng(39521947542); let mut clock = time::FakeClock::default(); let chain = data::Chain::make(&mut clock, &mut rng, 12); + let ip_addr: net::IpAddr = data::make_ipv4(&mut rng); let msgs = [ - PeerMessage::Tier1Handshake(data::make_handshake(&mut rng, &chain)), + PeerMessage::Tier1Handshake(data::make_handshake_with_ip(&mut rng, &chain, Some(ip_addr))), // with ip address + PeerMessage::Tier1Handshake(data::make_handshake(&mut rng, &chain)), // without ip address, temporarily supported for backwards compatibility migration + PeerMessage::Tier2Handshake(data::make_handshake_with_ip(&mut rng, &chain, Some(ip_addr))), // Tier2 Handshake does not maintain ip address with borsh serialization PeerMessage::SyncAccountsData(SyncAccountsData { accounts_data: (0..4) .map(|_| Arc::new(data::make_signed_account_data(&mut rng, &clock.clock()))) @@ -107,7 +113,7 @@ fn serialize_deserialize() -> anyhow::Result<()> { }), )); let msgs = [ - PeerMessage::Tier2Handshake(data::make_handshake(&mut rng, &chain)), + PeerMessage::Tier2Handshake(data::make_handshake(&mut rng, &chain)), // without ip address, temporarily supported for backwards compatibility migration PeerMessage::HandshakeFailure( data::make_peer_info(&mut rng), HandshakeFailureReason::InvalidTarget, @@ -170,3 +176,15 @@ fn serialize_deserialize() -> anyhow::Result<()> { Ok(()) } + +#[test] +fn sign_and_verify_with_signed_ip_address_interface() { + let node_key = SecretKey::from_seed(KeyType::ED25519, "123"); + let peer_id = PeerId::new(node_key.public_key()); + let mut rng = make_rng(89028037453); + let ip_addr: net::IpAddr = data::make_ipv4(&mut rng); + // Wrap ip address sign and verify algorithm with interfaces SignedIpAddress + let signed_ip_address: SignedIpAddress = SignedIpAddress::new(ip_addr, &node_key); + assert!(signed_ip_address.verify(&node_key.public_key())); + assert!(signed_ip_address.verify(peer_id.public_key())); +} diff --git a/chain/network/src/peer/peer_actor.rs b/chain/network/src/peer/peer_actor.rs index c933cb5d009..02873bdb04d 100644 --- a/chain/network/src/peer/peer_actor.rs +++ b/chain/network/src/peer/peer_actor.rs @@ -2,6 +2,7 @@ use crate::accounts_data; use crate::concurrency::atomic_cell::AtomicCell; use crate::concurrency::demux; use crate::config::PEERS_RESPONSE_MAX_PEERS; +use crate::network_protocol::SignedIpAddress; use crate::network_protocol::{ Edge, EdgeState, Encoding, OwnedAccount, ParsePeerMessageError, PartialEdgeInfo, PeerChainInfoV2, PeerIdOrHash, PeerInfo, PeersRequest, PeersResponse, RawRoutedMessage, @@ -110,6 +111,8 @@ pub(crate) enum ClosingReason { TooLargeClockSkew, #[error("owned_account.peer_id doesn't match handshake.sender_peer_id")] OwnedAccountMismatch, + #[error("signed_ip_address's peer address doesn't match tcp stream's peer_addr")] + IpAddressMismatch, #[error("PeerActor stopped NOT via PeerActor::stop()")] Unknown, } @@ -132,6 +135,7 @@ impl ClosingReason { ClosingReason::TooLargeClockSkew => true, // reconnect will fail for the same reason ClosingReason::OwnedAccountMismatch => true, // misbehaving peer ClosingReason::Unknown => false, // only happens in tests + ClosingReason::IpAddressMismatch => true, // invalid ip address or signature must be banned } } } @@ -286,7 +290,7 @@ impl PeerActor { }; let my_node_info = PeerInfo { id: network_state.config.node_id(), - addr: network_state.config.node_addr.as_ref().map(|a| **a), + addr: Some(stream.local_addr), account_id: network_state.config.validator.as_ref().map(|v| v.account_id()), }; // recv is the HandshakeSignal returned by this spawn_inner() call. @@ -420,6 +424,10 @@ impl PeerActor { } else { (0, vec![]) }; + let my_signed_ip_address = SignedIpAddress::new( + self.my_node_info.addr.unwrap().ip(), + &self.network_state.config.node_key, + ); let handshake = Handshake { protocol_version: spec.protocol_version, oldest_supported_version: PEER_MIN_ALLOWED_PROTOCOL_VERSION, @@ -442,6 +450,7 @@ impl PeerActor { } .sign(vc.signer.as_ref()) }), + signed_ip_address: Some(my_signed_ip_address), }; let msg = match spec.tier { tcp::Tier::T1 => PeerMessage::Tier1Handshake(handshake), @@ -565,6 +574,20 @@ impl PeerActor { } } + // Verify the signed IP address is valid. + if let Some(signed_ip_address) = &handshake.signed_ip_address { + if self.peer_addr.ip() != signed_ip_address.ip_address { + self.stop(ctx, ClosingReason::IpAddressMismatch); + return; + } + // Verify signature of the sender on its ip address + if !(signed_ip_address.verify(handshake.sender_peer_id.public_key())) { + self.stop(ctx, ClosingReason::Ban(ReasonForBan::InvalidSignature)); + return; + } + } // else do nothing as temporary leniency for backward compatibility purposes + // TODO(soon): Fail the handshake if its doesn't include the required signed_ip_address after all production nodes have upgraded to latest handshake protocol + // Verify that handshake.owned_account is valid. if let Some(owned_account) = &handshake.owned_account { if let Err(_) = owned_account.payload().verify(&owned_account.account_key) { diff --git a/chain/network/src/peer/tests/communication.rs b/chain/network/src/peer/tests/communication.rs index 4e187bfaaf2..25a1e338129 100644 --- a/chain/network/src/peer/tests/communication.rs +++ b/chain/network/src/peer/tests/communication.rs @@ -1,4 +1,4 @@ -use crate::network_protocol::testonly as data; +use crate::network_protocol::testonly::{self as data, make_signed_ip_addr}; use crate::network_protocol::{ Encoding, Handshake, HandshakeFailureReason, PartialEdgeInfo, PeerMessage, PeersRequest, PeersResponse, RoutedMessageBody, @@ -199,6 +199,8 @@ async fn test_handshake(outbound_encoding: Option, inbound_encoding: O let inbound = PeerHandle::start_endpoint(clock.clock(), inbound_cfg, inbound_stream).await; let outbound_port = outbound_stream.local_addr.port(); let mut outbound = Stream::new(outbound_encoding, outbound_stream); + let ip_addr = outbound.stream.local_addr.ip(); + let signed_ip_address = make_signed_ip_addr(&ip_addr, &outbound_cfg.network.node_key); // Send too old PROTOCOL_VERSION, expect ProtocolVersionMismatch let mut handshake = Handshake { @@ -210,6 +212,7 @@ async fn test_handshake(outbound_encoding: Option, inbound_encoding: O sender_chain_info: outbound_cfg.chain.get_peer_chain_info(), partial_edge_info: outbound_cfg.partial_edge_info(&inbound.cfg.id(), 1), owned_account: None, + signed_ip_address: Some(signed_ip_address), }; // We will also introduce chain_id mismatch, but ProtocolVersionMismatch is expected to take priority. handshake.sender_chain_info.genesis_id.chain_id = "unknown_chain".to_string(); @@ -248,6 +251,57 @@ async fn test_handshake(outbound_encoding: Option, inbound_encoding: O assert_matches!(resp, PeerMessage::Tier2Handshake(_)); } +async fn test_backward_compatible_handshake_without_signed_ip_address( + outbound_encoding: Option, + inbound_encoding: Option, +) { + let mut rng = make_rng(89028037453); + let mut clock = time::FakeClock::default(); + let chain = Arc::new(data::Chain::make(&mut clock, &mut rng, 12)); + + let inbound_cfg = PeerConfig { + network: chain.make_config(&mut rng), + chain: chain.clone(), + force_encoding: inbound_encoding, + }; + + let outbound_cfg = PeerConfig { + network: chain.make_config(&mut rng), + chain: chain.clone(), + force_encoding: outbound_encoding, + }; + + let (outbound_stream, inbound_stream) = + tcp::Stream::loopback(inbound_cfg.id(), tcp::Tier::T2).await; + + // Set up both inbound stream and PeerActor process for it + let inbound = PeerHandle::start_endpoint(clock.clock(), inbound_cfg, inbound_stream).await; + + // Initialize the handshake from this unit test's thread to act as the outbound peer + let outbound_port = outbound_stream.local_addr.port(); + let mut outbound = Stream::new(outbound_encoding, outbound_stream); + let mut handshake = Handshake { + protocol_version: PEER_MIN_ALLOWED_PROTOCOL_VERSION, + oldest_supported_version: PEER_MIN_ALLOWED_PROTOCOL_VERSION, + sender_peer_id: outbound_cfg.id(), + target_peer_id: inbound.cfg.id(), + sender_listen_port: Some(outbound_port), + sender_chain_info: outbound_cfg.chain.get_peer_chain_info(), + partial_edge_info: outbound_cfg.partial_edge_info(&inbound.cfg.id(), 1), + owned_account: None, + signed_ip_address: None, + }; + handshake.sender_chain_info = chain.get_peer_chain_info(); + + // Send an outdated Handshake, which doesn't contain a signed ip address and verify handshake still succeeds + // TODO(soon): In future, this needs to fail after all production nodes have included mandatory signed ip address + outbound.write(&PeerMessage::Tier2Handshake(handshake.clone())).await; + + // Verify the handshake is successful from receiving a Tier2Handshake response from the inbound + let resp = outbound.read().await.unwrap(); + assert_matches!(resp, PeerMessage::Tier2Handshake(_)); +} + #[tokio::test] // Verifies that HandshakeFailures are served correctly. async fn handshake() -> anyhow::Result<()> { @@ -262,6 +316,7 @@ async fn handshake() -> anyhow::Result<()> { } } test_handshake(*outbound, *inbound).await; + test_backward_compatible_handshake_without_signed_ip_address(*outbound, *inbound).await; } } Ok(()) diff --git a/chain/network/src/peer_manager/tests/connection_pool.rs b/chain/network/src/peer_manager/tests/connection_pool.rs index 5d9f6d9f7d9..c8fa6687d92 100644 --- a/chain/network/src/peer_manager/tests/connection_pool.rs +++ b/chain/network/src/peer_manager/tests/connection_pool.rs @@ -1,4 +1,4 @@ -use crate::network_protocol::testonly as data; +use crate::network_protocol::testonly::{self as data, make_signed_ip_addr}; use crate::network_protocol::PeerMessage; use crate::network_protocol::{Encoding, Handshake, OwnedAccount, PartialEdgeInfo}; use crate::peer::peer_actor::ClosingReason; @@ -11,6 +11,7 @@ use crate::private_actix::RegisterPeerError; use crate::tcp; use crate::testonly::make_rng; use crate::testonly::stream::Stream; +use crate::types::ReasonForBan; use near_async::time; use near_o11y::testonly::init_test_logger; use near_primitives::version::PROTOCOL_VERSION; @@ -76,7 +77,7 @@ async fn loop_connection() { cfg.node_key = pm.cfg.node_key.clone(); // Starting an outbound loop connection on TIER2 should be stopped without sending the handshake. - let conn = pm.start_outbound(chain.clone(), cfg, tcp::Tier::T2).await; + let conn = pm.start_outbound(chain.clone(), cfg.clone(), tcp::Tier::T2).await; assert_eq!( ClosingReason::OutboundNotAllowed(connection::PoolError::UnexpectedLoopConnection), conn.manager_fail_handshake(&clock.clock()).await @@ -88,6 +89,8 @@ async fn loop_connection() { let port = stream.local_addr.port(); let mut events = pm.events.from_now(); let mut stream = Stream::new(Some(Encoding::Proto), stream); + let ip_addr = stream.stream.local_addr.ip(); + let signed_ip_address = make_signed_ip_addr(&ip_addr, &cfg.node_key); stream .write(&PeerMessage::Tier2Handshake(Handshake { protocol_version: PROTOCOL_VERSION, @@ -103,6 +106,7 @@ async fn loop_connection() { &pm.cfg.node_key, ), owned_account: None, + signed_ip_address: Some(signed_ip_address), })) .await; let reason = events @@ -148,6 +152,8 @@ async fn owned_account_mismatch() { let mut stream = Stream::new(Some(Encoding::Proto), stream); let cfg = chain.make_config(rng); let vc = cfg.validator.clone().unwrap(); + let ip_addr = stream.stream.local_addr.ip(); + let signed_ip_address = make_signed_ip_addr(&ip_addr, &cfg.node_key); stream .write(&PeerMessage::Tier2Handshake(Handshake { protocol_version: PROTOCOL_VERSION, @@ -171,6 +177,7 @@ async fn owned_account_mismatch() { } .sign(vc.signer.as_ref()), ), + signed_ip_address: Some(signed_ip_address), })) .await; let reason = events @@ -267,6 +274,8 @@ async fn invalid_edge() { let port = stream.local_addr.port(); let mut events = pm.events.from_now(); let mut stream = Stream::new(Some(Encoding::Proto), stream); + let ip_addr = stream.stream.local_addr.ip(); + let signed_ip_address = make_signed_ip_addr(&ip_addr, &cfg.node_key); let vc = cfg.validator.clone().unwrap(); let handshake = Handshake { protocol_version: PROTOCOL_VERSION, @@ -284,6 +293,7 @@ async fn invalid_edge() { } .sign(vc.signer.as_ref()), ), + signed_ip_address: Some(signed_ip_address), }; let handshake = match tier { tcp::Tier::T1 => PeerMessage::Tier1Handshake(handshake), @@ -310,3 +320,103 @@ async fn invalid_edge() { } } } + +async fn test_signed_ip_address( + expected_closing_reason: ClosingReason, + wrong_ip_address: &Option, + wrong_node_key: &Option, +) { + init_test_logger(); + let mut rng = make_rng(921853233); + let rng = &mut rng; + let mut clock = time::FakeClock::default(); + let chain = Arc::new(data::Chain::make(&mut clock, rng, 10)); + + let pm = peer_manager::testonly::start( + clock.clock(), + near_store::db::TestDB::new(), + chain.make_config(rng), + chain.clone(), + ) + .await; + let cfg = chain.make_config(rng); + + for tier in [tcp::Tier::T1, tcp::Tier::T2] { + let stream = tcp::Stream::connect(&pm.peer_info(), tier).await.unwrap(); + let stream_id = stream.id(); + let port = stream.local_addr.port(); + let mut events = pm.events.from_now(); + let mut stream = Stream::new(Some(Encoding::Proto), stream); + + let ip_addr = match *wrong_ip_address { + Some(wrong_ip_address) => wrong_ip_address, + None => stream.stream.local_addr.ip(), + }; + + let wrong_node_key = wrong_node_key.clone(); + let node_key = match wrong_node_key { + Some(wrong_node_key) => wrong_node_key, + None => cfg.node_key.clone(), + }; + let signed_ip_address = make_signed_ip_addr(&ip_addr, &node_key); + let vc = cfg.validator.clone().unwrap(); + + let handshake = Handshake { + protocol_version: PROTOCOL_VERSION, + oldest_supported_version: PROTOCOL_VERSION, + sender_peer_id: cfg.node_id(), + target_peer_id: pm.cfg.node_id(), + sender_listen_port: Some(port), + sender_chain_info: chain.get_peer_chain_info(), + partial_edge_info: PartialEdgeInfo::new( + &cfg.node_id(), + &pm.cfg.node_id(), + 1, + &cfg.node_key, + ), + owned_account: Some( + OwnedAccount { + account_key: vc.signer.public_key().clone(), + peer_id: cfg.node_id(), + timestamp: clock.now_utc(), + } + .sign(vc.signer.as_ref()), + ), + signed_ip_address: Some(signed_ip_address), + }; + + let handshake = match tier { + tcp::Tier::T1 => PeerMessage::Tier1Handshake(handshake), + tcp::Tier::T2 => PeerMessage::Tier2Handshake(handshake), + }; + + stream.write(&handshake).await; + + let reason: ClosingReason = events + .recv_until(|ev| match ev { + Event::PeerManager(PME::ConnectionClosed(ev)) if ev.stream_id == stream_id => { + Some(ev.reason) + } + Event::PeerManager(PME::HandshakeCompleted(ev)) if ev.stream_id == stream_id => { + panic!("PeerManager accepted the handshake") + } + _ => None, + }) + .await; + assert_eq!(expected_closing_reason, reason); + } +} + +#[tokio::test] +async fn signed_with_wrong_ip_address() { + let wrong_ip_address: std::net::IpAddr = data::make_ipv4(&mut make_rng(89028037453)); + let expected_closing_reason = ClosingReason::IpAddressMismatch; + test_signed_ip_address(expected_closing_reason, &Some(wrong_ip_address), &None).await; +} + +#[tokio::test] +async fn signed_with_wrong_key() { + let wrong_node_key = near_crypto::SecretKey::from_seed(near_crypto::KeyType::ED25519, "123"); + let expected_closing_reason = ClosingReason::Ban(ReasonForBan::InvalidSignature); + test_signed_ip_address(expected_closing_reason, &None, &Some(wrong_node_key)).await; +} diff --git a/chain/network/src/peer_manager/tests/nonce.rs b/chain/network/src/peer_manager/tests/nonce.rs index 2160e527c89..a8c1b7ce155 100644 --- a/chain/network/src/peer_manager/tests/nonce.rs +++ b/chain/network/src/peer_manager/tests/nonce.rs @@ -1,4 +1,4 @@ -use crate::network_protocol::testonly as data; +use crate::network_protocol::testonly::{self as data, make_signed_ip_addr}; use crate::network_protocol::{ Encoding, Handshake, PartialEdgeInfo, PeerMessage, EDGE_MIN_TIMESTAMP_NONCE, }; @@ -64,6 +64,8 @@ async fn test_nonces() { let mut stream = stream::Stream::new(Some(Encoding::Proto), stream); let peer_key = data::make_secret_key(rng); let peer_id = PeerId::new(peer_key.public_key()); + let ip_addr = stream.stream.local_addr.ip(); + let signed_ip_address = make_signed_ip_addr(&ip_addr, &peer_key); let handshake = PeerMessage::Tier2Handshake(Handshake { protocol_version: version::PROTOCOL_VERSION, oldest_supported_version: version::PEER_MIN_ALLOWED_PROTOCOL_VERSION, @@ -75,6 +77,7 @@ async fn test_nonces() { sender_chain_info: chain.get_peer_chain_info(), partial_edge_info: PartialEdgeInfo::new(&peer_id, &pm.cfg.node_id(), test.0, &peer_key), owned_account: None, + signed_ip_address: Some(signed_ip_address), }); stream.write(&handshake).await; if test.1 { diff --git a/chain/network/src/raw/connection.rs b/chain/network/src/raw/connection.rs index 2e1cfae2b47..ada0b8b961e 100644 --- a/chain/network/src/raw/connection.rs +++ b/chain/network/src/raw/connection.rs @@ -210,6 +210,7 @@ fn new_handshake( }, partial_edge_info: PartialEdgeInfo::new(my_peer_id, target_peer_id, nonce, secret_key), owned_account: None, + signed_ip_address: None, }) } diff --git a/chain/network/src/testonly/stream.rs b/chain/network/src/testonly/stream.rs index 0ba206eec8a..5425bc6e7df 100644 --- a/chain/network/src/testonly/stream.rs +++ b/chain/network/src/testonly/stream.rs @@ -7,7 +7,7 @@ use crate::network_protocol::{Encoding, PeerMessage}; use crate::tcp; pub struct Stream { - stream: tcp::Stream, + pub(crate) stream: tcp::Stream, force_encoding: Option, protocol_buffers_supported: bool, } diff --git a/core/chain-configs/src/genesis_config.rs b/core/chain-configs/src/genesis_config.rs index 23429e11504..b106cd2b6a3 100644 --- a/core/chain-configs/src/genesis_config.rs +++ b/core/chain-configs/src/genesis_config.rs @@ -476,18 +476,18 @@ impl Genesis { let mut json_str = String::new(); file.read_to_string(&mut json_str).map_err(|_| ValidationError::GenesisFileError { - error_message: format!("Failed to read genesis config file to string. "), + error_message: "Failed to read genesis config file to string. ".to_string(), })?; let json_str_without_comments = near_config_utils::strip_comments_from_json_str(&json_str) .map_err(|_| ValidationError::GenesisFileError { - error_message: format!("Failed to strip comments from genesis config file"), + error_message: "Failed to strip comments from genesis config file".to_string(), })?; let genesis = serde_json::from_str::(&json_str_without_comments).map_err(|_| { ValidationError::GenesisFileError { - error_message: format!("Failed to deserialize the genesis records."), + error_message: "Failed to deserialize the genesis records.".to_string(), } })?; diff --git a/core/chain-configs/src/genesis_validate.rs b/core/chain-configs/src/genesis_validate.rs index 45ae47c9aeb..e6b8cc23d79 100644 --- a/core/chain-configs/src/genesis_validate.rs +++ b/core/chain-configs/src/genesis_validate.rs @@ -81,7 +81,7 @@ impl<'a> GenesisValidator<'a> { .into_iter() .map(|account_info| { if !is_valid_staking_key(&account_info.public_key) { - let error_message = format!("validator staking key is not valid"); + let error_message = "validator staking key is not valid".to_string(); self.validation_errors.push_genesis_semantics_error(error_message); } (account_info.account_id, account_info.amount) @@ -94,7 +94,7 @@ impl<'a> GenesisValidator<'a> { } if validators.is_empty() { - let error_message = format!("No validators in genesis"); + let error_message = "No validators in genesis".to_string(); self.validation_errors.push_genesis_semantics_error(error_message) } @@ -104,7 +104,7 @@ impl<'a> GenesisValidator<'a> { } if validators != self.staked_accounts { - let error_message = format!("Validator accounts do not match staked accounts."); + let error_message = "Validator accounts do not match staked accounts.".to_string(); self.validation_errors.push_genesis_semantics_error(error_message) } @@ -140,25 +140,27 @@ impl<'a> GenesisValidator<'a> { if *self.genesis_config.online_max_threshold.numer() >= 10_000_000 { let error_message = - format!("online_max_threshold's numerator is too large, may lead to overflow."); + "online_max_threshold's numerator is too large, may lead to overflow.".to_string(); self.validation_errors.push_genesis_semantics_error(error_message) } if *self.genesis_config.online_min_threshold.numer() >= 10_000_000 { let error_message = - format!("online_min_threshold's numerator is too large, may lead to overflow."); + "online_min_threshold's numerator is too large, may lead to overflow.".to_string(); self.validation_errors.push_genesis_semantics_error(error_message) } if *self.genesis_config.online_max_threshold.denom() >= 10_000_000 { let error_message = - format!("online_max_threshold's denominator is too large, may lead to overflow."); + "online_max_threshold's denominator is too large, may lead to overflow." + .to_string(); self.validation_errors.push_genesis_semantics_error(error_message) } if *self.genesis_config.online_min_threshold.denom() >= 10_000_000 { let error_message = - format!("online_min_threshold's denominator is too large, may lead to overflow."); + "online_min_threshold's denominator is too large, may lead to overflow." + .to_string(); self.validation_errors.push_genesis_semantics_error(error_message) } @@ -171,7 +173,7 @@ impl<'a> GenesisValidator<'a> { } if self.genesis_config.epoch_length == 0 { - let error_message = format!("Epoch Length must be greater than 0"); + let error_message = "Epoch Length must be greater than 0".to_string(); self.validation_errors.push_genesis_semantics_error(error_message) } } diff --git a/nearcore/src/config_validate.rs b/nearcore/src/config_validate.rs index 3c1fb58d574..83cdcc10765 100644 --- a/nearcore/src/config_validate.rs +++ b/nearcore/src/config_validate.rs @@ -84,7 +84,7 @@ impl<'a> ConfigValidator<'a> { if let Some(restart_dump_for_shards) = &dump_config.restart_dump_for_shards { let unique_values: HashSet<_> = restart_dump_for_shards.iter().collect(); if unique_values.len() != restart_dump_for_shards.len() { - let error_message = format!("'config.state_sync.dump.restart_dump_for_shards' contains duplicate values."); + let error_message = "'config.state_sync.dump.restart_dump_for_shards' contains duplicate values.".to_string(); self.validation_errors.push_config_semantics_error(error_message); } } @@ -92,13 +92,13 @@ impl<'a> ConfigValidator<'a> { match &dump_config.location { ExternalStorageLocation::S3 { bucket, region } => { if bucket.is_empty() || region.is_empty() { - let error_message = format!("'config.state_sync.dump.location.S3.bucket' and 'config.state_sync.dump.location.S3.region' need to be specified when 'config.state_sync.dump.location.S3' is present."); + let error_message = "'config.state_sync.dump.location.S3.bucket' and 'config.state_sync.dump.location.S3.region' need to be specified when 'config.state_sync.dump.location.S3' is present.".to_string(); self.validation_errors.push_config_semantics_error(error_message); } } ExternalStorageLocation::Filesystem { root_dir } => { if root_dir.as_path() == Path::new("") { - let error_message = format!("'config.state_sync.dump.location.Filesystem.root_dir' needs to be specified when 'config.state_sync.dump.location.Filesystem' is present."); + let error_message = "'config.state_sync.dump.location.Filesystem.root_dir' needs to be specified when 'config.state_sync.dump.location.Filesystem' is present.".to_string(); self.validation_errors.push_config_semantics_error(error_message); } } @@ -110,19 +110,19 @@ impl<'a> ConfigValidator<'a> { match &config.location { ExternalStorageLocation::S3 { bucket, region } => { if bucket.is_empty() || region.is_empty() { - let error_message = format!("'config.state_sync.sync.ExternalStorage.location.S3.bucket' and 'config.state_sync.sync.ExternalStorage.location.S3.region' need to be specified when 'config.state_sync.sync.ExternalStorage.location.S3' is present."); + let error_message = "'config.state_sync.sync.ExternalStorage.location.S3.bucket' and 'config.state_sync.sync.ExternalStorage.location.S3.region' need to be specified when 'config.state_sync.sync.ExternalStorage.location.S3' is present.".to_string(); self.validation_errors.push_config_semantics_error(error_message); } } ExternalStorageLocation::Filesystem { root_dir } => { if root_dir.as_path() == Path::new("") { - let error_message = format!("'config.state_sync.sync.ExternalStorage.location.Filesystem.root_dir' needs to be specified when 'config.state_sync.sync.ExternalStorage.location.Filesystem' is present."); + let error_message = "'config.state_sync.sync.ExternalStorage.location.Filesystem.root_dir' needs to be specified when 'config.state_sync.sync.ExternalStorage.location.Filesystem' is present.".to_string(); self.validation_errors.push_config_semantics_error(error_message); } } } if config.num_concurrent_requests == 0 { - let error_message = format!("'config.state_sync.sync.ExternalStorage.num_concurrent_requests' needs to be greater than 0"); + let error_message = "'config.state_sync.sync.ExternalStorage.num_concurrent_requests' needs to be greater than 0".to_string(); self.validation_errors.push_config_semantics_error(error_message); } } diff --git a/nearcore/src/state_sync.rs b/nearcore/src/state_sync.rs index 18d769dac5c..d8ecfca1e23 100644 --- a/nearcore/src/state_sync.rs +++ b/nearcore/src/state_sync.rs @@ -287,9 +287,9 @@ async fn state_sync_dump( match missing_parts { Err(err) => { tracing::debug!(target: "state_sync_dump", shard_id, ?err, "get_missing_state_parts_for_epoch error"); - Err(Error::Other(format!( - "get_missing_state_parts_for_epoch failed" - ))) + Err(Error::Other( + "get_missing_state_parts_for_epoch failed".to_string(), + )) } Ok(parts_not_dumped) if parts_not_dumped.is_empty() => { Ok(Some(StateSyncDumpProgress::AllDumped {