From dc9adc7da0bddba97772b49b38f84d4d6a7e55f6 Mon Sep 17 00:00:00 2001 From: Michael Sutton Date: Tue, 19 Dec 2023 02:19:21 +0200 Subject: [PATCH] Tighten up P2P relay and orphan management in order to perfect high-BPS block exchange (#359) * complete revalidate orphans correctly * improve test * names and comments * close the sync gap between consensus and the block orphan pool * renames and comments * orphan ibd heuristic * explain the ibd heuristic * simplify orphan output case + review comments * remove todo and explain why * log orphan-related events via the event logger * add nodnsseed cmd flag which disable DNS seeding for peers * bump version to 0.13.1 * updated sync gap comment * update fn doc * typo * fix logs --- Cargo.lock | 100 ++++----- Cargo.toml | 100 ++++----- kaspad/src/args.rs | 6 +- kaspad/src/daemon.rs | 2 +- protocol/flows/src/flow_context.rs | 255 ++++++++++++++++------ protocol/flows/src/flowcontext/orphans.rs | 106 +++++++-- protocol/flows/src/v5/blockrelay/flow.rs | 148 +++++++++---- protocol/flows/src/v5/ibd/flow.rs | 21 +- 8 files changed, 512 insertions(+), 226 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 383bd21ce5..5f88e722b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1962,7 +1962,7 @@ dependencies = [ [[package]] name = "kaspa-addresses" -version = "0.13.0" +version = "0.13.1" dependencies = [ "borsh", "criterion", @@ -1978,7 +1978,7 @@ dependencies = [ [[package]] name = "kaspa-addressmanager" -version = "0.13.0" +version = "0.13.1" dependencies = [ "borsh", "igd-next", @@ -2001,7 +2001,7 @@ dependencies = [ [[package]] name = "kaspa-bip32" -version = "0.13.0" +version = "0.13.1" dependencies = [ "bs58", "faster-hex 0.6.1", @@ -2025,7 +2025,7 @@ dependencies = [ [[package]] name = "kaspa-cli" -version = "0.13.0" +version = "0.13.1" dependencies = [ "async-trait", "borsh", @@ -2068,7 +2068,7 @@ dependencies = [ [[package]] name = "kaspa-connectionmanager" -version = "0.13.0" +version = "0.13.1" dependencies = [ "duration-string", "futures-util", @@ -2085,7 +2085,7 @@ dependencies = [ [[package]] name = "kaspa-consensus" -version = "0.13.0" +version = "0.13.1" dependencies = [ "async-channel 2.0.0", "bincode", @@ -2127,7 +2127,7 @@ dependencies = [ [[package]] name = "kaspa-consensus-core" -version = "0.13.0" +version = "0.13.1" dependencies = [ "async-trait", "bincode", @@ -2164,7 +2164,7 @@ dependencies = [ [[package]] name = "kaspa-consensus-notify" -version = "0.13.0" +version = "0.13.1" dependencies = [ "async-channel 2.0.0", "cfg-if 1.0.0", @@ -2183,7 +2183,7 @@ dependencies = [ [[package]] name = "kaspa-consensus-wasm" -version = "0.13.0" +version = "0.13.1" dependencies = [ "faster-hex 0.6.1", "js-sys", @@ -2205,7 +2205,7 @@ dependencies = [ [[package]] name = "kaspa-consensusmanager" -version = "0.13.0" +version = "0.13.1" dependencies = [ "duration-string", "futures", @@ -2223,7 +2223,7 @@ dependencies = [ [[package]] name = "kaspa-core" -version = "0.13.0" +version = "0.13.1" dependencies = [ "cfg-if 1.0.0", "ctrlc", @@ -2241,7 +2241,7 @@ dependencies = [ [[package]] name = "kaspa-daemon" -version = "0.13.0" +version = "0.13.1" dependencies = [ "async-trait", "borsh", @@ -2263,7 +2263,7 @@ dependencies = [ [[package]] name = "kaspa-database" -version = "0.13.0" +version = "0.13.1" dependencies = [ "bincode", "enum-primitive-derive", @@ -2285,7 +2285,7 @@ dependencies = [ [[package]] name = "kaspa-grpc-client" -version = "0.13.0" +version = "0.13.1" dependencies = [ "async-channel 2.0.0", "async-stream", @@ -2313,7 +2313,7 @@ 
dependencies = [ [[package]] name = "kaspa-grpc-core" -version = "0.13.0" +version = "0.13.1" dependencies = [ "async-channel 2.0.0", "async-stream", @@ -2342,7 +2342,7 @@ dependencies = [ [[package]] name = "kaspa-grpc-server" -version = "0.13.0" +version = "0.13.1" dependencies = [ "async-channel 2.0.0", "async-stream", @@ -2377,7 +2377,7 @@ dependencies = [ [[package]] name = "kaspa-hashes" -version = "0.13.0" +version = "0.13.1" dependencies = [ "blake2b_simd", "borsh", @@ -2398,7 +2398,7 @@ dependencies = [ [[package]] name = "kaspa-index-core" -version = "0.13.0" +version = "0.13.1" dependencies = [ "async-channel 2.0.0", "async-trait", @@ -2417,7 +2417,7 @@ dependencies = [ [[package]] name = "kaspa-index-processor" -version = "0.13.0" +version = "0.13.1" dependencies = [ "async-channel 2.0.0", "async-trait", @@ -2445,7 +2445,7 @@ dependencies = [ [[package]] name = "kaspa-math" -version = "0.13.0" +version = "0.13.1" dependencies = [ "borsh", "criterion", @@ -2466,14 +2466,14 @@ dependencies = [ [[package]] name = "kaspa-merkle" -version = "0.13.0" +version = "0.13.1" dependencies = [ "kaspa-hashes", ] [[package]] name = "kaspa-mining" -version = "0.13.0" +version = "0.13.1" dependencies = [ "criterion", "futures-util", @@ -2499,7 +2499,7 @@ dependencies = [ [[package]] name = "kaspa-mining-errors" -version = "0.13.0" +version = "0.13.1" dependencies = [ "kaspa-consensus-core", "thiserror", @@ -2507,7 +2507,7 @@ dependencies = [ [[package]] name = "kaspa-muhash" -version = "0.13.0" +version = "0.13.1" dependencies = [ "criterion", "kaspa-hashes", @@ -2520,7 +2520,7 @@ dependencies = [ [[package]] name = "kaspa-notify" -version = "0.13.0" +version = "0.13.1" dependencies = [ "async-channel 2.0.0", "async-trait", @@ -2582,7 +2582,7 @@ dependencies = [ [[package]] name = "kaspa-p2p-flows" -version = "0.13.0" +version = "0.13.1" dependencies = [ "async-trait", "futures", @@ -2612,7 +2612,7 @@ dependencies = [ [[package]] name = "kaspa-p2p-lib" -version = "0.13.0" +version = "0.13.1" dependencies = [ "borsh", "ctrlc", @@ -2642,7 +2642,7 @@ dependencies = [ [[package]] name = "kaspa-perf-monitor" -version = "0.13.0" +version = "0.13.1" dependencies = [ "kaspa-core", "log", @@ -2654,7 +2654,7 @@ dependencies = [ [[package]] name = "kaspa-pow" -version = "0.13.0" +version = "0.13.1" dependencies = [ "criterion", "js-sys", @@ -2668,7 +2668,7 @@ dependencies = [ [[package]] name = "kaspa-rpc-core" -version = "0.13.0" +version = "0.13.1" dependencies = [ "async-channel 2.0.0", "async-trait", @@ -2703,7 +2703,7 @@ dependencies = [ [[package]] name = "kaspa-rpc-macros" -version = "0.13.0" +version = "0.13.1" dependencies = [ "convert_case 0.6.0", "proc-macro-error", @@ -2715,7 +2715,7 @@ dependencies = [ [[package]] name = "kaspa-rpc-service" -version = "0.13.0" +version = "0.13.1" dependencies = [ "async-trait", "kaspa-addresses", @@ -2744,7 +2744,7 @@ dependencies = [ [[package]] name = "kaspa-testing-integration" -version = "0.13.0" +version = "0.13.1" dependencies = [ "async-channel 2.0.0", "bincode", @@ -2796,7 +2796,7 @@ dependencies = [ [[package]] name = "kaspa-txscript" -version = "0.13.0" +version = "0.13.1" dependencies = [ "blake2b_simd", "borsh", @@ -2822,7 +2822,7 @@ dependencies = [ [[package]] name = "kaspa-txscript-errors" -version = "0.13.0" +version = "0.13.1" dependencies = [ "secp256k1", "thiserror", @@ -2830,7 +2830,7 @@ dependencies = [ [[package]] name = "kaspa-utils" -version = "0.13.0" +version = "0.13.1" dependencies = [ "async-channel 2.0.0", "async-trait", @@ 
-2857,7 +2857,7 @@ dependencies = [ [[package]] name = "kaspa-utils-tower" -version = "0.13.0" +version = "0.13.1" dependencies = [ "cfg-if 1.0.0", "futures", @@ -2871,7 +2871,7 @@ dependencies = [ [[package]] name = "kaspa-utxoindex" -version = "0.13.0" +version = "0.13.1" dependencies = [ "futures", "kaspa-consensus", @@ -2892,7 +2892,7 @@ dependencies = [ [[package]] name = "kaspa-wallet" -version = "0.13.0" +version = "0.13.1" dependencies = [ "async-std", "async-trait", @@ -2904,7 +2904,7 @@ dependencies = [ [[package]] name = "kaspa-wallet-cli-wasm" -version = "0.13.0" +version = "0.13.1" dependencies = [ "async-trait", "js-sys", @@ -2918,7 +2918,7 @@ dependencies = [ [[package]] name = "kaspa-wallet-core" -version = "0.13.0" +version = "0.13.1" dependencies = [ "aes", "argon2", @@ -2984,7 +2984,7 @@ dependencies = [ [[package]] name = "kaspa-wasm" -version = "0.13.0" +version = "0.13.1" dependencies = [ "js-sys", "kaspa-addresses", @@ -3004,7 +3004,7 @@ dependencies = [ [[package]] name = "kaspa-wrpc-client" -version = "0.13.0" +version = "0.13.1" dependencies = [ "async-std", "async-trait", @@ -3034,11 +3034,11 @@ dependencies = [ [[package]] name = "kaspa-wrpc-core" -version = "0.13.0" +version = "0.13.1" [[package]] name = "kaspa-wrpc-proxy" -version = "0.13.0" +version = "0.13.1" dependencies = [ "async-trait", "clap 4.4.7", @@ -3058,7 +3058,7 @@ dependencies = [ [[package]] name = "kaspa-wrpc-server" -version = "0.13.0" +version = "0.13.1" dependencies = [ "async-trait", "borsh", @@ -3086,14 +3086,14 @@ dependencies = [ [[package]] name = "kaspa-wrpc-wasm" -version = "0.13.0" +version = "0.13.1" dependencies = [ "kaspa-wrpc-client", ] [[package]] name = "kaspad" -version = "0.13.0" +version = "0.13.1" dependencies = [ "async-channel 2.0.0", "clap 4.4.7", @@ -4318,7 +4318,7 @@ dependencies = [ [[package]] name = "rothschild" -version = "0.13.0" +version = "0.13.1" dependencies = [ "clap 4.4.7", "faster-hex 0.6.1", @@ -4664,7 +4664,7 @@ dependencies = [ [[package]] name = "simpa" -version = "0.13.0" +version = "0.13.1" dependencies = [ "async-channel 2.0.0", "clap 4.4.7", diff --git a/Cargo.toml b/Cargo.toml index c9e78b30c9..bf8fa34abb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,7 +55,7 @@ members = [ ] [workspace.package] -version = "0.13.0" +version = "0.13.1" authors = ["Kaspa developers"] license = "MIT/Apache-2.0" repository = "https://github.com/kaspanet/rusty-kaspa" @@ -72,55 +72,55 @@ include = [ ] [workspace.dependencies] -# kaspa-testing-integration = { version = "0.13.0", path = "testing/integration" } -kaspa-os = { version = "0.13.0", path = "kaspa-os" } -kaspa-daemon = { version = "0.13.0", path = "daemon" } -kaspa-addresses = { version = "0.13.0", path = "crypto/addresses" } -kaspa-addressmanager = { version = "0.13.0", path = "components/addressmanager" } -kaspa-bip32 = { version = "0.13.0", path = "wallet/bip32" } -kaspa-connectionmanager = { version = "0.13.0", path = "components/connectionmanager" } -kaspa-consensus = { version = "0.13.0", path = "consensus" } -kaspa-consensus-core = { version = "0.13.0", path = "consensus/core" } -kaspa-consensus-notify = { version = "0.13.0", path = "consensus/notify" } -kaspa-consensus-wasm = { version = "0.13.0", path = "consensus/wasm" } -kaspa-consensusmanager = { version = "0.13.0", path = "components/consensusmanager" } -kaspa-core = { version = "0.13.0", path = "core" } -kaspa-database = { version = "0.13.0", path = "database" } -kaspa-grpc-client = { version = "0.13.0", path = "rpc/grpc/client" } 
-kaspa-grpc-core = { version = "0.13.0", path = "rpc/grpc/core" } -kaspa-grpc-server = { version = "0.13.0", path = "rpc/grpc/server" } -kaspa-hashes = { version = "0.13.0", path = "crypto/hashes" } -kaspa-index-core = { version = "0.13.0", path = "indexes/core" } -kaspa-index-processor = { version = "0.13.0", path = "indexes/processor" } -kaspa-math = { version = "0.13.0", path = "math" } -kaspa-merkle = { version = "0.13.0", path = "crypto/merkle" } -kaspa-mining = { version = "0.13.0", path = "mining" } -kaspa-mining-errors = { version = "0.13.0", path = "mining/errors" } -kaspa-muhash = { version = "0.13.0", path = "crypto/muhash" } -kaspa-notify = { version = "0.13.0", path = "notify" } -kaspa-p2p-flows = { version = "0.13.0", path = "protocol/flows" } -kaspa-p2p-lib = { version = "0.13.0", path = "protocol/p2p" } -kaspa-pow = { version = "0.13.0", path = "consensus/pow" } -kaspa-rpc-core = { version = "0.13.0", path = "rpc/core" } -kaspa-rpc-macros = { version = "0.13.0", path = "rpc/macros" } -kaspa-rpc-service = { version = "0.13.0", path = "rpc/service" } -kaspa-txscript = { version = "0.13.0", path = "crypto/txscript" } -kaspa-txscript-errors = { version = "0.13.0", path = "crypto/txscript/errors" } -kaspa-utils = { version = "0.13.0", path = "utils" } -kaspa-utils-tower = { version = "0.13.0", path = "utils/tower" } -kaspa-utxoindex = { version = "0.13.0", path = "indexes/utxoindex" } -kaspa-wallet = { version = "0.13.0", path = "wallet/native" } -kaspa-cli = { version = "0.13.0", path = "cli" } -kaspa-wallet-cli-wasm = { version = "0.13.0", path = "wallet/wasm" } -kaspa-wallet-core = { version = "0.13.0", path = "wallet/core" } -kaspa-wasm = { version = "0.13.0", path = "wasm" } -kaspa-wrpc-core = { version = "0.13.0", path = "rpc/wrpc/core" } -kaspa-wrpc-client = { version = "0.13.0", path = "rpc/wrpc/client" } -kaspa-wrpc-proxy = { version = "0.13.0", path = "rpc/wrpc/proxy" } -kaspa-wrpc-server = { version = "0.13.0", path = "rpc/wrpc/server" } -kaspa-wrpc-wasm = { version = "0.13.0", path = "rpc/wrpc/wasm" } -kaspad = { version = "0.13.0", path = "kaspad" } -kaspa-perf-monitor = { version = "0.13.0", path = "metrics/perf_monitor" } +# kaspa-testing-integration = { version = "0.13.1", path = "testing/integration" } +kaspa-os = { version = "0.13.1", path = "kaspa-os" } +kaspa-daemon = { version = "0.13.1", path = "daemon" } +kaspa-addresses = { version = "0.13.1", path = "crypto/addresses" } +kaspa-addressmanager = { version = "0.13.1", path = "components/addressmanager" } +kaspa-bip32 = { version = "0.13.1", path = "wallet/bip32" } +kaspa-connectionmanager = { version = "0.13.1", path = "components/connectionmanager" } +kaspa-consensus = { version = "0.13.1", path = "consensus" } +kaspa-consensus-core = { version = "0.13.1", path = "consensus/core" } +kaspa-consensus-notify = { version = "0.13.1", path = "consensus/notify" } +kaspa-consensus-wasm = { version = "0.13.1", path = "consensus/wasm" } +kaspa-consensusmanager = { version = "0.13.1", path = "components/consensusmanager" } +kaspa-core = { version = "0.13.1", path = "core" } +kaspa-database = { version = "0.13.1", path = "database" } +kaspa-grpc-client = { version = "0.13.1", path = "rpc/grpc/client" } +kaspa-grpc-core = { version = "0.13.1", path = "rpc/grpc/core" } +kaspa-grpc-server = { version = "0.13.1", path = "rpc/grpc/server" } +kaspa-hashes = { version = "0.13.1", path = "crypto/hashes" } +kaspa-index-core = { version = "0.13.1", path = "indexes/core" } +kaspa-index-processor = { version = "0.13.1", path = 
"indexes/processor" } +kaspa-math = { version = "0.13.1", path = "math" } +kaspa-merkle = { version = "0.13.1", path = "crypto/merkle" } +kaspa-mining = { version = "0.13.1", path = "mining" } +kaspa-mining-errors = { version = "0.13.1", path = "mining/errors" } +kaspa-muhash = { version = "0.13.1", path = "crypto/muhash" } +kaspa-notify = { version = "0.13.1", path = "notify" } +kaspa-p2p-flows = { version = "0.13.1", path = "protocol/flows" } +kaspa-p2p-lib = { version = "0.13.1", path = "protocol/p2p" } +kaspa-pow = { version = "0.13.1", path = "consensus/pow" } +kaspa-rpc-core = { version = "0.13.1", path = "rpc/core" } +kaspa-rpc-macros = { version = "0.13.1", path = "rpc/macros" } +kaspa-rpc-service = { version = "0.13.1", path = "rpc/service" } +kaspa-txscript = { version = "0.13.1", path = "crypto/txscript" } +kaspa-txscript-errors = { version = "0.13.1", path = "crypto/txscript/errors" } +kaspa-utils = { version = "0.13.1", path = "utils" } +kaspa-utils-tower = { version = "0.13.1", path = "utils/tower" } +kaspa-utxoindex = { version = "0.13.1", path = "indexes/utxoindex" } +kaspa-wallet = { version = "0.13.1", path = "wallet/native" } +kaspa-cli = { version = "0.13.1", path = "cli" } +kaspa-wallet-cli-wasm = { version = "0.13.1", path = "wallet/wasm" } +kaspa-wallet-core = { version = "0.13.1", path = "wallet/core" } +kaspa-wasm = { version = "0.13.1", path = "wasm" } +kaspa-wrpc-core = { version = "0.13.1", path = "rpc/wrpc/core" } +kaspa-wrpc-client = { version = "0.13.1", path = "rpc/wrpc/client" } +kaspa-wrpc-proxy = { version = "0.13.1", path = "rpc/wrpc/proxy" } +kaspa-wrpc-server = { version = "0.13.1", path = "rpc/wrpc/server" } +kaspa-wrpc-wasm = { version = "0.13.1", path = "rpc/wrpc/wasm" } +kaspad = { version = "0.13.1", path = "kaspad" } +kaspa-perf-monitor = { version = "0.13.1", path = "metrics/perf_monitor" } # external aes = "0.8.3" diff --git a/kaspad/src/args.rs b/kaspad/src/args.rs index 46581afe30..f0491de79f 100644 --- a/kaspad/src/args.rs +++ b/kaspad/src/args.rs @@ -65,6 +65,7 @@ pub struct Args { pub prealloc_amount: u64, pub disable_upnp: bool, + pub disable_dns_seeding: bool, } impl Default for Args { @@ -111,6 +112,7 @@ impl Default for Args { prealloc_amount: 1_000_000, disable_upnp: false, + disable_dns_seeding: false, } } } @@ -317,7 +319,8 @@ pub fn cli() -> Command { .value_parser(clap::value_parser!(u64)) .help("Interval in seconds for performance metrics collection."), ) - .arg(arg!(--"disable-upnp" "Disable upnp")); + .arg(arg!(--"disable-upnp" "Disable upnp")) + .arg(arg!(--"nodnsseed" "Disable DNS seeding for peers")); #[cfg(feature = "devnet-prealloc")] let cmd = cmd @@ -377,6 +380,7 @@ pub fn parse_args() -> Args { #[cfg(feature = "devnet-prealloc")] prealloc_amount: m.get_one::("prealloc-amount").cloned().unwrap_or(defaults.prealloc_amount), disable_upnp: m.get_one::("disable-upnp").cloned().unwrap_or(defaults.disable_upnp), + disable_dns_seeding: m.get_one::("nodnsseed").cloned().unwrap_or(defaults.disable_dns_seeding), } } diff --git a/kaspad/src/daemon.rs b/kaspad/src/daemon.rs index c504320625..a5a901e78d 100644 --- a/kaspad/src/daemon.rs +++ b/kaspad/src/daemon.rs @@ -339,7 +339,7 @@ do you confirm? 
     let p2p_server_addr = args.listen.unwrap_or(ContextualNetAddress::unspecified()).normalize(config.default_p2p_port());
     // connect_peers means no DNS seeding and no outbound peers
     let outbound_target = if connect_peers.is_empty() { args.outbound_target } else { 0 };
-    let dns_seeders = if connect_peers.is_empty() { config.dns_seeders } else { &[] };
+    let dns_seeders = if connect_peers.is_empty() && !args.disable_dns_seeding { config.dns_seeders } else { &[] };
 
     let grpc_server_addr = args.rpclisten.unwrap_or(ContextualNetAddress::unspecified()).normalize(config.default_rpc_port());
 
diff --git a/protocol/flows/src/flow_context.rs b/protocol/flows/src/flow_context.rs
index 03ddcd7a08..c6806f128a 100644
--- a/protocol/flows/src/flow_context.rs
+++ b/protocol/flows/src/flow_context.rs
@@ -1,4 +1,8 @@
-use crate::flowcontext::{orphans::OrphanBlocksPool, process_queue::ProcessQueue, transactions::TransactionsSpread};
+use crate::flowcontext::{
+    orphans::{OrphanBlocksPool, OrphanOutput},
+    process_queue::ProcessQueue,
+    transactions::TransactionsSpread,
+};
 use crate::{v5, v6};
 use async_trait::async_trait;
 use futures::future::join_all;
@@ -34,9 +38,9 @@ use kaspa_p2p_lib::{
 use kaspa_utils::iter::IterExtensions;
 use kaspa_utils::networking::PeerId;
 use parking_lot::{Mutex, RwLock};
-use std::collections::hash_map::Entry;
 use std::collections::HashMap;
 use std::time::Instant;
+use std::{collections::hash_map::Entry, fmt::Display};
 use std::{
     iter::once,
     ops::Deref,
@@ -65,26 +69,35 @@ const MAX_ORPHANS_UPPER_BOUND: usize = 1024;
 /// The min time to wait before allowing another parallel request
 const REQUEST_SCOPE_WAIT_TIME: Duration = Duration::from_secs(1);
 
+/// Represents a block event to be logged
 #[derive(Debug, PartialEq)]
-pub enum BlockSource {
-    Relay,
-    Submit,
+pub enum BlockLogEvent {
+    /// Accepted block via *relay*
+    Relay(Hash),
+    /// Accepted block via *submit block*
+    Submit(Hash),
+    /// Orphaned block with x missing roots
+    Orphaned(Hash, usize),
+    /// Detected a known orphan with x missing roots
+    OrphanRoots(Hash, usize),
+    /// Unorphaned x blocks with hash being a representative
+    Unorphaned(Hash, usize),
 }
 
-pub struct AcceptedBlockLogger {
+pub struct BlockEventLogger {
     bps: usize,
-    sender: UnboundedSender<(Hash, BlockSource)>,
-    receiver: Mutex<Option<UnboundedReceiver<(Hash, BlockSource)>>>,
+    sender: UnboundedSender<BlockLogEvent>,
+    receiver: Mutex<Option<UnboundedReceiver<BlockLogEvent>>>,
 }
 
-impl AcceptedBlockLogger {
+impl BlockEventLogger {
     pub fn new(bps: usize) -> Self {
         let (sender, receiver) = unbounded_channel();
         Self { bps, sender, receiver: Mutex::new(Some(receiver)) }
     }
 
-    pub fn log(&self, hash: Hash, source: BlockSource) {
-        self.sender.send((hash, source)).unwrap();
+    pub fn log(&self, event: BlockLogEvent) {
+        self.sender.send(event).unwrap();
     }
 
     /// Start the logger listener. Must be called from an async tokio context
@@ -95,21 +108,114 @@ impl AcceptedBlockLogger {
             let chunk_stream = UnboundedReceiverStream::new(receiver).chunks_timeout(chunk_limit, Duration::from_secs(1));
             tokio::pin!(chunk_stream);
             while let Some(chunk) = chunk_stream.next().await {
-                if let Some((i, h)) =
-                    chunk.iter().filter_map(|(h, s)| if *s == BlockSource::Submit { Some(*h) } else { None }).enumerate().last()
-                {
-                    let submit = i + 1; // i is the last index so i + 1 is the number of submit blocks
-                    let relay = chunk.len() - submit;
-                    match (submit, relay) {
-                        (1, 0) => info!("Accepted block {} via submit block", h),
-                        (n, 0) => info!("Accepted {} blocks ...{} via submit block", n, h),
-                        (n, m) => info!("Accepted {} blocks ...{}, {} via relay and {} via submit block", n + m, h, m, n),
+                #[derive(Default)]
+                struct LogSummary {
+                    // Representative
+                    relay_rep: Option<Hash>,
+                    submit_rep: Option<Hash>,
+                    orphan_rep: Option<Hash>,
+                    unorphan_rep: Option<Hash>,
+                    // Counts
+                    relay_count: usize,
+                    submit_count: usize,
+                    orphan_count: usize,
+                    unorphan_count: usize,
+                    orphan_roots_count: usize,
+                }
+
+                struct LogHash {
+                    op: Option<Hash>,
+                }
+
+                impl From<Option<Hash>> for LogHash {
+                    fn from(op: Option<Hash>) -> Self {
+                        Self { op }
                     }
-                } else {
-                    let h = chunk.last().expect("chunk is never empty").0;
-                    match chunk.len() {
-                        1 => info!("Accepted block {} via relay", h),
-                        n => info!("Accepted {} blocks ...{} via relay", n, h),
+                }
+
+                impl Display for LogHash {
+                    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        if let Some(hash) = self.op {
+                            hash.fmt(f)
+                        } else {
+                            Ok(())
+                        }
+                    }
+                }
+
+                impl LogSummary {
+                    fn relay(&self) -> LogHash {
+                        self.relay_rep.into()
+                    }
+
+                    fn submit(&self) -> LogHash {
+                        self.submit_rep.into()
+                    }
+
+                    fn orphan(&self) -> LogHash {
+                        self.orphan_rep.into()
+                    }
+
+                    fn unorphan(&self) -> LogHash {
+                        self.unorphan_rep.into()
+                    }
+                }
+
+                let summary = chunk.into_iter().fold(LogSummary::default(), |mut summary, ev| {
+                    match ev {
+                        BlockLogEvent::Relay(hash) => {
+                            summary.relay_count += 1;
+                            summary.relay_rep = Some(hash)
+                        }
+                        BlockLogEvent::Submit(hash) => {
+                            summary.submit_count += 1;
+                            summary.submit_rep = Some(hash)
+                        }
+                        BlockLogEvent::Orphaned(hash, roots_count) => {
+                            summary.orphan_roots_count += roots_count;
+                            summary.orphan_count += 1;
+                            summary.orphan_rep = Some(hash)
+                        }
+                        BlockLogEvent::OrphanRoots(_, roots_count) => {
+                            summary.orphan_roots_count += roots_count;
+                        }
+                        BlockLogEvent::Unorphaned(hash, count) => {
+                            summary.unorphan_count += count;
+                            summary.unorphan_rep = Some(hash)
+                        }
+                    }
+                    summary
+                });
+
+                match (summary.submit_count, summary.relay_count) {
+                    (0, 0) => {}
+                    (1, 0) => info!("Accepted block {} via submit block", summary.submit()),
+                    (n, 0) => info!("Accepted {} blocks ...{} via submit block", n, summary.submit()),
+                    (0, 1) => info!("Accepted block {} via relay", summary.relay()),
+                    (0, m) => info!("Accepted {} blocks ...{} via relay", m, summary.relay()),
+                    (n, m) => {
+                        info!("Accepted {} blocks ...{}, {} via relay and {} via submit block", n + m, summary.submit(), m, n)
+                    }
+                }
+
+                match (summary.unorphan_count, summary.orphan_count, summary.orphan_roots_count) {
+                    (0, 0, 0) => {}
+                    (1, 0, 0) => info!("Unorphaned block {}", summary.unorphan()),
+                    (n, 0, 0) => info!("Unorphaned {} block(s) ...{}", n, summary.unorphan()),
+                    (0, m, 0) => info!("Orphaned {} block(s) ...{}", m, summary.orphan()),
+                    (0, m, l) => info!("Orphaned {} block(s) ...{} and queued {} missing roots", m, summary.orphan(), l),
+                    (n, m, 0) => {
+                        info!("Unorphaned {} block(s) ...{}, orphaned {} block(s) ...{}", n, summary.unorphan(), m, summary.orphan(),)
+                    }
+                    (n, m, l) => {
+                        info!(
+                            "Unorphaned {} block(s) ...{}, orphaned {} block(s) ...{} and queued {} missing roots",
+                            n,
+                            summary.unorphan(),
+                            m,
+                            summary.orphan(),
+                            l
+                        )
+                    }
+                }
             }
@@ -127,7 +233,7 @@ pub struct FlowContextInner {
     transactions_spread: AsyncRwLock<TransactionsSpread>,
     shared_transaction_requests: Arc<Mutex<HashMap<TransactionId, RequestScopeMetadata>>>,
     is_ibd_running: Arc<AtomicBool>,
-    ibd_peer_key: Arc<RwLock<Option<PeerKey>>>,
+    ibd_metadata: Arc<RwLock<Option<IbdMetadata>>>,
     pub address_manager: Arc<Mutex<AddressManager>>,
     connection_manager: RwLock<Option<Arc<ConnectionManager>>>,
     mining_manager: MiningManagerProxy,
@@ -135,10 +241,11 @@
     notification_root: Arc<ConsensusNotificationRoot>,
 
     // Special sampling logger used only for high-bps networks where logs must be throttled
-    accepted_block_logger: Option<AcceptedBlockLogger>,
+    block_event_logger: Option<BlockEventLogger>,
 
     // Orphan parameters
     orphan_resolution_range: u32,
+    max_orphans: usize,
 }
 
 #[derive(Clone)]
@@ -157,6 +264,14 @@ impl Drop for IbdRunningGuard {
     }
 }
 
+#[derive(Debug, Clone, Copy)]
+struct IbdMetadata {
+    /// The peer from which the current IBD is syncing
+    peer: PeerKey,
+    /// The DAA score of the relay block which triggered the current IBD
+    daa_score: u64,
+}
+
 pub struct RequestScopeMetadata {
     pub timestamp: Instant,
     pub obtained: bool,
@@ -220,15 +335,16 @@ impl FlowContext {
                 transactions_spread: AsyncRwLock::new(TransactionsSpread::new(hub.clone())),
                 shared_transaction_requests: Arc::new(Mutex::new(HashMap::new())),
                 is_ibd_running: Default::default(),
-                ibd_peer_key: Default::default(),
+                ibd_metadata: Default::default(),
                 hub,
                 address_manager,
                 connection_manager: Default::default(),
                 mining_manager,
                 tick_service,
                 notification_root,
-                accepted_block_logger: if config.bps() > 1 { Some(AcceptedBlockLogger::new(config.bps() as usize)) } else { None },
+                block_event_logger: if config.bps() > 1 { Some(BlockEventLogger::new(config.bps() as usize)) } else { None },
                 orphan_resolution_range,
+                max_orphans,
                 config,
             }),
         }
@@ -242,8 +358,12 @@ impl FlowContext {
         self.orphan_resolution_range
     }
 
+    pub fn max_orphans(&self) -> usize {
+        self.max_orphans
+    }
+
     pub fn start_async_services(&self) {
-        if let Some(logger) = self.accepted_block_logger.as_ref() {
+        if let Some(logger) = self.block_event_logger.as_ref() {
             logger.start();
         }
     }
@@ -272,9 +392,9 @@ impl FlowContext {
         &self.mining_manager
     }
 
-    pub fn try_set_ibd_running(&self, peer_key: PeerKey) -> Option<IbdRunningGuard> {
+    pub fn try_set_ibd_running(&self, peer: PeerKey, relay_daa_score: u64) -> Option<IbdRunningGuard> {
         if self.is_ibd_running.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_ok() {
-            self.ibd_peer_key.write().replace(peer_key);
+            self.ibd_metadata.write().replace(IbdMetadata { peer, daa_score: relay_daa_score });
             Some(IbdRunningGuard { indicator: self.is_ibd_running.clone() })
         } else {
             None
@@ -285,9 +405,19 @@ impl FlowContext {
         self.is_ibd_running.load(Ordering::SeqCst)
     }
 
+    /// If IBD is running, returns the IBD peer we are syncing from
     pub fn ibd_peer_key(&self) -> Option<PeerKey> {
         if self.is_ibd_running() {
-            *self.ibd_peer_key.read()
+            self.ibd_metadata.read().map(|md| md.peer)
+        } else {
+            None
+        }
+    }
+
+    /// If IBD is running, returns the DAA score of the relay block which triggered it
+    pub fn ibd_relay_daa_score(&self) -> Option<u64> {
+        if self.is_ibd_running() {
+            self.ibd_metadata.read().map(|md| md.daa_score)
         } else {
             None
         }
@@ -323,21 +453,17 @@ impl FlowContext {
         Self::try_adding_request_impl(req, &self.shared_transaction_requests)
     }
 
-    pub async fn add_orphan(&self, orphan_block: Block) {
-        if self.is_log_throttled() {
-            debug!("Received a block with missing parents, adding to orphan pool: {}", orphan_block.hash());
-        } else {
-            info!("Received a block with missing parents, adding to orphan pool: {}", orphan_block.hash());
-        }
-        self.orphans_pool.write().await.add_orphan(orphan_block)
+    pub async fn add_orphan(&self, consensus: &ConsensusProxy, orphan_block: Block) -> Option<OrphanOutput> {
+        self.log_block_event(BlockLogEvent::Orphaned(orphan_block.hash(), 0));
+        self.orphans_pool.write().await.add_orphan(consensus, orphan_block).await
     }
 
     pub async fn is_known_orphan(&self, hash: Hash) -> bool {
         self.orphans_pool.read().await.is_known_orphan(hash)
     }
 
-    pub async fn get_orphan_roots(&self, consensus: &ConsensusProxy, orphan: Hash) -> Option<Vec<Hash>> {
-        self.orphans_pool.read().await.get_orphan_roots(consensus, orphan).await
+    pub async fn get_orphan_roots_if_known(&self, consensus: &ConsensusProxy, orphan: Hash) -> OrphanOutput {
+        self.orphans_pool.read().await.get_orphan_roots_if_known(consensus, orphan).await
     }
 
     pub async fn unorphan_blocks(&self, consensus: &ConsensusProxy, root: Hash) -> Vec<(Block, BlockValidationFuture)> {
@@ -352,18 +478,22 @@ impl FlowContext {
                 Err(e) => warn!("Validation failed for orphan block {}: {}", block.hash(), e),
             }
         }
-        match unorphaned_blocks.len() {
-            0 => {}
-            1 => info!("Unorphaned block {}", unorphaned_blocks[0].0.hash()),
-            n => match self.is_log_throttled() {
-                true => info!("Unorphaned {} blocks ...{}", n, unorphaned_blocks.last().unwrap().0.hash()),
-                false => info!("Unorphaned {} blocks: {}", n, unorphaned_blocks.iter().map(|b| b.0.hash()).reusable_format(", ")),
-            },
+
+        // Log or send to event logger
+        if !unorphaned_blocks.is_empty() {
+            if let Some(logger) = self.block_event_logger.as_ref() {
+                logger.log(BlockLogEvent::Unorphaned(unorphaned_blocks[0].0.hash(), unorphaned_blocks.len()));
+            } else {
+                match unorphaned_blocks.len() {
+                    1 => info!("Unorphaned block {}", unorphaned_blocks[0].0.hash()),
+                    n => info!("Unorphaned {} blocks: {}", n, unorphaned_blocks.iter().map(|b| b.0.hash()).reusable_format(", ")),
+                }
+            }
         }
         unorphaned_blocks
     }
 
-    pub async fn revalidate_orphans(&self, consensus: &ConsensusProxy) {
+    pub async fn revalidate_orphans(&self, consensus: &ConsensusProxy) -> (Vec<Hash>, Vec<BlockValidationFuture>) {
         self.orphans_pool.write().await.revalidate_orphans(consensus).await
     }
 
@@ -382,26 +512,29 @@ impl FlowContext {
         self.hub.broadcast(make_message!(Payload::InvRelayBlock, InvRelayBlockMessage { hash: Some(hash.into()) })).await;
         self.on_new_block(consensus, block, virtual_state_task).await;
-        self.log_block_acceptance(hash, BlockSource::Submit);
+        self.log_block_event(BlockLogEvent::Submit(hash));
         Ok(())
     }
 
-    pub fn log_block_acceptance(&self, hash: Hash, source: BlockSource) {
-        if let Some(logger) = self.accepted_block_logger.as_ref() {
-            logger.log(hash, source)
+    pub fn log_block_event(&self, event: BlockLogEvent) {
+        if let Some(logger) = self.block_event_logger.as_ref() {
+            logger.log(event)
         } else {
-            match source {
-                BlockSource::Relay => info!("Accepted block {} via relay", hash),
-                BlockSource::Submit => info!("Accepted block {} via submit block", hash),
+            match event {
+                BlockLogEvent::Relay(hash) => info!("Accepted block {} via relay", hash),
+                BlockLogEvent::Submit(hash) => info!("Accepted block {} via submit block", hash),
+                BlockLogEvent::Orphaned(orphan, _) => {
+                    info!("Received a block with missing parents, adding to orphan pool: {}", orphan)
+                }
+                BlockLogEvent::OrphanRoots(orphan, roots_count) => {
+                    info!("Block {} has {} missing ancestors. Adding them to the invs queue...", orphan, roots_count)
+                }
+                _ => {}
             }
         }
     }
 
-    pub fn is_log_throttled(&self) -> bool {
-        self.accepted_block_logger.is_some()
-    }
-
     /// Updates the mempool after a new block arrival, relays newly unorphaned transactions
     /// and possibly rebroadcast manually added transactions when not in IBD.
     ///
diff --git a/protocol/flows/src/flowcontext/orphans.rs b/protocol/flows/src/flowcontext/orphans.rs
index ae37f50f94..6531706edd 100644
--- a/protocol/flows/src/flowcontext/orphans.rs
+++ b/protocol/flows/src/flowcontext/orphans.rs
@@ -8,10 +8,24 @@ use kaspa_core::debug;
 use kaspa_hashes::Hash;
 use kaspa_utils::option::OptionExtensions;
 use rand::Rng;
-use std::collections::{HashMap, HashSet, VecDeque};
+use std::{
+    collections::{HashMap, HashSet, VecDeque},
+    iter::once,
+};
 
 use super::process_queue::ProcessQueue;
 
+/// The output of an orphan pool block query
+#[derive(Debug)]
+pub enum OrphanOutput {
+    /// Block is orphan with the provided missing roots
+    Roots(Vec<Hash>),
+    /// Block has no missing roots (but it might have known orphan ancestors)
+    NoRoots,
+    /// The block does not exist in the orphan pool
+    Unknown,
+}
+
 struct OrphanBlock {
     /// The actual block
     block: Block,
@@ -42,11 +56,12 @@ impl OrphanBlocksPool {
         Self { orphans: IndexMap::with_capacity(max_orphans), max_orphans }
     }
 
-    /// Adds the provided block to the orphan pool
-    pub fn add_orphan(&mut self, orphan_block: Block) {
+    /// Adds the provided block to the orphan pool. Returns None if the block is already
+    /// in the pool or if the pool chose not to keep it for any reason
+    pub async fn add_orphan(&mut self, consensus: &ConsensusProxy, orphan_block: Block) -> Option<OrphanOutput> {
         let orphan_hash = orphan_block.hash();
         if self.orphans.contains_key(&orphan_hash) {
-            return;
+            return None;
         }
         if self.orphans.len() == self.max_orphans {
             debug!("Orphan blocks pool size exceeded. Evicting a random orphan block.");
@@ -60,7 +75,10 @@ impl OrphanBlocksPool {
                 entry.children.insert(orphan_hash);
             }
         }
+        // Insert
         self.orphans.insert(orphan_block.hash(), OrphanBlock::new(orphan_block, self.iterate_child_orphans(orphan_hash).collect()));
+        // Get roots
+        Some(self.get_orphan_roots(consensus, orphan_hash).await)
     }
 
     /// Returns whether this block is in the orphan pool.
@@ -71,11 +89,15 @@ impl OrphanBlocksPool {
     /// Returns the orphan roots of the provided orphan. Orphan roots are ancestors of this orphan which are
     /// not in the orphan pool AND do not exist consensus-wise or are header-only. Given an orphan relayed by
     /// a peer, these blocks should be the next-in-line to be requested from that peer.
-    pub async fn get_orphan_roots(&self, consensus: &ConsensusProxy, orphan: Hash) -> Option<Vec<Hash>> {
+    pub async fn get_orphan_roots_if_known(&self, consensus: &ConsensusProxy, orphan: Hash) -> OrphanOutput {
         if !self.orphans.contains_key(&orphan) {
-            return None;
+            return OrphanOutput::Unknown;
         }
+        self.get_orphan_roots(consensus, orphan).await
+    }
 
+    /// Internal get roots method. Assumes 'orphan' is within the pool
+    async fn get_orphan_roots(&self, consensus: &ConsensusProxy, orphan: Hash) -> OrphanOutput {
         let mut roots = Vec::new();
         let mut queue = VecDeque::from([orphan]);
         let mut visited = HashSet::from([orphan]); // We avoid the custom block hasher here. See comment on `orphans` above.
@@ -94,7 +116,12 @@ impl OrphanBlocksPool {
                 }
             }
         }
-        Some(roots)
+
+        if roots.is_empty() {
+            OrphanOutput::NoRoots
+        } else {
+            OrphanOutput::Roots(roots)
+        }
     }
 
     pub async fn unorphan_blocks(
@@ -124,6 +151,7 @@ impl OrphanBlocksPool {
                 }
             }
         }
+        // We deliberately want all processing tasks to be awaited outside of the orphan pool lock
        itertools::multiunzip(processing.into_values())
     }
 
@@ -141,7 +169,8 @@
     /// This is important for the overall health of the pool and for ensuring that
     /// orphan blocks don't evict due to pool size limit while already processed
     /// blocks remain in it. Should be called following IBD.
-    pub async fn revalidate_orphans(&mut self, consensus: &ConsensusProxy) {
+    pub async fn revalidate_orphans(&mut self, consensus: &ConsensusProxy) -> (Vec<Hash>, Vec<BlockValidationFuture>) {
+        // First, cleanup blocks already processed by consensus
         let mut i = 0;
         while i < self.orphans.len() {
             if let Some((&h, _)) = self.orphans.get_index(i) {
@@ -156,6 +185,41 @@ impl OrphanBlocksPool {
                 i += 1;
             }
         }
+
+        // Next, search for root blocks which are processable. A processable block is a block
+        // whose parents are all known to consensus with a valid body state
+        let mut roots = Vec::new();
+        for block in self.orphans.values() {
+            let mut processable = true;
+            for parent in block.block.header.direct_parents().iter().copied() {
+                if self.orphans.contains_key(&parent)
+                    || consensus.async_get_block_status(parent).await.is_none_or(|status| status.is_header_only())
+                {
+                    processable = false;
+                    break;
+                }
+            }
+            if processable {
+                roots.push(block.block.clone());
+            }
+        }
+
+        // Now process the roots and unorphan their descendants
+        let mut virtual_processing_tasks = Vec::with_capacity(roots.len());
+        let mut queued_hashes = Vec::with_capacity(roots.len());
+        for root in roots {
+            let root_hash = root.hash();
+            // Queue the root for processing
+            let BlockValidationFutures { block_task: _, virtual_state_task: root_task } = consensus.validate_and_insert_block(root);
+            // Queue its descendants which are processable
+            let (descendent_blocks, _, descendents_tasks) = self.unorphan_blocks(consensus, root_hash).await;
+            // Keep track of all hashes and tasks
+            virtual_processing_tasks.extend(once(root_task).chain(descendents_tasks));
+            queued_hashes.extend(once(root_hash).chain(descendent_blocks.into_iter().map(|block| block.hash())));
+        }
+
+        // We deliberately want the processing tasks to be awaited outside of the orphan pool lock
+        (queued_hashes, virtual_processing_tasks)
     }
 }
 
@@ -169,6 +233,7 @@ mod tests {
         errors::block::BlockProcessResult,
     };
     use kaspa_consensusmanager::{ConsensusInstance, SessionLock};
+    use kaspa_core::assert_match;
     use parking_lot::RwLock;
     use std::sync::Arc;
 
@@ -206,11 +271,13 @@
         let d = Block::from_precomputed_hash(11.into(), vec![10.into()]);
         let e = Block::from_precomputed_hash(12.into(), vec![10.into()]);
         let f = Block::from_precomputed_hash(13.into(), vec![12.into()]);
+        let g = Block::from_precomputed_hash(14.into(), vec![13.into()]);
+        let h = Block::from_precomputed_hash(15.into(), vec![14.into()]);
 
-        pool.add_orphan(c.clone());
-        pool.add_orphan(d.clone());
+        pool.add_orphan(&consensus, c.clone()).await.unwrap();
+        pool.add_orphan(&consensus, d.clone()).await.unwrap();
 
-        assert_eq!(pool.get_orphan_roots(&consensus, d.hash()).await.unwrap(), roots);
+        assert_match!(pool.get_orphan_roots_if_known(&consensus, d.hash()).await, OrphanOutput::Roots(recv_roots) if recv_roots == roots);
 
         consensus.validate_and_insert_block(a.clone()).virtual_state_task.await.unwrap();
         consensus.validate_and_insert_block(b.clone()).virtual_state_task.await.unwrap();
@@ -222,17 +289,18 @@
         assert!(pool.orphans.is_empty());
 
         // Test revalidation
-        pool.add_orphan(d.clone());
-        pool.add_orphan(e.clone());
-        pool.add_orphan(f.clone());
-        assert_eq!(pool.orphans.len(), 3);
+        pool.add_orphan(&consensus, d.clone()).await.unwrap();
+        pool.add_orphan(&consensus, e.clone()).await.unwrap();
+        pool.add_orphan(&consensus, f.clone()).await.unwrap();
+        pool.add_orphan(&consensus, h.clone()).await.unwrap();
+        assert_eq!(pool.orphans.len(), 4);
         pool.revalidate_orphans(&consensus).await;
-        assert_eq!(pool.orphans.len(), 2);
-        consensus.validate_and_insert_block(e.clone()).virtual_state_task.await.unwrap();
-        consensus.validate_and_insert_block(f.clone()).virtual_state_task.await.unwrap();
+        assert_eq!(pool.orphans.len(), 1);
+        assert!(pool.orphans.contains_key(&h.hash())); // h's parent, g, was never inserted into the pool
+        pool.add_orphan(&consensus, g.clone()).await.unwrap();
         pool.revalidate_orphans(&consensus).await;
         assert!(pool.orphans.is_empty());
 
-        drop((a, b, c, d, e, f));
+        drop((a, b, c, d, e, f, g, h));
     }
 }
diff --git a/protocol/flows/src/v5/blockrelay/flow.rs b/protocol/flows/src/v5/blockrelay/flow.rs
index 9d0698020f..3bfda200b9 100644
--- a/protocol/flows/src/v5/blockrelay/flow.rs
+++ b/protocol/flows/src/v5/blockrelay/flow.rs
@@ -1,6 +1,7 @@
 use crate::{
-    flow_context::{BlockSource, FlowContext, RequestScope},
+    flow_context::{BlockLogEvent, FlowContext, RequestScope},
     flow_trait::Flow,
+    flowcontext::orphans::OrphanOutput,
 };
 use kaspa_consensus_core::{api::BlockValidationFutures, block::Block, blockstatus::BlockStatus, errors::block::RuleError};
 use kaspa_consensusmanager::ConsensusProxy;
@@ -17,13 +18,19 @@
 
 pub struct RelayInvMessage {
     hash: Hash,
-    is_indirect: bool,
+
+    /// Indicates whether this inv is an orphan root of a previously relayed descendant
+    /// (i.e. this inv was indirectly queued)
+    is_orphan_root: bool,
+
+    /// Indicates whether this inv is already known to be within orphan resolution range
+    known_within_range: bool,
 }
 
 /// Encapsulates an incoming invs route which also receives data locally
 pub struct TwoWayIncomingRoute {
     incoming_route: SharedIncomingRoute,
-    indirect_invs: VecDeque<Hash>,
+    indirect_invs: VecDeque<RelayInvMessage>,
 }
 
 impl TwoWayIncomingRoute {
@@ -31,17 +38,18 @@ impl TwoWayIncomingRoute {
         Self { incoming_route, indirect_invs: VecDeque::new() }
     }
 
-    pub fn enqueue_indirect_invs<I: IntoIterator<Item = Hash>>(&mut self, iter: I) {
-        self.indirect_invs.extend(iter)
+    pub fn enqueue_indirect_invs<I: IntoIterator<Item = Hash>>(&mut self, iter: I, known_within_range: bool) {
+        // All indirect invs are orphan roots; not all are known to be within orphan resolution range
+        self.indirect_invs.extend(iter.into_iter().map(|h| RelayInvMessage { hash: h, is_orphan_root: true, known_within_range }))
     }
 
     pub async fn dequeue(&mut self) -> Result<RelayInvMessage, ProtocolError> {
         if let Some(inv) = self.indirect_invs.pop_front() {
-            Ok(RelayInvMessage { hash: inv, is_indirect: true })
+            Ok(inv)
         } else {
             let msg = dequeue!(self.incoming_route, Payload::InvRelayBlock)?;
             let inv = msg.try_into()?;
-            Ok(RelayInvMessage { hash: inv, is_indirect: false })
+            Ok(RelayInvMessage { hash: inv, is_orphan_root: false, known_within_range: false })
         }
     }
 }
@@ -98,9 +106,14 @@ impl HandleRelayInvsFlow {
                 }
             }
 
-            if self.ctx.is_known_orphan(inv.hash).await {
-                self.enqueue_orphan_roots(&session, inv.hash).await;
-                continue;
+            match self.ctx.get_orphan_roots_if_known(&session, inv.hash).await {
+                OrphanOutput::Unknown => {} // Keep processing this inv
+                OrphanOutput::NoRoots => continue, // Existing orphan w/o missing roots
+                OrphanOutput::Roots(roots) => {
+                    // Known orphan with roots to enqueue
+                    self.enqueue_orphan_roots(inv.hash, roots, inv.known_within_range);
+                    continue;
+                }
             }
 
             if self.ctx.is_ibd_running() && !session.async_is_nearly_synced().await {
@@ -129,7 +142,7 @@
 
             // We do not apply the skip heuristic below if inv was queued indirectly (as an orphan root), since
             // that means the process started by a proper and relevant relay block
-            if !inv.is_indirect && !broadcast {
+            if !inv.is_orphan_root && !broadcast {
                 debug!(
                     "Relay block {} has lower blue work than virtual's merge depth root ({} <= {}), hence we are skipping it",
                     inv.hash, block.header.blue_work, blue_work_threshold
                 );
                 continue;
             }
 
-            let BlockValidationFutures { block_task, virtual_state_task } = session.validate_and_insert_block(block.clone());
+            let BlockValidationFutures { block_task, mut virtual_state_task } = session.validate_and_insert_block(block.clone());
 
             match block_task.await {
                 Ok(_) => {}
                 Err(RuleError::MissingParents(missing_parents)) => {
                     debug!("Block {} is orphan and has missing parents: {:?}", block.hash(), missing_parents);
-                    self.process_orphan(&session, block, inv.is_indirect).await?;
-                    continue;
+                    if self.process_orphan(&session, block.clone(), inv.known_within_range).await? {
+                        continue;
+                    } else {
+                        // Block is possibly not an orphan, retrying
+                        let BlockValidationFutures { block_task: block_task_inner, virtual_state_task: virtual_state_task_inner } =
+                            session.validate_and_insert_block(block.clone());
+                        virtual_state_task = virtual_state_task_inner;
+                        match block_task_inner.await {
+                            Ok(_) => info!("Retried orphan block {} successfully", block.hash()),
+                            Err(RuleError::MissingParents(_)) => continue,
+                            Err(rule_error) => return Err(rule_error.into()),
+                        }
+                    }
                 }
                 Err(rule_error) => return Err(rule_error.into()),
             }
@@ -164,23 +188,14 @@ impl HandleRelayInvsFlow {
             let ctx = self.ctx.clone();
             tokio::spawn(async move {
                 ctx.on_new_block(&session, block, virtual_state_task).await;
-                ctx.log_block_acceptance(inv.hash, BlockSource::Relay);
+                ctx.log_block_event(BlockLogEvent::Relay(inv.hash));
             });
         }
     }
 
-    async fn enqueue_orphan_roots(&mut self, consensus: &ConsensusProxy, orphan: Hash) {
-        if let Some(roots) = self.ctx.get_orphan_roots(consensus, orphan).await {
-            if roots.is_empty() {
-                return;
-            }
-            if self.ctx.is_log_throttled() {
-                debug!("Block {} has {} missing ancestors. Adding them to the invs queue...", orphan, roots.len());
-            } else {
-                info!("Block {} has {} missing ancestors. Adding them to the invs queue...", orphan, roots.len());
-            }
-            self.invs_route.enqueue_indirect_invs(roots)
-        }
+    fn enqueue_orphan_roots(&mut self, orphan: Hash, roots: Vec<Hash>, known_within_range: bool) {
+        self.ctx.log_block_event(BlockLogEvent::OrphanRoots(orphan, roots.len()));
+        self.invs_route.enqueue_indirect_invs(roots, known_within_range)
     }
 
     async fn request_block(
@@ -208,19 +223,45 @@ impl HandleRelayInvsFlow {
         }
     }
 
-    async fn process_orphan(&mut self, consensus: &ConsensusProxy, block: Block, is_indirect_inv: bool) -> Result<(), ProtocolError> {
+    /// Process the orphan block. Returns `false` if the block has no missing roots, indicating
+    /// a retry is recommended
+    async fn process_orphan(
+        &mut self,
+        consensus: &ConsensusProxy,
+        block: Block,
+        mut known_within_range: bool,
+    ) -> Result<bool, ProtocolError> {
         // Return if the block has been orphaned from elsewhere already
         if self.ctx.is_known_orphan(block.hash()).await {
-            return Ok(());
+            return Ok(true);
         }
 
-        // Add the block to the orphan pool if it's within orphan resolution range.
-        // If the block is indirect it means one of its descendants was already is resolution range, so
-        // we can avoid the query.
-        if is_indirect_inv || self.check_orphan_resolution_range(consensus, block.hash(), self.msg_route.id()).await? {
+        /* We orphan a block if one of the following holds:
+              1. It is known to be within orphan resolution range (no-op)
+              2. It satisfies the IBD DAA score heuristic conditions (local op)
+              3. We resolve its orphan range by interacting with the peer (peer op)
+
+           Note that we check the conditions in order of their cost and avoid making expensive calls if not needed.
+        */
+        let should_orphan = known_within_range || self.check_orphan_ibd_conditions(block.header.daa_score) || {
+            // Inner scope to evaluate orphan resolution range and reassign the `known_within_range` variable
+            known_within_range = self.check_orphan_resolution_range(consensus, block.hash(), self.msg_route.id()).await?;
+            known_within_range
+        };
+
+        if should_orphan {
             let hash = block.hash();
-            self.ctx.add_orphan(block).await;
-            self.enqueue_orphan_roots(consensus, hash).await;
+            match self.ctx.add_orphan(consensus, block).await {
+                // There is a sync gap between consensus and the orphan pool, meaning that consensus might have indicated
+                // that this block is orphan, but by the time it got to the orphan pool we discovered it no longer has missing roots.
+                // We signal this to the caller by returning false, triggering a consensus processing retry.
+                // Note that no roots means it is still possible there is a known orphan ancestor in the orphan pool. However
+                // we should still retry consensus in this case because the ancestor might have been queued to consensus
+                // already, and consensus handles such dependencies with improved (pipelined, overlapping) concurrency
+                Some(OrphanOutput::NoRoots) => return Ok(false),
+                Some(OrphanOutput::Roots(roots)) => self.enqueue_orphan_roots(hash, roots, known_within_range),
+                None | Some(OrphanOutput::Unknown) => {}
+            }
         } else {
             // Send the block to IBD flow via the dedicated job channel. If the channel has a pending job, we prefer
             // the block with higher blue work, since it is usually more recent
                 Err(TrySendError::Closed(_)) => return Err(ProtocolError::ConnectionClosed), // This indicates that IBD flow has exited
             }
         }
-        Ok(())
+        Ok(true)
+    }
+
+    /// Applies a heuristic to check whether we should store the orphan block in the orphan pool for IBD considerations.
+    ///
+    /// While an IBD is running, it is guaranteed to sync all blocks in past(R), where R is the relay block which triggered
+    /// the IBD. Frequently, if the IBD is short and fast enough, R will be within short distance from the syncer tips once
+    /// the IBD is over. However, antipast(R) is usually not within orphan resolution range, so these blocks will not be kept,
+    /// leading to another IBD, and so on.
+    ///
+    /// By checking whether the current orphan DAA score is within the range (R - M/10, R + M/2)** we make sure that in this
+    /// case we keep ~M/2 blocks in the orphan pool which are all unorphaned when IBD completes (see revalidate_orphans),
+    /// and the node reaches full sync state asap. We use M/10 for the lower bound since we only want to cover anticone(R)
+    /// in that region (which is expectedly small), whereas the M/2 upper bound is for covering the earliest segment in
+    /// future(R). Overall we avoid keeping more than ~M/2 in order to not enter the area where blocks start getting evicted
+    /// from the orphan pool.
+    ///
+    /// ** where R stands here for the DAA score of the relay block R, and M is the orphan pool size limit
+    fn check_orphan_ibd_conditions(&self, orphan_daa_score: u64) -> bool {
+        if let Some(ibd_daa_score) = self.ctx.ibd_relay_daa_score() {
+            let max_orphans = self.ctx.max_orphans() as u64;
+            orphan_daa_score + max_orphans / 10 > ibd_daa_score && orphan_daa_score < ibd_daa_score + max_orphans / 2
+        } else {
+            false
+        }
     }
 
-    /// Finds out whether the given block hash should be retrieved via the unorphaning
-    /// mechanism or via IBD. This method sends a BlockLocator request to the peer with
-    /// a limit of `ctx.orphan_resolution_range`. In the response, if we know none of the hashes,
-    /// we should retrieve the given block `hash` via IBD. Otherwise, via unorphaning.
+    /// Checks whether the given block hash is within orphan resolution range. This method sends a BlockLocator
+    /// request to the peer with a limit of `ctx.orphan_resolution_range`. In the response, if we know one of the
+    /// hashes, we should retrieve the given block via unorphaning.
     async fn check_orphan_resolution_range(
         &mut self,
         consensus: &ConsensusProxy,
@@ -255,7 +319,9 @@ impl HandleRelayInvsFlow {
         // with current syncer-side implementations (in both go-kaspa and this codebase) we could query only the last one,
         // but we prefer not relying on such details for correctness
         //
-        // TODO: change syncer-side to only send the most early block since it's sufficient for our needs
+        // The current syncer-side implementation sends a full locator even though it would suffice to send only
+        // the earliest block. We keep it this way in order to allow future syncee-side implementations to do more
+        // with the full incremental info and because it is only a small set of hashes.
         for h in locator_hashes.into_iter().rev() {
             if consensus.async_get_block_status(h).await.is_some_and(|s| s.has_block_body()) {
                 return Ok(true);
diff --git a/protocol/flows/src/v5/ibd/flow.rs b/protocol/flows/src/v5/ibd/flow.rs
index 27d98ffc59..9c736fc907 100644
--- a/protocol/flows/src/v5/ibd/flow.rs
+++ b/protocol/flows/src/v5/ibd/flow.rs
@@ -5,7 +5,7 @@ use crate::{
         Flow,
     },
 };
-use futures::future::try_join_all;
+use futures::future::{join_all, try_join_all};
 use kaspa_consensus_core::{
     api::BlockValidationFuture,
     block::Block,
@@ -71,7 +71,7 @@ impl IbdFlow {
     async fn start_impl(&mut self) -> Result<(), ProtocolError> {
         while let Ok(relay_block) = self.relay_receiver.recv().await {
-            if let Some(_guard) = self.ctx.try_set_ibd_running(self.router.key()) {
+            if let Some(_guard) = self.ctx.try_set_ibd_running(self.router.key(), relay_block.header.daa_score) {
                 info!("IBD started with peer {}", self.router);
 
                 match self.ibd(relay_block).await {
@@ -132,7 +132,22 @@ impl IbdFlow {
         self.sync_missing_block_bodies(&session, relay_block.hash()).await?;
 
         // Following IBD we revalidate orphans since many of them might have been processed during the IBD
-        self.ctx.revalidate_orphans(&session).await;
+        // or are now processable
+        let (queued_hashes, virtual_processing_tasks) = self.ctx.revalidate_orphans(&session).await;
+        let mut unorphaned_hashes = Vec::with_capacity(queued_hashes.len());
+        let results = join_all(virtual_processing_tasks).await;
+        for (hash, result) in queued_hashes.into_iter().zip(results) {
+            match result {
+                Ok(_) => unorphaned_hashes.push(hash),
+                // We do not return the error and disconnect here since we don't know
+                // that this peer was the origin of the orphan block
+                Err(e) => warn!("Validation failed for orphan block {}: {}", hash, e),
+            }
+        }
+        match unorphaned_hashes.len() {
+            0 => {}
+            n => info!("IBD post processing: unorphaned {} blocks ...{}", n, unorphaned_hashes.last().unwrap()),
+        }
 
         Ok(())
     }
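
A note on the IBD orphan heuristic introduced in `check_orphan_ibd_conditions`: the whole condition reduces to a DAA-score window test around the IBD relay block. Below is a minimal standalone sketch of that test — the helper name `within_ibd_orphan_window` is hypothetical, and `max_orphans` plays the role of M (capped in this patch by `MAX_ORPHANS_UPPER_BOUND = 1024`):

```rust
/// Hypothetical standalone version of the window test from `check_orphan_ibd_conditions`:
/// keep an orphan iff its DAA score lies in the open range (R - M/10, R + M/2).
fn within_ibd_orphan_window(orphan_daa_score: u64, ibd_relay_daa_score: u64, max_orphans: u64) -> bool {
    orphan_daa_score + max_orphans / 10 > ibd_relay_daa_score // lower bound: orphan > R - M/10
        && orphan_daa_score < ibd_relay_daa_score + max_orphans / 2 // upper bound: orphan < R + M/2
}

fn main() {
    // With M = 1024 and R = 10_000 the window is (9898, 10512)
    assert!(within_ibd_orphan_window(9_900, 10_000, 1024)); // covers the small anticone(R) region
    assert!(!within_ibd_orphan_window(9_800, 10_000, 1024)); // too deep in past(R)
    assert!(!within_ibd_orphan_window(10_512, 10_000, 1024)); // beyond the M/2 upper bound
}
```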
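The orphan-root search behind `get_orphan_roots` walks ancestors through the pool and collects parents that are neither pooled orphans nor blocks with validated bodies in consensus. A simplified sketch of that traversal, assuming a plain `HashMap` pool and a `has_body` predicate standing in for the consensus status query (both are stand-ins, not the patch's actual types):

```rust
use std::collections::{HashMap, HashSet, VecDeque};

type Hash = u64; // stand-in for kaspa_hashes::Hash

/// Simplified root search: `orphans` maps each pooled orphan to its direct parents,
/// `has_body` stands in for consensus knowing the block with a validated body.
fn orphan_roots(orphans: &HashMap<Hash, Vec<Hash>>, has_body: impl Fn(Hash) -> bool, orphan: Hash) -> Vec<Hash> {
    let mut roots = Vec::new();
    let mut queue = VecDeque::from([orphan]);
    let mut visited = HashSet::from([orphan]);
    while let Some(current) = queue.pop_front() {
        if let Some(parents) = orphans.get(&current) {
            // Still an orphan in the pool: keep walking up through its direct parents
            for &parent in parents {
                if visited.insert(parent) {
                    queue.push_back(parent);
                }
            }
        } else if !has_body(current) {
            // Neither pooled nor fully validated by consensus => a missing root to request next
            roots.push(current);
        }
    }
    roots
}
```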
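Finally, the `BlockEventLogger` relies on tokio-stream's `chunks_timeout` to throttle logs on high-BPS networks: events accumulate for up to a second (or until the chunk limit) and are folded into a single summary line. A minimal reproduction of that pattern, counts only, assuming the `tokio` and `tokio-stream` crates (the latter with its `time` feature):

```rust
use std::time::Duration;
use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt};

#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<u64>();
    tokio::spawn(async move {
        for i in 0..25u64 {
            tx.send(i).unwrap();
        }
    });
    // Flush a chunk when it reaches 10 events or when one second elapses, whichever comes first
    let chunks = UnboundedReceiverStream::new(rx).chunks_timeout(10, Duration::from_secs(1));
    tokio::pin!(chunks);
    while let Some(chunk) = chunks.next().await {
        // One summary line per chunk instead of one line per event
        println!("accepted {} blocks, last = {:?}", chunk.len(), chunk.last());
    }
}
```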