From c88128832f95bb916c8b1b5eb45a34bf78ec998a Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Mon, 16 Dec 2024 08:03:52 +0200 Subject: [PATCH] Upgrade libp2p from 0.52.4 to 0.54.1 (#6248) # Description Fixes https://github.com/paritytech/polkadot-sdk/issues/5996 https://github.com/libp2p/rust-libp2p/releases/tag/libp2p-v0.53.0 https://github.com/libp2p/rust-libp2p/blob/master/CHANGELOG.md ## Integration Nothing special is needed, just note that `yamux_window_size` is no longer applicable to libp2p (litep2p seems to still have it though). ## Review Notes There are a few simplifications and improvements done in libp2p 0.53 regarding swarm interface, I'll list a few key/applicable here. https://github.com/libp2p/rust-libp2p/pull/4788 removed `write_length_prefixed` function, so I inlined its code instead. https://github.com/libp2p/rust-libp2p/pull/4120 introduced new `libp2p::SwarmBuilder` instead of now deprecated `libp2p::swarm::SwarmBuilder`, the transition is straightforward and quite ergonomic (can be seen in tests). https://github.com/libp2p/rust-libp2p/pull/4581 is the most annoying change I have seen that basically makes many enums `#[non_exhaustive]`. I mapped some, but those that couldn't be mapped I dealt with by printing log messages once they are hit (the best solution I could come up with, at least with stable Rust). https://github.com/libp2p/rust-libp2p/issues/4306 makes connection close as soon as there are no handler using it, so I had to replace `KeepAlive::Until` with an explicit future that flips internal boolean after timeout, achieving the old behavior, though it should ideally be removed completely at some point. `yamux_window_size` is no longer used by libp2p thanks to https://github.com/libp2p/rust-libp2p/pull/4970 and generally Yamux should have a higher performance now. I have resolved and cleaned up all deprecations related to libp2p except `BandwidthSinks`. Libp2p deprecated it (though it is still present in 0.54.1, which is why I didn't handle it just yet). Ideally Substrate would finally [switch to the official Prometheus client](https://github.com/paritytech/substrate/issues/12699), in which case we'd get metrics for free. Otherwise a bit of code will need to be copy-pasted to maintain current behavior with `BandwidthSinks` gone, which I left a TODO about. The biggest change in 0.54.0 is https://github.com/libp2p/rust-libp2p/pull/4568 that changed transport APIs and enabled unconditional potential port reuse, which can lead to very confusing errors if running two Substrate nodes on the same machine without changing listening port explicitly. Overall nothing scary here, but testing is always appreciated. # Checklist * [x] My PR includes a detailed description as outlined in the "Description" and its two subsections above. * [x] My PR follows the [labeling requirements]( https://github.com/paritytech/polkadot-sdk/blob/master/docs/contributor/CONTRIBUTING.md#Process ) of this project (at minimum one label for `T` required) * External contributors: ask maintainers to put the right label on your PR. --- Polkadot Address: 1vSxzbyz2cJREAuVWjhXUT1ds8vBzoxn2w4asNpusQKwjJd --------- Co-authored-by: Dmitry Markin --- Cargo.lock | 682 ++++++++---------- Cargo.toml | 2 +- prdoc/pr_6248.prdoc | 16 + substrate/client/network/src/behaviour.rs | 1 + substrate/client/network/src/discovery.rs | 174 +++-- substrate/client/network/src/network_state.rs | 2 +- substrate/client/network/src/peer_info.rs | 91 +-- substrate/client/network/src/protocol.rs | 37 +- .../network/src/protocol/notifications.rs | 2 +- .../src/protocol/notifications/behaviour.rs | 102 ++- .../src/protocol/notifications/handler.rs | 123 ++-- .../src/protocol/notifications/tests.rs | 324 ++++----- .../notifications/upgrade/notifications.rs | 30 +- .../client/network/src/request_responses.rs | 193 ++--- substrate/client/network/src/service.rs | 88 +-- substrate/client/network/src/transport.rs | 27 +- substrate/client/network/sync/src/engine.rs | 9 +- substrate/client/network/types/Cargo.toml | 2 +- substrate/client/telemetry/Cargo.toml | 2 +- substrate/client/telemetry/src/node.rs | 13 +- 20 files changed, 850 insertions(+), 1070 deletions(-) create mode 100644 prdoc/pr_6248.prdoc diff --git a/Cargo.lock b/Cargo.lock index 7f9f3198e570..0902fe6fcfbc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -841,30 +841,14 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" -[[package]] -name = "asn1-rs" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f6fd5ddaf0351dff5b8da21b2fb4ff8e08ddd02857f0bf69c47639106c0fff0" -dependencies = [ - "asn1-rs-derive 0.4.0", - "asn1-rs-impl 0.1.0", - "displaydoc", - "nom", - "num-traits", - "rusticata-macros", - "thiserror", - "time", -] - [[package]] name = "asn1-rs" version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22ad1373757efa0f70ec53939aabc7152e1591cb485208052993070ac8d2429d" dependencies = [ - "asn1-rs-derive 0.5.0", - "asn1-rs-impl 0.2.0", + "asn1-rs-derive", + "asn1-rs-impl", "displaydoc", "nom", "num-traits", @@ -873,18 +857,6 @@ dependencies = [ "time", ] -[[package]] -name = "asn1-rs-derive" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "726535892e8eae7e70657b4c8ea93d26b8553afb1ce617caee529ef96d7dee6c" -dependencies = [ - "proc-macro2 1.0.86", - "quote 1.0.37", - "syn 1.0.109", - "synstructure 0.12.6", -] - [[package]] name = "asn1-rs-derive" version = "0.5.0" @@ -897,17 +869,6 @@ dependencies = [ "synstructure 0.13.1", ] -[[package]] -name = "asn1-rs-impl" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2777730b2039ac0f95f093556e61b6d26cebed5393ca6f152717777cec3a42ed" -dependencies = [ - "proc-macro2 1.0.86", - "quote 1.0.37", - "syn 1.0.109", -] - [[package]] name = "asn1-rs-impl" version = "0.2.0" @@ -1618,6 +1579,19 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "asynchronous-codec" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a860072022177f903e59730004fb5dc13db9275b79bb2aef7ba8ce831956c233" +dependencies = [ + "bytes", + "futures-sink", + "futures-util", + "memchr", + "pin-project-lite", +] + [[package]] name = "atomic-take" version = "1.1.0" @@ -5906,9 +5880,9 @@ dependencies = [ [[package]] name = "data-encoding" -version = "2.4.0" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308" +checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" [[package]] name = "data-encoding-macro" @@ -5949,27 +5923,13 @@ dependencies = [ "zeroize", ] -[[package]] -name = "der-parser" -version = "8.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbd676fbbab537128ef0278adb5576cf363cff6aa22a7b24effe97347cfab61e" -dependencies = [ - "asn1-rs 0.5.2", - "displaydoc", - "nom", - "num-bigint", - "num-traits", - "rusticata-macros", -] - [[package]] name = "der-parser" version = "9.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5cd0a5c643689626bec213c4d8bd4d96acc8ffdb4ad4bb6bc16abf27d5f4b553" dependencies = [ - "asn1-rs 0.6.1", + "asn1-rs", "displaydoc", "nom", "num-bigint", @@ -6437,18 +6397,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "enum-as-inner" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9720bba047d567ffc8a3cba48bf19126600e249ab7f128e9233e6376976a116" -dependencies = [ - "heck 0.4.1", - "proc-macro2 1.0.86", - "quote 1.0.37", - "syn 1.0.109", -] - [[package]] name = "enum-as-inner" version = "0.6.0" @@ -7588,7 +7536,7 @@ dependencies = [ "macro_magic", "parity-scale-codec", "pretty_assertions", - "proc-macro-warning 1.0.0", + "proc-macro-warning", "proc-macro2 1.0.86", "quote 1.0.37", "regex", @@ -7615,7 +7563,7 @@ dependencies = [ "frame-support-procedural-tools 13.0.0", "itertools 0.11.0", "macro_magic", - "proc-macro-warning 1.0.0", + "proc-macro-warning", "proc-macro2 1.0.86", "quote 1.0.37", "sp-crypto-hashing 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -7898,9 +7846,9 @@ dependencies = [ [[package]] name = "futures-bounded" -version = "0.1.0" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b07bbbe7d7e78809544c6f718d875627addc73a7c3582447abc052cd3dc67e0" +checksum = "91f328e7fb845fc832912fb6a34f40cf6d1888c92f974d1893a54e97b5ff542e" dependencies = [ "futures-timer", "futures-util", @@ -7981,12 +7929,13 @@ dependencies = [ [[package]] name = "futures-rustls" -version = "0.24.0" +version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35bd3cf68c183738046838e300353e4716c674dc5e56890de4826801a6622a28" +checksum = "a8f2f12607f92c69b12ed746fabf9ca4f5c482cba46679c1a75b874ed7c26adb" dependencies = [ "futures-io", - "rustls 0.21.7", + "rustls 0.23.18", + "rustls-pki-types", ] [[package]] @@ -8008,7 +7957,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" dependencies = [ "gloo-timers", - "send_wrapper 0.4.0", + "send_wrapper", ] [[package]] @@ -8471,7 +8420,7 @@ dependencies = [ "async-trait", "cfg-if", "data-encoding", - "enum-as-inner 0.6.0", + "enum-as-inner", "futures-channel", "futures-io", "futures-util", @@ -8479,6 +8428,7 @@ dependencies = [ "ipnet", "once_cell", "rand", + "socket2 0.5.7", "thiserror", "tinyvec", "tokio", @@ -8813,17 +8763,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" -[[package]] -name = "idna" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8" -dependencies = [ - "matches", - "unicode-bidi", - "unicode-normalization", -] - [[package]] name = "idna" version = "0.4.0" @@ -9826,9 +9765,31 @@ dependencies = [ "futures-timer", "getrandom", "instant", - "libp2p-allow-block-list", - "libp2p-connection-limits", - "libp2p-core", + "libp2p-allow-block-list 0.2.0", + "libp2p-connection-limits 0.2.1", + "libp2p-core 0.40.1", + "libp2p-identity", + "libp2p-swarm 0.43.7", + "multiaddr 0.18.1", + "pin-project", + "rw-stream-sink", + "thiserror", +] + +[[package]] +name = "libp2p" +version = "0.54.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbbe80f9c7e00526cd6b838075b9c171919404a4732cb2fa8ece0a093223bfc4" +dependencies = [ + "bytes", + "either", + "futures", + "futures-timer", + "getrandom", + "libp2p-allow-block-list 0.4.0", + "libp2p-connection-limits 0.4.0", + "libp2p-core 0.42.0", "libp2p-dns", "libp2p-identify", "libp2p-identity", @@ -9839,10 +9800,9 @@ dependencies = [ "libp2p-ping", "libp2p-quic", "libp2p-request-response", - "libp2p-swarm", + "libp2p-swarm 0.45.1", "libp2p-tcp", "libp2p-upnp", - "libp2p-wasm-ext", "libp2p-websocket", "libp2p-yamux", "multiaddr 0.18.1", @@ -9857,9 +9817,21 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55b46558c5c0bf99d3e2a1a38fd54ff5476ca66dd1737b12466a1824dd219311" dependencies = [ - "libp2p-core", + "libp2p-core 0.40.1", "libp2p-identity", - "libp2p-swarm", + "libp2p-swarm 0.43.7", + "void", +] + +[[package]] +name = "libp2p-allow-block-list" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1027ccf8d70320ed77e984f273bc8ce952f623762cb9bf2d126df73caef8041" +dependencies = [ + "libp2p-core 0.42.0", + "libp2p-identity", + "libp2p-swarm 0.45.1", "void", ] @@ -9869,9 +9841,21 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2f5107ad45cb20b2f6c3628c7b6014b996fcb13a88053f4569c872c6e30abf58" dependencies = [ - "libp2p-core", + "libp2p-core 0.40.1", + "libp2p-identity", + "libp2p-swarm 0.43.7", + "void", +] + +[[package]] +name = "libp2p-connection-limits" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d003540ee8baef0d254f7b6bfd79bac3ddf774662ca0abf69186d517ef82ad8" +dependencies = [ + "libp2p-core 0.42.0", "libp2p-identity", - "libp2p-swarm", + "libp2p-swarm 0.45.1", "void", ] @@ -9903,42 +9887,70 @@ dependencies = [ "void", ] +[[package]] +name = "libp2p-core" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a61f26c83ed111104cd820fe9bc3aaabbac5f1652a1d213ed6e900b7918a1298" +dependencies = [ + "either", + "fnv", + "futures", + "futures-timer", + "libp2p-identity", + "multiaddr 0.18.1", + "multihash 0.19.1", + "multistream-select", + "once_cell", + "parking_lot 0.12.3", + "pin-project", + "quick-protobuf 0.8.1", + "rand", + "rw-stream-sink", + "smallvec", + "thiserror", + "tracing", + "unsigned-varint 0.8.0", + "void", + "web-time", +] + [[package]] name = "libp2p-dns" -version = "0.40.1" +version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6a18db73084b4da2871438f6239fef35190b05023de7656e877c18a00541a3b" +checksum = "97f37f30d5c7275db282ecd86e54f29dd2176bd3ac656f06abf43bedb21eb8bd" dependencies = [ "async-trait", "futures", - "libp2p-core", + "hickory-resolver", + "libp2p-core 0.42.0", "libp2p-identity", - "log", "parking_lot 0.12.3", "smallvec", - "trust-dns-resolver", + "tracing", ] [[package]] name = "libp2p-identify" -version = "0.43.1" +version = "0.45.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45a96638a0a176bec0a4bcaebc1afa8cf909b114477209d7456ade52c61cd9cd" +checksum = "1711b004a273be4f30202778856368683bd9a83c4c7dcc8f848847606831a4e3" dependencies = [ - "asynchronous-codec", + "asynchronous-codec 0.7.0", "either", "futures", "futures-bounded", "futures-timer", - "libp2p-core", + "libp2p-core 0.42.0", "libp2p-identity", - "libp2p-swarm", - "log", + "libp2p-swarm 0.45.1", "lru 0.12.3", "quick-protobuf 0.8.1", "quick-protobuf-codec", "smallvec", "thiserror", + "tracing", "void", ] @@ -9962,83 +9974,84 @@ dependencies = [ [[package]] name = "libp2p-kad" -version = "0.44.6" +version = "0.46.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16ea178dabba6dde6ffc260a8e0452ccdc8f79becf544946692fff9d412fc29d" +checksum = "ced237d0bd84bbebb7c2cad4c073160dacb4fe40534963c32ed6d4c6bb7702a3" dependencies = [ "arrayvec 0.7.4", - "asynchronous-codec", + "asynchronous-codec 0.7.0", "bytes", "either", "fnv", "futures", + "futures-bounded", "futures-timer", - "instant", - "libp2p-core", + "libp2p-core 0.42.0", "libp2p-identity", - "libp2p-swarm", - "log", + "libp2p-swarm 0.45.1", "quick-protobuf 0.8.1", "quick-protobuf-codec", "rand", "sha2 0.10.8", "smallvec", "thiserror", + "tracing", "uint 0.9.5", - "unsigned-varint 0.7.2", "void", + "web-time", ] [[package]] name = "libp2p-mdns" -version = "0.44.0" +version = "0.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42a2567c305232f5ef54185e9604579a894fd0674819402bb0ac0246da82f52a" +checksum = "14b8546b6644032565eb29046b42744aee1e9f261ed99671b2c93fb140dba417" dependencies = [ "data-encoding", "futures", + "hickory-proto", "if-watch", - "libp2p-core", + "libp2p-core 0.42.0", "libp2p-identity", - "libp2p-swarm", - "log", + "libp2p-swarm 0.45.1", "rand", "smallvec", "socket2 0.5.7", "tokio", - "trust-dns-proto 0.22.0", + "tracing", "void", ] [[package]] name = "libp2p-metrics" -version = "0.13.1" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "239ba7d28f8d0b5d77760dc6619c05c7e88e74ec8fbbe97f856f20a56745e620" +checksum = "77ebafa94a717c8442d8db8d3ae5d1c6a15e30f2d347e0cd31d057ca72e42566" dependencies = [ - "instant", - "libp2p-core", + "futures", + "libp2p-core 0.42.0", "libp2p-identify", "libp2p-identity", "libp2p-kad", "libp2p-ping", - "libp2p-swarm", - "once_cell", + "libp2p-swarm 0.45.1", + "pin-project", "prometheus-client", + "web-time", ] [[package]] name = "libp2p-noise" -version = "0.43.2" +version = "0.45.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2eeec39ad3ad0677551907dd304b2f13f17208ccebe333bef194076cd2e8921" +checksum = "36b137cb1ae86ee39f8e5d6245a296518912014eaa87427d24e6ff58cfc1b28c" dependencies = [ + "asynchronous-codec 0.7.0", "bytes", "curve25519-dalek 4.1.3", "futures", - "libp2p-core", + "libp2p-core 0.42.0", "libp2p-identity", - "log", "multiaddr 0.18.1", "multihash 0.19.1", "once_cell", @@ -10048,68 +10061,71 @@ dependencies = [ "snow", "static_assertions", "thiserror", + "tracing", "x25519-dalek", "zeroize", ] [[package]] name = "libp2p-ping" -version = "0.43.1" +version = "0.45.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e702d75cd0827dfa15f8fd92d15b9932abe38d10d21f47c50438c71dd1b5dae3" +checksum = "005a34420359223b974ee344457095f027e51346e992d1e0dcd35173f4cdd422" dependencies = [ "either", "futures", "futures-timer", - "instant", - "libp2p-core", + "libp2p-core 0.42.0", "libp2p-identity", - "libp2p-swarm", - "log", + "libp2p-swarm 0.45.1", "rand", + "tracing", "void", + "web-time", ] [[package]] name = "libp2p-quic" -version = "0.9.3" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "130d451d83f21b81eb7b35b360bc7972aeafb15177784adc56528db082e6b927" +checksum = "46352ac5cd040c70e88e7ff8257a2ae2f891a4076abad2c439584a31c15fd24e" dependencies = [ "bytes", "futures", "futures-timer", "if-watch", - "libp2p-core", + "libp2p-core 0.42.0", "libp2p-identity", "libp2p-tls", - "log", "parking_lot 0.12.3", - "quinn 0.10.2", + "quinn", "rand", - "ring 0.16.20", - "rustls 0.21.7", + "ring 0.17.8", + "rustls 0.23.18", "socket2 0.5.7", "thiserror", "tokio", + "tracing", ] [[package]] name = "libp2p-request-response" -version = "0.25.3" +version = "0.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8e3b4d67870478db72bac87bfc260ee6641d0734e0e3e275798f089c3fecfd4" +checksum = "1356c9e376a94a75ae830c42cdaea3d4fe1290ba409a22c809033d1b7dcab0a6" dependencies = [ "async-trait", "futures", - "instant", - "libp2p-core", + "futures-bounded", + "futures-timer", + "libp2p-core 0.42.0", "libp2p-identity", - "libp2p-swarm", - "log", + "libp2p-swarm 0.45.1", "rand", "smallvec", + "tracing", "void", + "web-time", ] [[package]] @@ -10123,26 +10139,47 @@ dependencies = [ "futures", "futures-timer", "instant", - "libp2p-core", + "libp2p-core 0.40.1", "libp2p-identity", - "libp2p-swarm-derive", "log", "multistream-select", "once_cell", "rand", "smallvec", + "void", +] + +[[package]] +name = "libp2p-swarm" +version = "0.45.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7dd6741793d2c1fb2088f67f82cf07261f25272ebe3c0b0c311e0c6b50e851a" +dependencies = [ + "either", + "fnv", + "futures", + "futures-timer", + "libp2p-core 0.42.0", + "libp2p-identity", + "libp2p-swarm-derive", + "lru 0.12.3", + "multistream-select", + "once_cell", + "rand", + "smallvec", "tokio", + "tracing", "void", + "web-time", ] [[package]] name = "libp2p-swarm-derive" -version = "0.33.0" +version = "0.35.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4d5ec2a3df00c7836d7696c136274c9c59705bac69133253696a6c932cd1d74" +checksum = "206e0aa0ebe004d778d79fb0966aa0de996c19894e2c0605ba2f8524dd4443d8" dependencies = [ - "heck 0.4.1", - "proc-macro-warning 0.4.2", + "heck 0.5.0", "proc-macro2 1.0.86", "quote 1.0.37", "syn 2.0.87", @@ -10150,102 +10187,90 @@ dependencies = [ [[package]] name = "libp2p-tcp" -version = "0.40.1" +version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b558dd40d1bcd1aaaed9de898e9ec6a436019ecc2420dd0016e712fbb61c5508" +checksum = "ad964f312c59dcfcac840acd8c555de8403e295d39edf96f5240048b5fcaa314" dependencies = [ "futures", "futures-timer", "if-watch", "libc", - "libp2p-core", + "libp2p-core 0.42.0", "libp2p-identity", - "log", "socket2 0.5.7", "tokio", + "tracing", ] [[package]] name = "libp2p-tls" -version = "0.2.1" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8218d1d5482b122ccae396bbf38abdcb283ecc96fa54760e1dfd251f0546ac61" +checksum = "47b23dddc2b9c355f73c1e36eb0c3ae86f7dc964a3715f0731cfad352db4d847" dependencies = [ "futures", "futures-rustls", - "libp2p-core", + "libp2p-core 0.42.0", "libp2p-identity", - "rcgen", - "ring 0.16.20", - "rustls 0.21.7", + "rcgen 0.11.3", + "ring 0.17.8", + "rustls 0.23.18", "rustls-webpki 0.101.4", "thiserror", - "x509-parser 0.15.1", + "x509-parser", "yasna", ] [[package]] name = "libp2p-upnp" -version = "0.1.1" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82775a47b34f10f787ad3e2a22e2c1541e6ebef4fe9f28f3ac553921554c94c1" +checksum = "01bf2d1b772bd3abca049214a3304615e6a36fa6ffc742bdd1ba774486200b8f" dependencies = [ "futures", "futures-timer", "igd-next", - "libp2p-core", - "libp2p-swarm", - "log", + "libp2p-core 0.42.0", + "libp2p-swarm 0.45.1", "tokio", + "tracing", "void", ] -[[package]] -name = "libp2p-wasm-ext" -version = "0.40.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e5d8e3a9e07da0ef5b55a9f26c009c8fb3c725d492d8bb4b431715786eea79c" -dependencies = [ - "futures", - "js-sys", - "libp2p-core", - "send_wrapper 0.6.0", - "wasm-bindgen", - "wasm-bindgen-futures", -] - [[package]] name = "libp2p-websocket" -version = "0.42.2" +version = "0.44.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "004ee9c4a4631435169aee6aad2f62e3984dc031c43b6d29731e8e82a016c538" +checksum = "888b2ff2e5d8dcef97283daab35ad1043d18952b65e05279eecbe02af4c6e347" dependencies = [ "either", "futures", "futures-rustls", - "libp2p-core", + "libp2p-core 0.42.0", "libp2p-identity", - "log", "parking_lot 0.12.3", "pin-project-lite", "rw-stream-sink", "soketto 0.8.0", "thiserror", + "tracing", "url", "webpki-roots 0.25.2", ] [[package]] name = "libp2p-yamux" -version = "0.44.1" +version = "0.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8eedcb62824c4300efb9cfd4e2a6edaf3ca097b9e68b36dabe45a44469fd6a85" +checksum = "788b61c80789dba9760d8c669a5bedb642c8267555c803fabd8396e4ca5c5882" dependencies = [ + "either", "futures", - "libp2p-core", - "log", + "libp2p-core 0.42.0", "thiserror", - "yamux", + "tracing", + "yamux 0.12.1", + "yamux 0.13.3", ] [[package]] @@ -10431,7 +10456,7 @@ dependencies = [ "prost 0.12.6", "prost-build", "rand", - "rcgen", + "rcgen 0.10.0", "ring 0.16.20", "rustls 0.20.9", "serde", @@ -10451,7 +10476,7 @@ dependencies = [ "unsigned-varint 0.8.0", "url", "x25519-dalek", - "x509-parser 0.16.0", + "x509-parser", "yasna", "zeroize", ] @@ -10644,12 +10669,6 @@ dependencies = [ "regex-automata 0.1.10", ] -[[package]] -name = "matches" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" - [[package]] name = "matrixmultiply" version = "0.3.7" @@ -11652,22 +11671,13 @@ dependencies = [ "memchr", ] -[[package]] -name = "oid-registry" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9bedf36ffb6ba96c2eb7144ef6270557b52e54b20c0a8e1eb2ff99a6c6959bff" -dependencies = [ - "asn1-rs 0.5.2", -] - [[package]] name = "oid-registry" version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c958dd45046245b9c3c2547369bb634eb461670b2e7e0de552905801a648d1d" dependencies = [ - "asn1-rs 0.6.1", + "asn1-rs", ] [[package]] @@ -20534,17 +20544,6 @@ version = "0.5.20+deprecated" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" -[[package]] -name = "proc-macro-warning" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d1eaa7fa0aa1929ffdf7eeb6eac234dde6268914a14ad44d23521ab6a9b258e" -dependencies = [ - "proc-macro2 1.0.86", - "quote 1.0.37", - "syn 2.0.87", -] - [[package]] name = "proc-macro-warning" version = "1.0.0" @@ -20616,9 +20615,9 @@ dependencies = [ [[package]] name = "prometheus-client" -version = "0.21.2" +version = "0.22.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c99afa9a01501019ac3a14d71d9f94050346f55ca471ce90c799a15c58f61e2" +checksum = "504ee9ff529add891127c4827eb481bd69dc0ebc72e9a682e187db4caa60c3ca" dependencies = [ "dtoa", "itoa", @@ -20849,15 +20848,15 @@ dependencies = [ [[package]] name = "quick-protobuf-codec" -version = "0.2.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8ededb1cd78531627244d51dd0c7139fbe736c7d57af0092a76f0ffb2f56e98" +checksum = "15a0580ab32b169745d7a39db2ba969226ca16738931be152a3209b409de2474" dependencies = [ - "asynchronous-codec", + "asynchronous-codec 0.7.0", "bytes", "quick-protobuf 0.8.1", "thiserror", - "unsigned-varint 0.7.2", + "unsigned-varint 0.8.0", ] [[package]] @@ -20882,24 +20881,6 @@ dependencies = [ "rand", ] -[[package]] -name = "quinn" -version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cc2c5017e4b43d5995dcea317bc46c1e09404c0a9664d2908f7f02dfe943d75" -dependencies = [ - "bytes", - "futures-io", - "pin-project-lite", - "quinn-proto 0.10.6", - "quinn-udp 0.4.1", - "rustc-hash 1.1.0", - "rustls 0.21.7", - "thiserror", - "tokio", - "tracing", -] - [[package]] name = "quinn" version = "0.11.5" @@ -20907,9 +20888,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c7c5fdde3cdae7203427dc4f0a68fe0ed09833edc525a03456b153b79828684" dependencies = [ "bytes", + "futures-io", "pin-project-lite", - "quinn-proto 0.11.8", - "quinn-udp 0.5.4", + "quinn-proto", + "quinn-udp", "rustc-hash 2.0.0", "rustls 0.23.18", "socket2 0.5.7", @@ -20918,23 +20900,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "quinn-proto" -version = "0.10.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "141bf7dfde2fbc246bfd3fe12f2455aa24b0fbd9af535d8c86c7bd1381ff2b1a" -dependencies = [ - "bytes", - "rand", - "ring 0.16.20", - "rustc-hash 1.1.0", - "rustls 0.21.7", - "slab", - "thiserror", - "tinyvec", - "tracing", -] - [[package]] name = "quinn-proto" version = "0.11.8" @@ -20943,7 +20908,7 @@ checksum = "fadfaed2cd7f389d0161bb73eeb07b7b78f8691047a6f3e73caaeae55310a4a6" dependencies = [ "bytes", "rand", - "ring 0.17.7", + "ring 0.17.8", "rustc-hash 2.0.0", "rustls 0.23.18", "slab", @@ -20952,19 +20917,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "quinn-udp" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "055b4e778e8feb9f93c4e439f71dc2156ef13360b432b799e179a8c4cdf0b1d7" -dependencies = [ - "bytes", - "libc", - "socket2 0.5.7", - "tracing", - "windows-sys 0.48.0", -] - [[package]] name = "quinn-udp" version = "0.5.4" @@ -21134,6 +21086,18 @@ dependencies = [ "yasna", ] +[[package]] +name = "rcgen" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52c4f3084aa3bc7dfbba4eff4fab2a54db4324965d8872ab933565e6fbd83bc6" +dependencies = [ + "pem 3.0.4", + "ring 0.16.20", + "time", + "yasna", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -21432,7 +21396,7 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "quinn 0.11.5", + "quinn", "rustls 0.23.18", "rustls-pemfile 2.0.0", "rustls-pki-types", @@ -21505,16 +21469,17 @@ dependencies = [ [[package]] name = "ring" -version = "0.17.7" +version = "0.17.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "688c63d65483050968b2a8937f7995f443e27041a0f7700aa59b0822aedebb74" +checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d" dependencies = [ "cc", + "cfg-if", "getrandom", "libc", "spin 0.9.8", "untrusted 0.9.0", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -22028,7 +21993,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" dependencies = [ "log", - "ring 0.17.7", + "ring 0.17.8", "rustls-pki-types", "rustls-webpki 0.102.8", "subtle 2.5.0", @@ -22043,7 +22008,7 @@ checksum = "9c9cc1d47e243d655ace55ed38201c19ae02c148ae56412ab8750e8f0166ab7f" dependencies = [ "log", "once_cell", - "ring 0.17.7", + "ring 0.17.8", "rustls-pki-types", "rustls-webpki 0.102.8", "subtle 2.5.0", @@ -22156,7 +22121,7 @@ version = "0.102.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9" dependencies = [ - "ring 0.17.7", + "ring 0.17.8", "rustls-pki-types", "untrusted 0.9.0", ] @@ -23165,7 +23130,7 @@ dependencies = [ "assert_matches", "async-channel 1.9.0", "async-trait", - "asynchronous-codec", + "asynchronous-codec 0.6.2", "bytes", "cid 0.9.0", "criterion", @@ -23174,7 +23139,7 @@ dependencies = [ "futures", "futures-timer", "ip_network", - "libp2p", + "libp2p 0.54.1", "linked_hash_set", "litep2p", "log", @@ -23350,7 +23315,7 @@ dependencies = [ "async-trait", "futures", "futures-timer", - "libp2p", + "libp2p 0.54.1", "log", "parking_lot 0.12.3", "rand", @@ -23808,7 +23773,7 @@ version = "15.0.0" dependencies = [ "chrono", "futures", - "libp2p", + "libp2p 0.54.1", "log", "parking_lot 0.12.3", "pin-project", @@ -24358,12 +24323,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f638d531eccd6e23b980caf34876660d38e265409d8e99b397ab71eb3612fad0" -[[package]] -name = "send_wrapper" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73" - [[package]] name = "separator" version = "0.4.1" @@ -24967,7 +24926,7 @@ dependencies = [ "chacha20poly1305", "curve25519-dalek 4.1.3", "rand_core 0.6.4", - "ring 0.17.7", + "ring 0.17.8", "rustc_version 0.4.0", "sha2 0.10.8", "subtle 2.5.0", @@ -27987,7 +27946,7 @@ name = "sp-version-proc-macro" version = "13.0.0" dependencies = [ "parity-scale-codec", - "proc-macro-warning 1.0.0", + "proc-macro-warning", "proc-macro2 1.0.86", "quote 1.0.37", "sp-version 29.0.0", @@ -30266,78 +30225,6 @@ dependencies = [ "keccak-hasher", ] -[[package]] -name = "trust-dns-proto" -version = "0.22.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f7f83d1e4a0e4358ac54c5c3681e5d7da5efc5a7a632c90bb6d6669ddd9bc26" -dependencies = [ - "async-trait", - "cfg-if", - "data-encoding", - "enum-as-inner 0.5.1", - "futures-channel", - "futures-io", - "futures-util", - "idna 0.2.3", - "ipnet", - "lazy_static", - "rand", - "smallvec", - "socket2 0.4.9", - "thiserror", - "tinyvec", - "tokio", - "tracing", - "url", -] - -[[package]] -name = "trust-dns-proto" -version = "0.23.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3119112651c157f4488931a01e586aa459736e9d6046d3bd9105ffb69352d374" -dependencies = [ - "async-trait", - "cfg-if", - "data-encoding", - "enum-as-inner 0.6.0", - "futures-channel", - "futures-io", - "futures-util", - "idna 0.4.0", - "ipnet", - "once_cell", - "rand", - "smallvec", - "thiserror", - "tinyvec", - "tokio", - "tracing", - "url", -] - -[[package]] -name = "trust-dns-resolver" -version = "0.23.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10a3e6c3aff1718b3c73e395d1f35202ba2ffa847c6a62eea0db8fb4cfe30be6" -dependencies = [ - "cfg-if", - "futures-util", - "ipconfig", - "lru-cache", - "once_cell", - "parking_lot 0.12.3", - "rand", - "resolv-conf", - "smallvec", - "thiserror", - "tokio", - "tracing", - "trust-dns-proto 0.23.2", -] - [[package]] name = "try-lock" version = "0.2.4" @@ -30547,7 +30434,7 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6889a77d49f1f013504cec6bf97a2c730394adedaeb1deb5ea08949a50541105" dependencies = [ - "asynchronous-codec", + "asynchronous-codec 0.6.2", "bytes", "futures-io", "futures-util", @@ -31296,7 +31183,7 @@ version = "0.22.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53" dependencies = [ - "ring 0.17.7", + "ring 0.17.8", "untrusted 0.9.0", ] @@ -31862,35 +31749,18 @@ dependencies = [ "zeroize", ] -[[package]] -name = "x509-parser" -version = "0.15.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7069fba5b66b9193bd2c5d3d4ff12b839118f6bcbef5328efafafb5395cf63da" -dependencies = [ - "asn1-rs 0.5.2", - "data-encoding", - "der-parser 8.2.0", - "lazy_static", - "nom", - "oid-registry 0.6.1", - "rusticata-macros", - "thiserror", - "time", -] - [[package]] name = "x509-parser" version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fcbc162f30700d6f3f82a24bf7cc62ffe7caea42c0b2cba8bf7f3ae50cf51f69" dependencies = [ - "asn1-rs 0.6.1", + "asn1-rs", "data-encoding", - "der-parser 9.0.0", + "der-parser", "lazy_static", "nom", - "oid-registry 0.7.0", + "oid-registry", "rusticata-macros", "thiserror", "time", @@ -32180,6 +32050,22 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "yamux" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a31b5e376a8b012bee9c423acdbb835fc34d45001cfa3106236a624e4b738028" +dependencies = [ + "futures", + "log", + "nohash-hasher", + "parking_lot 0.12.3", + "pin-project", + "rand", + "static_assertions", + "web-time", +] + [[package]] name = "yansi" version = "0.5.1" @@ -32288,7 +32174,7 @@ dependencies = [ "futures", "glob-match", "hex", - "libp2p", + "libp2p 0.52.4", "libsecp256k1", "multiaddr 0.18.1", "rand", diff --git a/Cargo.toml b/Cargo.toml index 62a5ada6cd41..37765056196e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -841,7 +841,7 @@ kvdb-shared-tests = { version = "0.11.0" } landlock = { version = "0.3.0" } libc = { version = "0.2.155" } libfuzzer-sys = { version = "0.4" } -libp2p = { version = "0.52.4" } +libp2p = { version = "0.54.1" } libp2p-identity = { version = "0.2.9" } libsecp256k1 = { version = "0.7.0", default-features = false } linked-hash-map = { version = "0.5.4" } diff --git a/prdoc/pr_6248.prdoc b/prdoc/pr_6248.prdoc new file mode 100644 index 000000000000..71fb0891cac6 --- /dev/null +++ b/prdoc/pr_6248.prdoc @@ -0,0 +1,16 @@ +title: Upgrade libp2p to 0.54.1 + +doc: + - audience: [Node Dev, Node Operator] + description: | + Upgrade libp2p from 0.52.4 to 0.54.1 + +crates: + - name: sc-network + bump: major + - name: sc-network-types + bump: minor + - name: sc-network-sync + bump: patch + - name: sc-telemetry + bump: minor diff --git a/substrate/client/network/src/behaviour.rs b/substrate/client/network/src/behaviour.rs index cee80b6c1e86..e2a91e961668 100644 --- a/substrate/client/network/src/behaviour.rs +++ b/substrate/client/network/src/behaviour.rs @@ -68,6 +68,7 @@ pub struct Behaviour { } /// Event generated by `Behaviour`. +#[derive(Debug)] pub enum BehaviourOut { /// Started a random iterative Kademlia discovery query. RandomKademliaStarted, diff --git a/substrate/client/network/src/discovery.rs b/substrate/client/network/src/discovery.rs index 81baa00e201e..917449cf228c 100644 --- a/substrate/client/network/src/discovery.rs +++ b/substrate/client/network/src/discovery.rs @@ -53,13 +53,13 @@ use futures::prelude::*; use futures_timer::Delay; use ip_network::IpNetwork; use libp2p::{ - core::{Endpoint, Multiaddr}, + core::{transport::PortUse, Endpoint, Multiaddr}, kad::{ self, - record::store::{MemoryStore, RecordStore}, + store::{MemoryStore, RecordStore}, Behaviour as Kademlia, BucketInserts, Config as KademliaConfig, Event as KademliaEvent, - GetClosestPeersError, GetProvidersError, GetProvidersOk, GetRecordOk, PeerRecord, QueryId, - QueryResult, Quorum, Record, RecordKey, + Event, GetClosestPeersError, GetProvidersError, GetProvidersOk, GetRecordOk, PeerRecord, + QueryId, QueryResult, Quorum, Record, RecordKey, }, mdns::{self, tokio::Behaviour as TokioMdns}, multiaddr::Protocol, @@ -68,8 +68,8 @@ use libp2p::{ toggle::{Toggle, ToggleConnectionHandler}, DialFailure, ExternalAddrConfirmed, FromSwarm, }, - ConnectionDenied, ConnectionId, DialError, NetworkBehaviour, PollParameters, - StreamProtocol, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, + ConnectionDenied, ConnectionId, DialError, NetworkBehaviour, StreamProtocol, THandler, + THandlerInEvent, THandlerOutEvent, ToSwarm, }, PeerId, }; @@ -214,23 +214,14 @@ impl DiscoveryConfig { enable_mdns, kademlia_disjoint_query_paths, kademlia_protocol, - kademlia_legacy_protocol, + kademlia_legacy_protocol: _, kademlia_replication_factor, } = self; let kademlia = if let Some(ref kademlia_protocol) = kademlia_protocol { - let mut config = KademliaConfig::default(); + let mut config = KademliaConfig::new(kademlia_protocol.clone()); config.set_replication_factor(kademlia_replication_factor); - // Populate kad with both the legacy and the new protocol names. - // Remove the legacy protocol: - // https://github.com/paritytech/polkadot-sdk/issues/504 - let kademlia_protocols = if let Some(legacy_protocol) = kademlia_legacy_protocol { - vec![kademlia_protocol.clone(), legacy_protocol] - } else { - vec![kademlia_protocol.clone()] - }; - config.set_protocol_names(kademlia_protocols.into_iter().map(Into::into).collect()); config.set_record_filtering(libp2p::kad::StoreInserts::FilterBoth); @@ -647,12 +638,14 @@ impl NetworkBehaviour for DiscoveryBehaviour { peer: PeerId, addr: &Multiaddr, role_override: Endpoint, + port_use: PortUse, ) -> Result, ConnectionDenied> { self.kademlia.handle_established_outbound_connection( connection_id, peer, addr, role_override, + port_use, ) } @@ -724,7 +717,7 @@ impl NetworkBehaviour for DiscoveryBehaviour { Ok(list.into_iter().collect()) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(e) => { self.num_connections += 1; @@ -811,6 +804,10 @@ impl NetworkBehaviour for DiscoveryBehaviour { self.kademlia.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e)); }, + event => { + debug!(target: "sub-libp2p", "New unknown `FromSwarm` libp2p event: {event:?}"); + self.kademlia.on_swarm_event(event); + }, } } @@ -823,11 +820,7 @@ impl NetworkBehaviour for DiscoveryBehaviour { self.kademlia.on_connection_handler_event(peer_id, connection_id, event); } - fn poll( - &mut self, - cx: &mut Context, - params: &mut impl PollParameters, - ) -> Poll>> { + fn poll(&mut self, cx: &mut Context) -> Poll>> { // Immediately process the content of `discovered`. if let Some(ev) = self.pending_events.pop_front() { return Poll::Ready(ToSwarm::GenerateEvent(ev)) @@ -870,7 +863,7 @@ impl NetworkBehaviour for DiscoveryBehaviour { } } - while let Poll::Ready(ev) = self.kademlia.poll(cx, params) { + while let Poll::Ready(ev) = self.kademlia.poll(cx) { match ev { ToSwarm::GenerateEvent(ev) => match ev { KademliaEvent::RoutingUpdated { peer, .. } => { @@ -1103,30 +1096,38 @@ impl NetworkBehaviour for DiscoveryBehaviour { e.key(), e, ), }, + KademliaEvent::OutboundQueryProgressed { + result: QueryResult::Bootstrap(res), + .. + } => match res { + Ok(ok) => debug!( + target: "sub-libp2p", + "Libp2p => DHT bootstrap progressed: {ok:?}", + ), + Err(e) => warn!( + target: "sub-libp2p", + "Libp2p => DHT bootstrap error: {e:?}", + ), + }, // We never start any other type of query. KademliaEvent::OutboundQueryProgressed { result: e, .. } => { warn!(target: "sub-libp2p", "Libp2p => Unhandled Kademlia event: {:?}", e) }, + Event::ModeChanged { new_mode } => { + debug!(target: "sub-libp2p", "Libp2p => Kademlia mode changed: {new_mode}") + }, }, ToSwarm::Dial { opts } => return Poll::Ready(ToSwarm::Dial { opts }), - ToSwarm::NotifyHandler { peer_id, handler, event } => - return Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, event }), - ToSwarm::CloseConnection { peer_id, connection } => - return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }), - ToSwarm::NewExternalAddrCandidate(observed) => - return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)), - ToSwarm::ExternalAddrConfirmed(addr) => - return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)), - ToSwarm::ExternalAddrExpired(addr) => - return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)), - ToSwarm::ListenOn { opts } => return Poll::Ready(ToSwarm::ListenOn { opts }), - ToSwarm::RemoveListener { id } => - return Poll::Ready(ToSwarm::RemoveListener { id }), + event => { + return Poll::Ready(event.map_out(|_| { + unreachable!("`GenerateEvent` is handled in a branch above; qed") + })); + }, } } // Poll mDNS. - while let Poll::Ready(ev) = self.mdns.poll(cx, params) { + while let Poll::Ready(ev) = self.mdns.poll(cx) { match ev { ToSwarm::GenerateEvent(event) => match event { mdns::Event::Discovered(list) => { @@ -1148,17 +1149,17 @@ impl NetworkBehaviour for DiscoveryBehaviour { }, // `event` is an enum with no variant ToSwarm::NotifyHandler { event, .. } => match event {}, - ToSwarm::CloseConnection { peer_id, connection } => - return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }), - ToSwarm::NewExternalAddrCandidate(observed) => - return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)), - ToSwarm::ExternalAddrConfirmed(addr) => - return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)), - ToSwarm::ExternalAddrExpired(addr) => - return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)), - ToSwarm::ListenOn { opts } => return Poll::Ready(ToSwarm::ListenOn { opts }), - ToSwarm::RemoveListener { id } => - return Poll::Ready(ToSwarm::RemoveListener { id }), + event => { + return Poll::Ready( + event + .map_in(|_| { + unreachable!("`NotifyHandler` is handled in a branch above; qed") + }) + .map_out(|_| { + unreachable!("`GenerateEvent` is handled in a branch above; qed") + }), + ); + }, } } @@ -1201,21 +1202,14 @@ mod tests { }, identity::Keypair, noise, - swarm::{Executor, Swarm, SwarmEvent}, + swarm::{Swarm, SwarmEvent}, yamux, Multiaddr, }; use sp_core::hash::H256; - use std::{collections::HashSet, pin::Pin, task::Poll}; + use std::{collections::HashSet, task::Poll, time::Duration}; - struct TokioExecutor(tokio::runtime::Runtime); - impl Executor for TokioExecutor { - fn exec(&self, f: Pin + Send>>) { - let _ = self.0.spawn(f); - } - } - - #[test] - fn discovery_working() { + #[tokio::test] + async fn discovery_working() { let mut first_swarm_peer_id_and_addr = None; let genesis_hash = H256::from_low_u64_be(1); @@ -1226,42 +1220,40 @@ mod tests { // the first swarm via `with_permanent_addresses`. let mut swarms = (0..25) .map(|i| { - let keypair = Keypair::generate_ed25519(); - - let transport = MemoryTransport::new() - .upgrade(upgrade::Version::V1) - .authenticate(noise::Config::new(&keypair).unwrap()) - .multiplex(yamux::Config::default()) - .boxed(); - - let behaviour = { - let mut config = DiscoveryConfig::new(keypair.public().to_peer_id()); - config - .with_permanent_addresses(first_swarm_peer_id_and_addr.clone()) - .allow_private_ip(true) - .allow_non_globals_in_dht(true) - .discovery_limit(50) - .with_kademlia(genesis_hash, fork_id, &protocol_id); - - config.finish() - }; - - let runtime = tokio::runtime::Runtime::new().unwrap(); - #[allow(deprecated)] - let mut swarm = libp2p::swarm::SwarmBuilder::with_executor( - transport, - behaviour, - keypair.public().to_peer_id(), - TokioExecutor(runtime), - ) - .build(); + let mut swarm = libp2p::SwarmBuilder::with_new_identity() + .with_tokio() + .with_other_transport(|keypair| { + MemoryTransport::new() + .upgrade(upgrade::Version::V1) + .authenticate(noise::Config::new(&keypair).unwrap()) + .multiplex(yamux::Config::default()) + .boxed() + }) + .unwrap() + .with_behaviour(|keypair| { + let mut config = DiscoveryConfig::new(keypair.public().to_peer_id()); + config + .with_permanent_addresses(first_swarm_peer_id_and_addr.clone()) + .allow_private_ip(true) + .allow_non_globals_in_dht(true) + .discovery_limit(50) + .with_kademlia(genesis_hash, fork_id, &protocol_id); + + config.finish() + }) + .unwrap() + .with_swarm_config(|config| { + // This is taken care of by notification protocols in non-test environment + config.with_idle_connection_timeout(Duration::from_secs(10)) + }) + .build(); let listen_addr: Multiaddr = format!("/memory/{}", rand::random::()).parse().unwrap(); if i == 0 { first_swarm_peer_id_and_addr = - Some((keypair.public().to_peer_id(), listen_addr.clone())) + Some((*swarm.local_peer_id(), listen_addr.clone())) } swarm.listen_on(listen_addr.clone()).unwrap(); @@ -1348,7 +1340,7 @@ mod tests { } }); - futures::executor::block_on(fut); + fut.await } #[test] diff --git a/substrate/client/network/src/network_state.rs b/substrate/client/network/src/network_state.rs index cf8b8b55a7ff..65fd494739ee 100644 --- a/substrate/client/network/src/network_state.rs +++ b/substrate/client/network/src/network_state.rs @@ -106,7 +106,7 @@ pub enum Endpoint { impl From for PeerEndpoint { fn from(endpoint: ConnectedPoint) -> Self { match endpoint { - ConnectedPoint::Dialer { address, role_override } => + ConnectedPoint::Dialer { address, role_override, port_use: _ } => Self::Dialing(address, role_override.into()), ConnectedPoint::Listener { local_addr, send_back_addr } => Self::Listening { local_addr, send_back_addr }, diff --git a/substrate/client/network/src/peer_info.rs b/substrate/client/network/src/peer_info.rs index 21eeea6bcc0c..a673f06fd622 100644 --- a/substrate/client/network/src/peer_info.rs +++ b/substrate/client/network/src/peer_info.rs @@ -25,7 +25,7 @@ use either::Either; use fnv::FnvHashMap; use futures::prelude::*; use libp2p::{ - core::{ConnectedPoint, Endpoint}, + core::{transport::PortUse, ConnectedPoint, Endpoint}, identify::{ Behaviour as Identify, Config as IdentifyConfig, Event as IdentifyEvent, Info as IdentifyInfo, @@ -38,8 +38,8 @@ use libp2p::{ ExternalAddrConfirmed, FromSwarm, ListenFailure, }, ConnectionDenied, ConnectionHandler, ConnectionHandlerSelect, ConnectionId, - NetworkBehaviour, NewExternalAddrCandidate, PollParameters, THandler, THandlerInEvent, - THandlerOutEvent, ToSwarm, + NetworkBehaviour, NewExternalAddrCandidate, THandler, THandlerInEvent, THandlerOutEvent, + ToSwarm, }, Multiaddr, PeerId, }; @@ -275,23 +275,26 @@ impl NetworkBehaviour for PeerInfoBehaviour { peer: PeerId, addr: &Multiaddr, role_override: Endpoint, + port_use: PortUse, ) -> Result, ConnectionDenied> { let ping_handler = self.ping.handle_established_outbound_connection( connection_id, peer, addr, role_override, + port_use, )?; let identify_handler = self.identify.handle_established_outbound_connection( connection_id, peer, addr, role_override, + port_use, )?; Ok(ping_handler.select(identify_handler)) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished( e @ ConnectionEstablished { peer_id, endpoint, .. }, @@ -319,22 +322,21 @@ impl NetworkBehaviour for PeerInfoBehaviour { peer_id, connection_id, endpoint, - handler, + cause, remaining_established, }) => { - let (ping_handler, identity_handler) = handler.into_inner(); self.ping.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, connection_id, endpoint, - handler: ping_handler, + cause, remaining_established, })); self.identify.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, connection_id, endpoint, - handler: identity_handler, + cause, remaining_established, })); @@ -369,18 +371,21 @@ impl NetworkBehaviour for PeerInfoBehaviour { send_back_addr, error, connection_id, + peer_id, }) => { self.ping.on_swarm_event(FromSwarm::ListenFailure(ListenFailure { local_addr, send_back_addr, error, connection_id, + peer_id, })); self.identify.on_swarm_event(FromSwarm::ListenFailure(ListenFailure { local_addr, send_back_addr, error, connection_id, + peer_id, })); }, FromSwarm::ListenerError(e) => { @@ -438,6 +443,11 @@ impl NetworkBehaviour for PeerInfoBehaviour { self.ping.on_swarm_event(FromSwarm::NewListenAddr(e)); self.identify.on_swarm_event(FromSwarm::NewListenAddr(e)); }, + event => { + debug!(target: "sub-libp2p", "New unknown `FromSwarm` libp2p event: {event:?}"); + self.ping.on_swarm_event(event); + self.identify.on_swarm_event(event); + }, } } @@ -455,47 +465,29 @@ impl NetworkBehaviour for PeerInfoBehaviour { } } - fn poll( - &mut self, - cx: &mut Context, - params: &mut impl PollParameters, - ) -> Poll>> { + fn poll(&mut self, cx: &mut Context) -> Poll>> { if let Some(event) = self.pending_actions.pop_front() { return Poll::Ready(event) } loop { - match self.ping.poll(cx, params) { + match self.ping.poll(cx) { Poll::Pending => break, Poll::Ready(ToSwarm::GenerateEvent(ev)) => { if let PingEvent { peer, result: Ok(rtt), connection } = ev { self.handle_ping_report(&peer, rtt, connection) } }, - Poll::Ready(ToSwarm::Dial { opts }) => return Poll::Ready(ToSwarm::Dial { opts }), - Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, event }) => - return Poll::Ready(ToSwarm::NotifyHandler { - peer_id, - handler, - event: Either::Left(event), - }), - Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }) => - return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }), - Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)) => - return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)), - Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)) => - return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)), - Poll::Ready(ToSwarm::ExternalAddrExpired(addr)) => - return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)), - Poll::Ready(ToSwarm::ListenOn { opts }) => - return Poll::Ready(ToSwarm::ListenOn { opts }), - Poll::Ready(ToSwarm::RemoveListener { id }) => - return Poll::Ready(ToSwarm::RemoveListener { id }), + Poll::Ready(event) => { + return Poll::Ready(event.map_in(Either::Left).map_out(|_| { + unreachable!("`GenerateEvent` is handled in a branch above; qed") + })); + }, } } loop { - match self.identify.poll(cx, params) { + match self.identify.poll(cx) { Poll::Pending => break, Poll::Ready(ToSwarm::GenerateEvent(event)) => match event { IdentifyEvent::Received { peer_id, info, .. } => { @@ -503,31 +495,20 @@ impl NetworkBehaviour for PeerInfoBehaviour { let event = PeerInfoEvent::Identified { peer_id, info }; return Poll::Ready(ToSwarm::GenerateEvent(event)) }, - IdentifyEvent::Error { peer_id, error } => { - debug!(target: "sub-libp2p", "Identification with peer {:?} failed => {}", peer_id, error) + IdentifyEvent::Error { connection_id, peer_id, error } => { + debug!( + target: "sub-libp2p", + "Identification with peer {peer_id:?}({connection_id}) failed => {error}" + ); }, IdentifyEvent::Pushed { .. } => {}, IdentifyEvent::Sent { .. } => {}, }, - Poll::Ready(ToSwarm::Dial { opts }) => return Poll::Ready(ToSwarm::Dial { opts }), - Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, event }) => - return Poll::Ready(ToSwarm::NotifyHandler { - peer_id, - handler, - event: Either::Right(event), - }), - Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }) => - return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }), - Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)) => - return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)), - Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)) => - return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)), - Poll::Ready(ToSwarm::ExternalAddrExpired(addr)) => - return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)), - Poll::Ready(ToSwarm::ListenOn { opts }) => - return Poll::Ready(ToSwarm::ListenOn { opts }), - Poll::Ready(ToSwarm::RemoveListener { id }) => - return Poll::Ready(ToSwarm::RemoveListener { id }), + Poll::Ready(event) => { + return Poll::Ready(event.map_in(Either::Right).map_out(|_| { + unreachable!("`GenerateEvent` is handled in a branch above; qed") + })); + }, } } diff --git a/substrate/client/network/src/protocol.rs b/substrate/client/network/src/protocol.rs index 402baa7bb2a4..6da1d601b34f 100644 --- a/substrate/client/network/src/protocol.rs +++ b/substrate/client/network/src/protocol.rs @@ -27,10 +27,10 @@ use crate::{ use codec::Encode; use libp2p::{ - core::Endpoint, + core::{transport::PortUse, Endpoint}, swarm::{ - behaviour::FromSwarm, ConnectionDenied, ConnectionId, NetworkBehaviour, PollParameters, - THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, + behaviour::FromSwarm, ConnectionDenied, ConnectionId, NetworkBehaviour, THandler, + THandlerInEvent, THandlerOutEvent, ToSwarm, }, Multiaddr, PeerId, }; @@ -47,9 +47,7 @@ use notifications::{Notifications, NotificationsOut}; pub(crate) use notifications::ProtocolHandle; -pub use notifications::{ - notification_service, NotificationsSink, NotifsHandlerError, ProtocolHandlePair, Ready, -}; +pub use notifications::{notification_service, NotificationsSink, ProtocolHandlePair, Ready}; mod notifications; @@ -250,12 +248,14 @@ impl NetworkBehaviour for Protocol { peer: PeerId, addr: &Multiaddr, role_override: Endpoint, + port_use: PortUse, ) -> Result, ConnectionDenied> { self.behaviour.handle_established_outbound_connection( connection_id, peer, addr, role_override, + port_use, ) } @@ -271,7 +271,7 @@ impl NetworkBehaviour for Protocol { Ok(Vec::new()) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { self.behaviour.on_swarm_event(event); } @@ -287,26 +287,15 @@ impl NetworkBehaviour for Protocol { fn poll( &mut self, cx: &mut std::task::Context, - params: &mut impl PollParameters, ) -> Poll>> { - let event = match self.behaviour.poll(cx, params) { + let event = match self.behaviour.poll(cx) { Poll::Pending => return Poll::Pending, Poll::Ready(ToSwarm::GenerateEvent(ev)) => ev, - Poll::Ready(ToSwarm::Dial { opts }) => return Poll::Ready(ToSwarm::Dial { opts }), - Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, event }) => - return Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, event }), - Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }) => - return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }), - Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)) => - return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)), - Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)) => - return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)), - Poll::Ready(ToSwarm::ExternalAddrExpired(addr)) => - return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)), - Poll::Ready(ToSwarm::ListenOn { opts }) => - return Poll::Ready(ToSwarm::ListenOn { opts }), - Poll::Ready(ToSwarm::RemoveListener { id }) => - return Poll::Ready(ToSwarm::RemoveListener { id }), + Poll::Ready(event) => { + return Poll::Ready(event.map_out(|_| { + unreachable!("`GenerateEvent` is handled in a branch above; qed") + })); + }, }; let outcome = match event { diff --git a/substrate/client/network/src/protocol/notifications.rs b/substrate/client/network/src/protocol/notifications.rs index 10fa329097d1..2691496234ad 100644 --- a/substrate/client/network/src/protocol/notifications.rs +++ b/substrate/client/network/src/protocol/notifications.rs @@ -21,7 +21,7 @@ pub use self::{ behaviour::{Notifications, NotificationsOut, ProtocolConfig}, - handler::{NotificationsSink, NotifsHandlerError, Ready}, + handler::{NotificationsSink, Ready}, service::{notification_service, ProtocolHandlePair}, }; diff --git a/substrate/client/network/src/protocol/notifications/behaviour.rs b/substrate/client/network/src/protocol/notifications/behaviour.rs index a562546145c8..e6909fcdefea 100644 --- a/substrate/client/network/src/protocol/notifications/behaviour.rs +++ b/substrate/client/network/src/protocol/notifications/behaviour.rs @@ -33,11 +33,11 @@ use bytes::BytesMut; use fnv::FnvHashMap; use futures::{future::BoxFuture, prelude::*, stream::FuturesUnordered}; use libp2p::{ - core::{Endpoint, Multiaddr}, + core::{transport::PortUse, Endpoint, Multiaddr}, swarm::{ behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}, - ConnectionDenied, ConnectionId, DialError, NetworkBehaviour, NotifyHandler, PollParameters, - THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, + ConnectionDenied, ConnectionId, DialError, NetworkBehaviour, NotifyHandler, THandler, + THandlerInEvent, THandlerOutEvent, ToSwarm, }, PeerId, }; @@ -49,6 +49,7 @@ use smallvec::SmallVec; use tokio::sync::oneshot::error::RecvError; use tokio_stream::StreamMap; +use libp2p::swarm::CloseConnection; use std::{ cmp, collections::{hash_map::Entry, VecDeque}, @@ -1233,11 +1234,12 @@ impl NetworkBehaviour for Notifications { peer: PeerId, _addr: &Multiaddr, _role_override: Endpoint, + _port_use: PortUse, ) -> Result, ConnectionDenied> { Ok(NotifsHandler::new(peer, self.notif_protocols.clone(), Some(self.metrics.clone()))) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, @@ -1670,6 +1672,9 @@ impl NetworkBehaviour for Notifications { FromSwarm::ExternalAddrConfirmed(_) => {}, FromSwarm::AddressChange(_) => {}, FromSwarm::NewListenAddr(_) => {}, + event => { + warn!(target: "sub-libp2p", "New unknown `FromSwarm` libp2p event: {event:?}"); + }, } } @@ -2217,14 +2222,19 @@ impl NetworkBehaviour for Notifications { ); } }, + NotifsHandlerOut::Close { protocol_index } => { + let set_id = SetId::from(protocol_index); + + trace!(target: "sub-libp2p", "Handler({}, {:?}) => SyncNotificationsClogged({:?})", peer_id, connection_id, set_id); + self.events.push_back(ToSwarm::CloseConnection { + peer_id, + connection: CloseConnection::One(connection_id), + }); + }, } } - fn poll( - &mut self, - cx: &mut Context, - _params: &mut impl PollParameters, - ) -> Poll>> { + fn poll(&mut self, cx: &mut Context) -> Poll>> { if let Some(event) = self.events.pop_front() { return Poll::Ready(event) } @@ -2359,7 +2369,6 @@ impl NetworkBehaviour for Notifications { } #[cfg(test)] -#[allow(deprecated)] mod tests { use super::*; use crate::{ @@ -2386,17 +2395,6 @@ mod tests { } } - #[derive(Clone)] - struct MockPollParams {} - - impl PollParameters for MockPollParams { - type SupportedProtocolsIter = std::vec::IntoIter>; - - fn supported_protocols(&self) -> Self::SupportedProtocolsIter { - vec![].into_iter() - } - } - fn development_notifs( ) -> (Notifications, ProtocolController, Box) { @@ -2654,7 +2652,7 @@ mod tests { peer_id: peer, connection_id: conn, endpoint: &connected.clone(), - handler: NotifsHandler::new(peer, vec![], None), + cause: None, remaining_established: 0usize, }, )); @@ -2854,7 +2852,7 @@ mod tests { peer_id: peer, connection_id: conn, endpoint: &connected.clone(), - handler: NotifsHandler::new(peer, vec![], None), + cause: None, remaining_established: 0usize, }, )); @@ -3007,7 +3005,7 @@ mod tests { peer_id: peer, connection_id: conn, endpoint: &connected.clone(), - handler: NotifsHandler::new(peer, vec![], None), + cause: None, remaining_established: 0usize, }, )); @@ -3051,7 +3049,7 @@ mod tests { peer_id: peer, connection_id: conn, endpoint: &connected.clone(), - handler: NotifsHandler::new(peer, vec![], None), + cause: None, remaining_established: 0usize, }, )); @@ -3121,7 +3119,7 @@ mod tests { peer_id: peer, connection_id: conn, endpoint: &connected.clone(), - handler: NotifsHandler::new(peer, vec![], None), + cause: None, remaining_established: 0usize, }, )); @@ -3269,7 +3267,7 @@ mod tests { peer_id: peer, connection_id: conn1, endpoint: &connected.clone(), - handler: NotifsHandler::new(peer, vec![], None), + cause: None, remaining_established: 0usize, }, )); @@ -3395,7 +3393,7 @@ mod tests { peer_id: peer, connection_id: conn, endpoint: &connected.clone(), - handler: NotifsHandler::new(peer, vec![], None), + cause: None, remaining_established: 0usize, }, )); @@ -3469,7 +3467,7 @@ mod tests { peer_id: peer, connection_id: conn, endpoint: &connected.clone(), - handler: NotifsHandler::new(peer, vec![], None), + cause: None, remaining_established: 0usize, }, )); @@ -3532,7 +3530,7 @@ mod tests { peer_id: peer, connection_id: conn1, endpoint: &connected.clone(), - handler: NotifsHandler::new(peer, vec![], None), + cause: None, remaining_established: 0usize, }, )); @@ -3546,7 +3544,7 @@ mod tests { peer_id: peer, connection_id: conn2, endpoint: &connected.clone(), - handler: NotifsHandler::new(peer, vec![], None), + cause: None, remaining_established: 0usize, }, )); @@ -3600,7 +3598,7 @@ mod tests { peer_id: peer, connection_id: conn, endpoint: &connected.clone(), - handler: NotifsHandler::new(peer, vec![], None), + cause: None, remaining_established: 0usize, }, )); @@ -3658,7 +3656,7 @@ mod tests { peer_id: peer, connection_id: conn2, endpoint: &connected.clone(), - handler: NotifsHandler::new(peer, vec![], None), + cause: None, remaining_established: 0usize, }, )); @@ -3719,7 +3717,7 @@ mod tests { peer_id: peer, connection_id: conn1, endpoint: &connected.clone(), - handler: NotifsHandler::new(peer, vec![], None), + cause: None, remaining_established: 0usize, }, )); @@ -3788,7 +3786,7 @@ mod tests { peer_id: peer, connection_id: conn1, endpoint: &connected.clone(), - handler: NotifsHandler::new(peer, vec![], None), + cause: None, remaining_established: 0usize, }, )); @@ -3829,7 +3827,7 @@ mod tests { peer_id: peer, connection_id: conn, endpoint: &connected.clone(), - handler: NotifsHandler::new(peer, vec![], None), + cause: None, remaining_established: 0usize, }, )); @@ -3952,7 +3950,7 @@ mod tests { peer_id: peer, connection_id: conn, endpoint: &connected.clone(), - handler: NotifsHandler::new(peer, vec![], None), + cause: None, remaining_established: 0usize, }, )); @@ -3972,11 +3970,9 @@ mod tests { assert!(notif.peers.get(&(peer, set_id)).is_some()); if tokio::time::timeout(Duration::from_secs(5), async { - let mut params = MockPollParams {}; - loop { futures::future::poll_fn(|cx| { - let _ = notif.poll(cx, &mut params); + let _ = notif.poll(cx); Poll::Ready(()) }) .await; @@ -4080,11 +4076,9 @@ mod tests { // verify that the code continues to keep the peer disabled by resetting the timer // after the first one expired. if tokio::time::timeout(Duration::from_secs(5), async { - let mut params = MockPollParams {}; - loop { futures::future::poll_fn(|cx| { - let _ = notif.poll(cx, &mut params); + let _ = notif.poll(cx); Poll::Ready(()) }) .await; @@ -4262,7 +4256,7 @@ mod tests { peer_id: peer, connection_id: conn, endpoint: &connected.clone(), - handler: NotifsHandler::new(peer, vec![], None), + cause: None, remaining_established: 0usize, }, )); @@ -4503,7 +4497,7 @@ mod tests { peer_id: peer, connection_id: ConnectionId::new_unchecked(0), endpoint: &connected.clone(), - handler: NotifsHandler::new(peer, vec![], None), + cause: None, remaining_established: 0usize, }, )); @@ -4605,7 +4599,7 @@ mod tests { peer_id: peer, connection_id: ConnectionId::new_unchecked(0), endpoint: &connected.clone(), - handler: NotifsHandler::new(peer, vec![], None), + cause: None, remaining_established: 0usize, }, )); @@ -4687,7 +4681,7 @@ mod tests { peer_id: peer, connection_id: ConnectionId::new_unchecked(0), endpoint: &endpoint.clone(), - handler: NotifsHandler::new(peer, vec![], None), + cause: None, remaining_established: 0usize, }, )); @@ -4804,7 +4798,7 @@ mod tests { peer_id: peer, connection_id: ConnectionId::new_unchecked(1337), endpoint: &connected.clone(), - handler: NotifsHandler::new(peer, vec![], None), + cause: None, remaining_established: 0usize, }, )); @@ -4839,7 +4833,7 @@ mod tests { peer_id: peer, connection_id: ConnectionId::new_unchecked(1337), endpoint: &connected.clone(), - handler: NotifsHandler::new(peer, vec![], None), + cause: None, remaining_established: 0usize, }, )); @@ -4890,7 +4884,7 @@ mod tests { peer_id: peer, connection_id: ConnectionId::new_unchecked(1337), endpoint: &connected.clone(), - handler: NotifsHandler::new(peer, vec![], None), + cause: None, remaining_established: 0usize, }, )); @@ -4937,7 +4931,7 @@ mod tests { peer_id: peer, connection_id: conn, endpoint: &connected.clone(), - handler: NotifsHandler::new(peer, vec![], None), + cause: None, remaining_established: 0usize, }, )); @@ -4987,7 +4981,7 @@ mod tests { peer_id: peer, connection_id: ConnectionId::new_unchecked(1337), endpoint: &connected.clone(), - handler: NotifsHandler::new(peer, vec![], None), + cause: None, remaining_established: 0usize, }, )); @@ -5030,7 +5024,7 @@ mod tests { peer_id: peer, connection_id: conn, endpoint: &connected.clone(), - handler: NotifsHandler::new(peer, vec![], None), + cause: None, remaining_established: 0usize, }, )); @@ -5041,7 +5035,7 @@ mod tests { peer_id: peer, connection_id: conn, endpoint: &connected.clone(), - handler: NotifsHandler::new(peer, vec![], None), + cause: None, remaining_established: 0usize, }, )); diff --git a/substrate/client/network/src/protocol/notifications/handler.rs b/substrate/client/network/src/protocol/notifications/handler.rs index bff60ba1125f..332de9f19c41 100644 --- a/substrate/client/network/src/protocol/notifications/handler.rs +++ b/substrate/client/network/src/protocol/notifications/handler.rs @@ -74,12 +74,12 @@ use futures::{ }; use libp2p::{ swarm::{ - handler::ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, KeepAlive, Stream, + handler::ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, Stream, SubstreamProtocol, }, PeerId, }; -use log::error; +use log::{error, warn}; use parking_lot::{Mutex, RwLock}; use std::{ collections::VecDeque, @@ -87,7 +87,7 @@ use std::{ pin::Pin, sync::Arc, task::{Context, Poll}, - time::{Duration, Instant}, + time::Duration, }; /// Number of pending notifications in asynchronous contexts. @@ -113,16 +113,18 @@ pub struct NotifsHandler { /// List of notification protocols, specified by the user at initialization. protocols: Vec, - /// When the connection with the remote has been successfully established. - when_connection_open: Instant, + /// Whether to keep connection alive + keep_alive: bool, + + /// Optional future that keeps connection alive for a certain amount of time. + // TODO: this should be safe to remove, see https://github.com/paritytech/polkadot-sdk/issues/6350 + keep_alive_timeout_future: Option + Send + 'static>>>, /// Remote we are connected to. peer_id: PeerId, /// Events to return in priority from `poll`. - events_queue: VecDeque< - ConnectionHandlerEvent, - >, + events_queue: VecDeque>, /// Metrics. metrics: Option>, @@ -149,7 +151,12 @@ impl NotifsHandler { }) .collect(), peer_id, - when_connection_open: Instant::now(), + // Keep connection alive initially until below timeout expires + keep_alive: true, + // A grace period of `INITIAL_KEEPALIVE_TIME` must be given to leave time for the remote + // to express desire to open substreams. + // TODO: This is a hack and ideally should not be necessary + keep_alive_timeout_future: Some(Box::pin(tokio::time::sleep(INITIAL_KEEPALIVE_TIME))), events_queue: VecDeque::with_capacity(16), metrics: metrics.map_or(None, |metrics| Some(Arc::new(metrics))), } @@ -327,6 +334,12 @@ pub enum NotifsHandlerOut { /// Message that has been received. message: BytesMut, }, + + /// Close connection + Close { + /// Index of the protocol in the list of protocols passed at initialization. + protocol_index: usize, + }, } /// Sink connected directly to the node background task. Allows sending notifications to the peer. @@ -465,17 +478,9 @@ impl<'a> Ready<'a> { } } -/// Error specific to the collection of protocols. -#[derive(Debug, thiserror::Error)] -pub enum NotifsHandlerError { - #[error("Channel of synchronous notifications is full.")] - SyncNotificationsClogged, -} - impl ConnectionHandler for NotifsHandler { type FromBehaviour = NotifsHandlerIn; type ToBehaviour = NotifsHandlerOut; - type Error = NotifsHandlerError; type InboundProtocol = UpgradeCollec; type OutboundProtocol = NotificationsOut; // Index within the `out_protocols`. @@ -616,6 +621,9 @@ impl ConnectionHandler for NotifsHandler { State::Open { .. } => debug_assert!(false), }, ConnectionEvent::ListenUpgradeError(_listen_upgrade_error) => {}, + event => { + warn!(target: "sub-libp2p", "New unknown `ConnectionEvent` libp2p event: {event:?}"); + }, } } @@ -711,35 +719,36 @@ impl ConnectionHandler for NotifsHandler { } } - fn connection_keep_alive(&self) -> KeepAlive { + fn connection_keep_alive(&self) -> bool { // `Yes` if any protocol has some activity. if self.protocols.iter().any(|p| !matches!(p.state, State::Closed { .. })) { - return KeepAlive::Yes + return true; } - // A grace period of `INITIAL_KEEPALIVE_TIME` must be given to leave time for the remote - // to express desire to open substreams. - #[allow(deprecated)] - KeepAlive::Until(self.when_connection_open + INITIAL_KEEPALIVE_TIME) + self.keep_alive } - #[allow(deprecated)] fn poll( &mut self, cx: &mut Context, ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::ToBehaviour, - Self::Error, - >, + ConnectionHandlerEvent, > { + { + let maybe_keep_alive_timeout_future = &mut self.keep_alive_timeout_future; + if let Some(keep_alive_timeout_future) = maybe_keep_alive_timeout_future { + if keep_alive_timeout_future.poll_unpin(cx).is_ready() { + maybe_keep_alive_timeout_future.take(); + self.keep_alive = false; + } + } + } + if let Some(ev) = self.events_queue.pop_front() { return Poll::Ready(ev) } - // For each open substream, try send messages from `notifications_sink_rx` to the + // For each open substream, try to send messages from `notifications_sink_rx` to the // substream. for protocol_index in 0..self.protocols.len() { if let State::Open { @@ -750,11 +759,10 @@ impl ConnectionHandler for NotifsHandler { // Only proceed with `out_substream.poll_ready_unpin` if there is an element // available in `notifications_sink_rx`. This avoids waking up the task when // a substream is ready to send if there isn't actually something to send. - #[allow(deprecated)] match Pin::new(&mut *notifications_sink_rx).as_mut().poll_peek(cx) { Poll::Ready(Some(&NotificationsSinkMessage::ForceClose)) => - return Poll::Ready(ConnectionHandlerEvent::Close( - NotifsHandlerError::SyncNotificationsClogged, + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + NotifsHandlerOut::Close { protocol_index }, )), Poll::Ready(Some(&NotificationsSinkMessage::Notification { .. })) => {}, Poll::Ready(None) | Poll::Pending => break, @@ -975,6 +983,17 @@ pub mod tests { rx_buffer: BytesMut, } + /// Mirror of `ActiveStreamCounter` in `libp2p` + #[allow(dead_code)] + struct MockActiveStreamCounter(Arc<()>); + + // Mirror of `Stream` in `libp2p` + #[allow(dead_code)] + struct MockStream { + stream: Negotiated, + counter: Option, + } + impl MockSubstream { /// Create new substream pair. pub fn new() -> (Self, Self) { @@ -1004,16 +1023,11 @@ pub mod tests { /// Unsafe substitute for `Stream::new` private constructor. fn stream_new(stream: Negotiated) -> Stream { + let stream = MockStream { stream, counter: None }; // Static asserts to make sure this doesn't break. const _: () = { - assert!( - core::mem::size_of::() == - core::mem::size_of::>() - ); - assert!( - core::mem::align_of::() == - core::mem::align_of::>() - ); + assert!(core::mem::size_of::() == core::mem::size_of::()); + assert!(core::mem::align_of::() == core::mem::align_of::()); }; unsafe { core::mem::transmute(stream) } @@ -1084,24 +1098,16 @@ pub mod tests { /// Create new [`NotifsHandler`]. fn notifs_handler() -> NotifsHandler { - let proto = Protocol { - config: ProtocolConfig { + NotifsHandler::new( + PeerId::random(), + vec![ProtocolConfig { name: "/foo".into(), fallback_names: vec![], handshake: Arc::new(RwLock::new(b"hello, world".to_vec())), max_notification_size: u64::MAX, - }, - in_upgrade: NotificationsIn::new("/foo", Vec::new(), u64::MAX), - state: State::Closed { pending_opening: false }, - }; - - NotifsHandler { - protocols: vec![proto], - when_connection_open: Instant::now(), - peer_id: PeerId::random(), - events_queue: VecDeque::new(), - metrics: None, - } + }], + None, + ) } // verify that if another substream is attempted to be opened by remote while an inbound @@ -1608,12 +1614,11 @@ pub mod tests { notifications_sink.send_sync_notification(vec![1, 3, 3, 9]); notifications_sink.send_sync_notification(vec![1, 3, 4, 0]); - #[allow(deprecated)] futures::future::poll_fn(|cx| { assert!(std::matches!( handler.poll(cx), - Poll::Ready(ConnectionHandlerEvent::Close( - NotifsHandlerError::SyncNotificationsClogged, + Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + NotifsHandlerOut::Close { .. } )) )); Poll::Ready(()) diff --git a/substrate/client/network/src/protocol/notifications/tests.rs b/substrate/client/network/src/protocol/notifications/tests.rs index a8eeb2bb1980..50f03b5911b6 100644 --- a/substrate/client/network/src/protocol/notifications/tests.rs +++ b/substrate/client/network/src/protocol/notifications/tests.rs @@ -30,30 +30,25 @@ use crate::{ use futures::{future::BoxFuture, prelude::*}; use libp2p::{ - core::{transport::MemoryTransport, upgrade, Endpoint}, + core::{ + transport::{MemoryTransport, PortUse}, + upgrade, Endpoint, + }, identity, noise, swarm::{ - self, behaviour::FromSwarm, ConnectionDenied, ConnectionId, Executor, NetworkBehaviour, - PollParameters, Swarm, SwarmEvent, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, + behaviour::FromSwarm, ConnectionDenied, ConnectionId, NetworkBehaviour, Swarm, SwarmEvent, + THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }, - yamux, Multiaddr, PeerId, Transport, + yamux, Multiaddr, PeerId, SwarmBuilder, Transport, }; use sc_utils::mpsc::tracing_unbounded; use std::{ iter, - pin::Pin, sync::Arc, task::{Context, Poll}, time::Duration, }; -struct TokioExecutor(tokio::runtime::Runtime); -impl Executor for TokioExecutor { - fn exec(&self, f: Pin + Send>>) { - let _ = self.0.spawn(f); - } -} - /// Builds two nodes that have each other as bootstrap nodes. /// This is to be used only for testing, and a panic will happen if something goes wrong. fn build_nodes() -> (Swarm, Swarm) { @@ -67,13 +62,6 @@ fn build_nodes() -> (Swarm, Swarm) { for index in 0..2 { let keypair = keypairs[index].clone(); - let transport = MemoryTransport::new() - .upgrade(upgrade::Version::V1) - .authenticate(noise::Config::new(&keypair).unwrap()) - .multiplex(yamux::Config::default()) - .timeout(Duration::from_secs(20)) - .boxed(); - let (protocol_handle_pair, mut notif_service) = crate::protocol::notifications::service::notification_service("/foo".into()); // The first swarm has the second peer ID present in the peerstore. @@ -102,39 +90,8 @@ fn build_nodes() -> (Swarm, Swarm) { ); let (notif_handle, command_stream) = protocol_handle_pair.split(); - let behaviour = CustomProtoWithAddr { - inner: Notifications::new( - vec![controller_handle], - from_controller, - NotificationMetrics::new(None), - iter::once(( - ProtocolConfig { - name: "/foo".into(), - fallback_names: Vec::new(), - handshake: Vec::new(), - max_notification_size: 1024 * 1024, - }, - notif_handle, - command_stream, - )), - ), - peer_store_future: peer_store.run().boxed(), - protocol_controller_future: controller.run().boxed(), - addrs: addrs - .iter() - .enumerate() - .filter_map(|(n, a)| { - if n != index { - Some((keypairs[n].public().to_peer_id(), a.clone())) - } else { - None - } - }) - .collect(), - }; - let runtime = tokio::runtime::Runtime::new().unwrap(); - runtime.spawn(async move { + tokio::spawn(async move { loop { if let NotificationEvent::ValidateInboundSubstream { result_tx, .. } = notif_service.next_event().await.unwrap() @@ -144,12 +101,49 @@ fn build_nodes() -> (Swarm, Swarm) { } }); - let mut swarm = Swarm::new( - transport, - behaviour, - keypairs[index].public().to_peer_id(), - swarm::Config::with_executor(TokioExecutor(runtime)), - ); + let mut swarm = SwarmBuilder::with_existing_identity(keypair) + .with_tokio() + .with_other_transport(|keypair| { + MemoryTransport::new() + .upgrade(upgrade::Version::V1) + .authenticate(noise::Config::new(&keypair).unwrap()) + .multiplex(yamux::Config::default()) + .timeout(Duration::from_secs(20)) + .boxed() + }) + .unwrap() + .with_behaviour(|_keypair| CustomProtoWithAddr { + inner: Notifications::new( + vec![controller_handle], + from_controller, + NotificationMetrics::new(None), + iter::once(( + ProtocolConfig { + name: "/foo".into(), + fallback_names: Vec::new(), + handshake: Vec::new(), + max_notification_size: 1024 * 1024, + }, + notif_handle, + command_stream, + )), + ), + peer_store_future: peer_store.run().boxed(), + protocol_controller_future: controller.run().boxed(), + addrs: addrs + .iter() + .enumerate() + .filter_map(|(n, a)| { + if n != index { + Some((keypairs[n].public().to_peer_id(), a.clone())) + } else { + None + } + }) + .collect(), + }) + .unwrap() + .build(); swarm.listen_on(addrs[index].clone()).unwrap(); out.push(swarm); } @@ -241,12 +235,18 @@ impl NetworkBehaviour for CustomProtoWithAddr { peer: PeerId, addr: &Multiaddr, role_override: Endpoint, + port_use: PortUse, ) -> Result, ConnectionDenied> { - self.inner - .handle_established_outbound_connection(connection_id, peer, addr, role_override) + self.inner.handle_established_outbound_connection( + connection_id, + peer, + addr, + role_override, + port_use, + ) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { self.inner.on_swarm_event(event); } @@ -259,19 +259,15 @@ impl NetworkBehaviour for CustomProtoWithAddr { self.inner.on_connection_handler_event(peer_id, connection_id, event); } - fn poll( - &mut self, - cx: &mut Context, - params: &mut impl PollParameters, - ) -> Poll>> { + fn poll(&mut self, cx: &mut Context) -> Poll>> { let _ = self.peer_store_future.poll_unpin(cx); let _ = self.protocol_controller_future.poll_unpin(cx); - self.inner.poll(cx, params) + self.inner.poll(cx) } } -#[test] -fn reconnect_after_disconnect() { +#[tokio::test] +async fn reconnect_after_disconnect() { // We connect two nodes together, then force a disconnect (through the API of the `Service`), // check that the disconnect worked, and finally check whether they successfully reconnect. @@ -288,108 +284,106 @@ fn reconnect_after_disconnect() { let mut service1_state = ServiceState::NotConnected; let mut service2_state = ServiceState::NotConnected; - futures::executor::block_on(async move { - loop { - // Grab next event from services. - let event = { - let s1 = service1.select_next_some(); - let s2 = service2.select_next_some(); - futures::pin_mut!(s1, s2); - match future::select(s1, s2).await { - future::Either::Left((ev, _)) => future::Either::Left(ev), - future::Either::Right((ev, _)) => future::Either::Right(ev), - } - }; - - match event { - future::Either::Left(SwarmEvent::Behaviour( - NotificationsOut::CustomProtocolOpen { .. }, - )) => match service1_state { - ServiceState::NotConnected => { - service1_state = ServiceState::FirstConnec; - if service2_state == ServiceState::FirstConnec { - service1 - .behaviour_mut() - .disconnect_peer(Swarm::local_peer_id(&service2), SetId::from(0)); - } - }, - ServiceState::Disconnected => service1_state = ServiceState::ConnectedAgain, - ServiceState::FirstConnec | ServiceState::ConnectedAgain => panic!(), - }, - future::Either::Left(SwarmEvent::Behaviour( - NotificationsOut::CustomProtocolClosed { .. }, - )) => match service1_state { - ServiceState::FirstConnec => service1_state = ServiceState::Disconnected, - ServiceState::ConnectedAgain | - ServiceState::NotConnected | - ServiceState::Disconnected => panic!(), - }, - future::Either::Right(SwarmEvent::Behaviour( - NotificationsOut::CustomProtocolOpen { .. }, - )) => match service2_state { - ServiceState::NotConnected => { - service2_state = ServiceState::FirstConnec; - if service1_state == ServiceState::FirstConnec { - service1 - .behaviour_mut() - .disconnect_peer(Swarm::local_peer_id(&service2), SetId::from(0)); - } - }, - ServiceState::Disconnected => service2_state = ServiceState::ConnectedAgain, - ServiceState::FirstConnec | ServiceState::ConnectedAgain => panic!(), - }, - future::Either::Right(SwarmEvent::Behaviour( - NotificationsOut::CustomProtocolClosed { .. }, - )) => match service2_state { - ServiceState::FirstConnec => service2_state = ServiceState::Disconnected, - ServiceState::ConnectedAgain | - ServiceState::NotConnected | - ServiceState::Disconnected => panic!(), - }, - _ => {}, + loop { + // Grab next event from services. + let event = { + let s1 = service1.select_next_some(); + let s2 = service2.select_next_some(); + futures::pin_mut!(s1, s2); + match future::select(s1, s2).await { + future::Either::Left((ev, _)) => future::Either::Left(ev), + future::Either::Right((ev, _)) => future::Either::Right(ev), } + }; - // Due to the bug in `Notifications`, the disconnected node does not always detect that - // it was disconnected. The closed inbound substream is tolerated by design, and the - // closed outbound substream is not detected until something is sent into it. - // See [PR #13396](https://github.com/paritytech/substrate/pull/13396). - // This happens if the disconnecting node reconnects to it fast enough. - // In this case the disconnected node does not transit via `ServiceState::NotConnected` - // and stays in `ServiceState::FirstConnec`. - // TODO: update this once the fix is finally merged. - if service1_state == ServiceState::ConnectedAgain && - service2_state == ServiceState::ConnectedAgain || - service1_state == ServiceState::ConnectedAgain && - service2_state == ServiceState::FirstConnec || - service1_state == ServiceState::FirstConnec && - service2_state == ServiceState::ConnectedAgain - { - break - } + match event { + future::Either::Left(SwarmEvent::Behaviour(NotificationsOut::CustomProtocolOpen { + .. + })) => match service1_state { + ServiceState::NotConnected => { + service1_state = ServiceState::FirstConnec; + if service2_state == ServiceState::FirstConnec { + service1 + .behaviour_mut() + .disconnect_peer(Swarm::local_peer_id(&service2), SetId::from(0)); + } + }, + ServiceState::Disconnected => service1_state = ServiceState::ConnectedAgain, + ServiceState::FirstConnec | ServiceState::ConnectedAgain => panic!(), + }, + future::Either::Left(SwarmEvent::Behaviour( + NotificationsOut::CustomProtocolClosed { .. }, + )) => match service1_state { + ServiceState::FirstConnec => service1_state = ServiceState::Disconnected, + ServiceState::ConnectedAgain | + ServiceState::NotConnected | + ServiceState::Disconnected => panic!(), + }, + future::Either::Right(SwarmEvent::Behaviour( + NotificationsOut::CustomProtocolOpen { .. }, + )) => match service2_state { + ServiceState::NotConnected => { + service2_state = ServiceState::FirstConnec; + if service1_state == ServiceState::FirstConnec { + service1 + .behaviour_mut() + .disconnect_peer(Swarm::local_peer_id(&service2), SetId::from(0)); + } + }, + ServiceState::Disconnected => service2_state = ServiceState::ConnectedAgain, + ServiceState::FirstConnec | ServiceState::ConnectedAgain => panic!(), + }, + future::Either::Right(SwarmEvent::Behaviour( + NotificationsOut::CustomProtocolClosed { .. }, + )) => match service2_state { + ServiceState::FirstConnec => service2_state = ServiceState::Disconnected, + ServiceState::ConnectedAgain | + ServiceState::NotConnected | + ServiceState::Disconnected => panic!(), + }, + _ => {}, } - // Now that the two services have disconnected and reconnected, wait for 3 seconds and - // check whether they're still connected. - let mut delay = futures_timer::Delay::new(Duration::from_secs(3)); - - loop { - // Grab next event from services. - let event = { - let s1 = service1.select_next_some(); - let s2 = service2.select_next_some(); - futures::pin_mut!(s1, s2); - match future::select(future::select(s1, s2), &mut delay).await { - future::Either::Right(_) => break, // success - future::Either::Left((future::Either::Left((ev, _)), _)) => ev, - future::Either::Left((future::Either::Right((ev, _)), _)) => ev, - } - }; + // Due to the bug in `Notifications`, the disconnected node does not always detect that + // it was disconnected. The closed inbound substream is tolerated by design, and the + // closed outbound substream is not detected until something is sent into it. + // See [PR #13396](https://github.com/paritytech/substrate/pull/13396). + // This happens if the disconnecting node reconnects to it fast enough. + // In this case the disconnected node does not transit via `ServiceState::NotConnected` + // and stays in `ServiceState::FirstConnec`. + // TODO: update this once the fix is finally merged. + if service1_state == ServiceState::ConnectedAgain && + service2_state == ServiceState::ConnectedAgain || + service1_state == ServiceState::ConnectedAgain && + service2_state == ServiceState::FirstConnec || + service1_state == ServiceState::FirstConnec && + service2_state == ServiceState::ConnectedAgain + { + break + } + } - match event { - SwarmEvent::Behaviour(NotificationsOut::CustomProtocolOpen { .. }) | - SwarmEvent::Behaviour(NotificationsOut::CustomProtocolClosed { .. }) => panic!(), - _ => {}, + // Now that the two services have disconnected and reconnected, wait for 3 seconds and + // check whether they're still connected. + let mut delay = futures_timer::Delay::new(Duration::from_secs(3)); + + loop { + // Grab next event from services. + let event = { + let s1 = service1.select_next_some(); + let s2 = service2.select_next_some(); + futures::pin_mut!(s1, s2); + match future::select(future::select(s1, s2), &mut delay).await { + future::Either::Right(_) => break, // success + future::Either::Left((future::Either::Left((ev, _)), _)) => ev, + future::Either::Left((future::Either::Right((ev, _)), _)) => ev, } + }; + + match event { + SwarmEvent::Behaviour(NotificationsOut::CustomProtocolOpen { .. }) | + SwarmEvent::Behaviour(NotificationsOut::CustomProtocolClosed { .. }) => panic!(), + _ => {}, } - }); + } } diff --git a/substrate/client/network/src/protocol/notifications/upgrade/notifications.rs b/substrate/client/network/src/protocol/notifications/upgrade/notifications.rs index e01bcbe0bad7..9e8a03fc07c9 100644 --- a/substrate/client/network/src/protocol/notifications/upgrade/notifications.rs +++ b/substrate/client/network/src/protocol/notifications/upgrade/notifications.rs @@ -39,12 +39,12 @@ use crate::types::ProtocolName; use asynchronous_codec::Framed; use bytes::BytesMut; use futures::prelude::*; -use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p::core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use log::{error, warn}; use unsigned_varint::codec::UviBytes; use std::{ - io, mem, + fmt, io, mem, pin::Pin, task::{Context, Poll}, vec, @@ -187,6 +187,14 @@ pub struct NotificationsInOpen { pub substream: NotificationsInSubstream, } +impl fmt::Debug for NotificationsInOpen { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("NotificationsInOpen") + .field("handshake", &self.handshake) + .finish_non_exhaustive() + } +} + impl NotificationsInSubstream where TSubstream: AsyncRead + AsyncWrite + Unpin, @@ -370,7 +378,14 @@ where fn upgrade_outbound(self, mut socket: TSubstream, negotiated_name: Self::Info) -> Self::Future { Box::pin(async move { - upgrade::write_length_prefixed(&mut socket, &self.initial_message).await?; + { + let mut len_data = unsigned_varint::encode::usize_buffer(); + let encoded_len = + unsigned_varint::encode::usize(self.initial_message.len(), &mut len_data).len(); + socket.write_all(&len_data[..encoded_len]).await?; + } + socket.write_all(&self.initial_message).await?; + socket.flush().await?; // Reading handshake. let handshake_len = unsigned_varint::aio::read_usize(&mut socket).await?; @@ -413,6 +428,15 @@ pub struct NotificationsOutOpen { pub substream: NotificationsOutSubstream, } +impl fmt::Debug for NotificationsOutOpen { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("NotificationsOutOpen") + .field("handshake", &self.handshake) + .field("negotiated_fallback", &self.negotiated_fallback) + .finish_non_exhaustive() + } +} + impl Sink> for NotificationsOutSubstream where TSubstream: AsyncRead + AsyncWrite + Unpin, diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index 6c2631924df4..5fe34c781378 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -43,13 +43,11 @@ use crate::{ use futures::{channel::oneshot, prelude::*}; use libp2p::{ - core::{Endpoint, Multiaddr}, + core::{transport::PortUse, Endpoint, Multiaddr}, request_response::{self, Behaviour, Codec, Message, ProtocolSupport, ResponseChannel}, swarm::{ - behaviour::{ConnectionClosed, FromSwarm}, - handler::multi::MultiHandler, - ConnectionDenied, ConnectionId, NetworkBehaviour, PollParameters, THandler, - THandlerInEvent, THandlerOutEvent, ToSwarm, + behaviour::FromSwarm, handler::multi::MultiHandler, ConnectionDenied, ConnectionId, + NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }, PeerId, }; @@ -64,11 +62,11 @@ use std::{ time::{Duration, Instant}, }; -pub use libp2p::request_response::{Config, RequestId}; +pub use libp2p::request_response::{Config, InboundRequestId, OutboundRequestId}; /// Possible failures occurring in the context of sending an outbound request and receiving the /// response. -#[derive(Debug, thiserror::Error)] +#[derive(Debug, Clone, thiserror::Error)] pub enum OutboundFailure { /// The request could not be sent because a dialing attempt failed. #[error("Failed to dial the requested peer")] @@ -82,6 +80,9 @@ pub enum OutboundFailure { /// The remote supports none of the requested protocols. #[error("The remote supports none of the requested protocols")] UnsupportedProtocols, + /// An IO failure happened on an outbound stream. + #[error("An IO failure happened on an outbound stream")] + Io(Arc), } impl From for OutboundFailure { @@ -93,6 +94,7 @@ impl From for OutboundFailure { OutboundFailure::ConnectionClosed, request_response::OutboundFailure::UnsupportedProtocols => OutboundFailure::UnsupportedProtocols, + request_response::OutboundFailure::Io(error) => OutboundFailure::Io(Arc::new(error)), } } } @@ -114,6 +116,9 @@ pub enum InboundFailure { /// The local peer failed to respond to an inbound request #[error("The response channel was dropped without sending a response to the remote")] ResponseOmission, + /// An IO failure happened on an inbound stream. + #[error("An IO failure happened on an inbound stream")] + Io(Arc), } impl From for InboundFailure { @@ -124,6 +129,7 @@ impl From for InboundFailure { request_response::InboundFailure::ConnectionClosed => InboundFailure::ConnectionClosed, request_response::InboundFailure::UnsupportedProtocols => InboundFailure::UnsupportedProtocols, + request_response::InboundFailure::Io(error) => InboundFailure::Io(Arc::new(error)), } } } @@ -319,12 +325,12 @@ pub enum Event { /// requests. There is no uniqueness guarantee in a set of both inbound and outbound /// [`ProtocolRequestId`]s. #[derive(Debug, Clone, PartialEq, Eq, Hash)] -struct ProtocolRequestId { +struct ProtocolRequestId { protocol: ProtocolName, request_id: RequestId, } -impl From<(ProtocolName, RequestId)> for ProtocolRequestId { +impl From<(ProtocolName, RequestId)> for ProtocolRequestId { fn from((protocol, request_id): (ProtocolName, RequestId)) -> Self { Self { protocol, request_id } } @@ -342,7 +348,7 @@ pub struct RequestResponsesBehaviour { >, /// Pending requests, passed down to a request-response [`Behaviour`], awaiting a reply. - pending_requests: HashMap, + pending_requests: HashMap, PendingRequest>, /// Whenever an incoming request arrives, a `Future` is added to this list and will yield the /// start time and the response to send back to the remote. @@ -351,11 +357,11 @@ pub struct RequestResponsesBehaviour { >, /// Whenever an incoming request arrives, the arrival [`Instant`] is recorded here. - pending_responses_arrival_time: HashMap, + pending_responses_arrival_time: HashMap, Instant>, /// Whenever a response is received on `pending_responses`, insert a channel to be notified /// when the request has been sent out. - send_feedback: HashMap>, + send_feedback: HashMap, oneshot::Sender<()>>, /// Primarily used to get a reputation of a node. peer_store: Arc, @@ -364,7 +370,7 @@ pub struct RequestResponsesBehaviour { /// Generated by the response builder and waiting to be processed. struct RequestProcessingOutcome { peer: PeerId, - request_id: RequestId, + request_id: InboundRequestId, protocol: ProtocolName, inner_channel: ResponseChannel, ()>>, response: OutgoingResponse, @@ -379,8 +385,7 @@ impl RequestResponsesBehaviour { ) -> Result { let mut protocols = HashMap::new(); for protocol in list { - let mut cfg = Config::default(); - cfg.set_request_timeout(protocol.request_timeout); + let cfg = Config::default().with_request_timeout(protocol.request_timeout); let protocol_support = if protocol.inbound_queue.is_some() { ProtocolSupport::Full @@ -455,7 +460,7 @@ impl RequestResponsesBehaviour { fn send_request_inner( behaviour: &mut Behaviour, - pending_requests: &mut HashMap, + pending_requests: &mut HashMap, PendingRequest>, target: &PeerId, protocol_name: ProtocolName, request: Vec, @@ -541,11 +546,16 @@ impl NetworkBehaviour for RequestResponsesBehaviour { peer: PeerId, addr: &Multiaddr, role_override: Endpoint, + port_use: PortUse, ) -> Result, ConnectionDenied> { let iter = self.protocols.iter_mut().filter_map(|(p, (r, _))| { - if let Ok(handler) = - r.handle_established_outbound_connection(connection_id, peer, addr, role_override) - { + if let Ok(handler) = r.handle_established_outbound_connection( + connection_id, + peer, + addr, + role_override, + port_use, + ) { Some((p.to_string(), handler)) } else { None @@ -558,80 +568,9 @@ impl NetworkBehaviour for RequestResponsesBehaviour { )) } - fn on_swarm_event(&mut self, event: FromSwarm) { - match event { - FromSwarm::ConnectionEstablished(e) => - for (p, _) in self.protocols.values_mut() { - NetworkBehaviour::on_swarm_event(p, FromSwarm::ConnectionEstablished(e)); - }, - FromSwarm::ConnectionClosed(ConnectionClosed { - peer_id, - connection_id, - endpoint, - handler, - remaining_established, - }) => - for (p_name, p_handler) in handler.into_iter() { - if let Some((proto, _)) = self.protocols.get_mut(p_name.as_str()) { - proto.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed { - peer_id, - connection_id, - endpoint, - handler: p_handler, - remaining_established, - })); - } else { - log::error!( - target: "sub-libp2p", - "on_swarm_event/connection_closed: no request-response instance registered for protocol {:?}", - p_name, - ) - } - }, - FromSwarm::DialFailure(e) => - for (p, _) in self.protocols.values_mut() { - NetworkBehaviour::on_swarm_event(p, FromSwarm::DialFailure(e)); - }, - FromSwarm::ListenerClosed(e) => - for (p, _) in self.protocols.values_mut() { - NetworkBehaviour::on_swarm_event(p, FromSwarm::ListenerClosed(e)); - }, - FromSwarm::ListenFailure(e) => - for (p, _) in self.protocols.values_mut() { - NetworkBehaviour::on_swarm_event(p, FromSwarm::ListenFailure(e)); - }, - FromSwarm::ListenerError(e) => - for (p, _) in self.protocols.values_mut() { - NetworkBehaviour::on_swarm_event(p, FromSwarm::ListenerError(e)); - }, - FromSwarm::ExternalAddrExpired(e) => - for (p, _) in self.protocols.values_mut() { - NetworkBehaviour::on_swarm_event(p, FromSwarm::ExternalAddrExpired(e)); - }, - FromSwarm::NewListener(e) => - for (p, _) in self.protocols.values_mut() { - NetworkBehaviour::on_swarm_event(p, FromSwarm::NewListener(e)); - }, - FromSwarm::ExpiredListenAddr(e) => - for (p, _) in self.protocols.values_mut() { - NetworkBehaviour::on_swarm_event(p, FromSwarm::ExpiredListenAddr(e)); - }, - FromSwarm::NewExternalAddrCandidate(e) => - for (p, _) in self.protocols.values_mut() { - NetworkBehaviour::on_swarm_event(p, FromSwarm::NewExternalAddrCandidate(e)); - }, - FromSwarm::ExternalAddrConfirmed(e) => - for (p, _) in self.protocols.values_mut() { - NetworkBehaviour::on_swarm_event(p, FromSwarm::ExternalAddrConfirmed(e)); - }, - FromSwarm::AddressChange(e) => - for (p, _) in self.protocols.values_mut() { - NetworkBehaviour::on_swarm_event(p, FromSwarm::AddressChange(e)); - }, - FromSwarm::NewListenAddr(e) => - for (p, _) in self.protocols.values_mut() { - NetworkBehaviour::on_swarm_event(p, FromSwarm::NewListenAddr(e)); - }, + fn on_swarm_event(&mut self, event: FromSwarm) { + for (protocol, _) in self.protocols.values_mut() { + protocol.on_swarm_event(event); } } @@ -653,11 +592,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { } } - fn poll( - &mut self, - cx: &mut Context, - params: &mut impl PollParameters, - ) -> Poll>> { + fn poll(&mut self, cx: &mut Context) -> Poll>> { 'poll_all: loop { // Poll to see if any response is ready to be sent back. while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) { @@ -707,7 +642,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { // Poll request-responses protocols. for (protocol, (ref mut behaviour, ref mut resp_builder)) in &mut self.protocols { - 'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx, params) { + 'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx) { let ev = match ev { // Main events we are interested in. ToSwarm::GenerateEvent(ev) => ev, @@ -717,29 +652,23 @@ impl NetworkBehaviour for RequestResponsesBehaviour { ToSwarm::Dial { opts } => { if opts.get_peer_id().is_none() { log::error!( + target: "sub-libp2p", "The request-response isn't supposed to start dialing addresses" ); } return Poll::Ready(ToSwarm::Dial { opts }) }, - ToSwarm::NotifyHandler { peer_id, handler, event } => - return Poll::Ready(ToSwarm::NotifyHandler { - peer_id, - handler, - event: ((*protocol).to_string(), event), - }), - ToSwarm::CloseConnection { peer_id, connection } => - return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }), - ToSwarm::NewExternalAddrCandidate(observed) => - return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)), - ToSwarm::ExternalAddrConfirmed(addr) => - return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)), - ToSwarm::ExternalAddrExpired(addr) => - return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)), - ToSwarm::ListenOn { opts } => - return Poll::Ready(ToSwarm::ListenOn { opts }), - ToSwarm::RemoveListener { id } => - return Poll::Ready(ToSwarm::RemoveListener { id }), + event => { + return Poll::Ready( + event.map_in(|event| ((*protocol).to_string(), event)).map_out( + |_| { + unreachable!( + "`GenerateEvent` is handled in a branch above; qed" + ) + }, + ), + ); + }, }; match ev { @@ -859,6 +788,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { error, .. } => { + let error = OutboundFailure::from(error); let started = match self .pending_requests .remove(&(protocol.clone(), request_id).into()) @@ -870,9 +800,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { }) => { // Try using the fallback request if the protocol was not // supported. - if let request_response::OutboundFailure::UnsupportedProtocols = - error - { + if matches!(error, OutboundFailure::UnsupportedProtocols) { if let Some((fallback_request, fallback_protocol)) = fallback_request { @@ -893,7 +821,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { } if response_tx - .send(Err(RequestFailure::Network(error.clone().into()))) + .send(Err(RequestFailure::Network(error.clone()))) .is_err() { log::debug!( @@ -920,7 +848,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { peer, protocol: protocol.clone(), duration: started.elapsed(), - result: Err(RequestFailure::Network(error.into())), + result: Err(RequestFailure::Network(error)), }; return Poll::Ready(ToSwarm::GenerateEvent(out)) @@ -1184,7 +1112,10 @@ mod tests { transport, behaviour, keypair.public().to_peer_id(), - SwarmConfig::with_executor(TokioExecutor(runtime)), + SwarmConfig::with_executor(TokioExecutor(runtime)) + // This is taken care of by notification protocols in non-test environment + // It is very slow in test environment for some reason, hence larger timeout + .with_idle_connection_timeout(Duration::from_secs(10)), ); let listen_addr: Multiaddr = format!("/memory/{}", rand::random::()).parse().unwrap(); @@ -1354,7 +1285,9 @@ mod tests { match swarm.select_next_some().await { SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { assert!(result.is_ok()); - break + }, + SwarmEvent::ConnectionClosed { .. } => { + break; }, _ => {}, } @@ -1394,20 +1327,20 @@ mod tests { } match response_receiver.unwrap().await.unwrap().unwrap_err() { - RequestFailure::Network(OutboundFailure::ConnectionClosed) => {}, - _ => panic!(), + RequestFailure::Network(OutboundFailure::Io(_)) => {}, + request_failure => panic!("Unexpected failure: {request_failure:?}"), } }); } - /// A [`RequestId`] is a unique identifier among either all inbound or all outbound requests for + /// A `RequestId` is a unique identifier among either all inbound or all outbound requests for /// a single [`RequestResponsesBehaviour`] behaviour. It is not guaranteed to be unique across - /// multiple [`RequestResponsesBehaviour`] behaviours. Thus when handling [`RequestId`] in the + /// multiple [`RequestResponsesBehaviour`] behaviours. Thus, when handling `RequestId` in the /// context of multiple [`RequestResponsesBehaviour`] behaviours, one needs to couple the - /// protocol name with the [`RequestId`] to get a unique request identifier. + /// protocol name with the `RequestId` to get a unique request identifier. /// /// This test ensures that two requests on different protocols can be handled concurrently - /// without a [`RequestId`] collision. + /// without a `RequestId` collision. /// /// See [`ProtocolRequestId`] for additional information. #[test] diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index 803b81129139..751183ae19a9 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -41,7 +41,7 @@ use crate::{ NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer, }, peer_store::{PeerStore, PeerStoreProvider}, - protocol::{self, NotifsHandlerError, Protocol, Ready}, + protocol::{self, Protocol, Ready}, protocol_controller::{self, ProtoSetConfig, ProtocolController, SetId}, request_responses::{IfDisconnected, ProtocolConfig as RequestResponseConfig, RequestFailure}, service::{ @@ -59,10 +59,7 @@ use crate::{ }; use codec::DecodeAll; -use either::Either; use futures::{channel::oneshot, prelude::*}; -#[allow(deprecated)] -use libp2p::swarm::THandlerErr; use libp2p::{ connection_limits::{ConnectionLimits, Exceeded}, core::{upgrade, ConnectedPoint, Endpoint}, @@ -94,7 +91,6 @@ pub use libp2p::identity::{DecodingError, Keypair, PublicKey}; pub use metrics::NotificationMetrics; pub use protocol::NotificationsSink; use std::{ - cmp, collections::{HashMap, HashSet}, fs, iter, marker::PhantomData, @@ -115,6 +111,7 @@ pub mod signature; pub mod traits; struct Libp2pBandwidthSink { + #[allow(deprecated)] sink: Arc, } @@ -336,7 +333,7 @@ where "🏷 Local node identity is: {}", local_peer_id.to_base58(), ); - log::info!(target: "sub-libp2p", "Running libp2p network backend"); + info!(target: "sub-libp2p", "Running libp2p network backend"); let (transport, bandwidth) = { let config_mem = match network_config.transport { @@ -344,46 +341,7 @@ where TransportConfig::Normal { .. } => false, }; - // The yamux buffer size limit is configured to be equal to the maximum frame size - // of all protocols. 10 bytes are added to each limit for the length prefix that - // is not included in the upper layer protocols limit but is still present in the - // yamux buffer. These 10 bytes correspond to the maximum size required to encode - // a variable-length-encoding 64bits number. In other words, we make the - // assumption that no notification larger than 2^64 will ever be sent. - let yamux_maximum_buffer_size = { - let requests_max = request_response_protocols - .iter() - .map(|cfg| usize::try_from(cfg.max_request_size).unwrap_or(usize::MAX)); - let responses_max = request_response_protocols - .iter() - .map(|cfg| usize::try_from(cfg.max_response_size).unwrap_or(usize::MAX)); - let notifs_max = notification_protocols - .iter() - .map(|cfg| usize::try_from(cfg.max_notification_size()).unwrap_or(usize::MAX)); - - // A "default" max is added to cover all the other protocols: ping, identify, - // kademlia, block announces, and transactions. - let default_max = cmp::max( - 1024 * 1024, - usize::try_from(protocol::BLOCK_ANNOUNCES_TRANSACTIONS_SUBSTREAM_SIZE) - .unwrap_or(usize::MAX), - ); - - iter::once(default_max) - .chain(requests_max) - .chain(responses_max) - .chain(notifs_max) - .max() - .expect("iterator known to always yield at least one element; qed") - .saturating_add(10) - }; - - transport::build_transport( - local_identity.clone().into(), - config_mem, - network_config.yamux_window_size, - yamux_maximum_buffer_size, - ) + transport::build_transport(local_identity.clone().into(), config_mem) }; let (to_notifications, from_protocol_controllers) = @@ -1522,8 +1480,7 @@ where } /// Process the next event coming from `Swarm`. - #[allow(deprecated)] - fn handle_swarm_event(&mut self, event: SwarmEvent>>) { + fn handle_swarm_event(&mut self, event: SwarmEvent) { match event { SwarmEvent::Behaviour(BehaviourOut::InboundRequest { protocol, result, .. }) => { if let Some(metrics) = self.metrics.as_ref() { @@ -1548,6 +1505,7 @@ where Some("busy-omitted"), ResponseFailure::Network(InboundFailure::ConnectionClosed) => Some("connection-closed"), + ResponseFailure::Network(InboundFailure::Io(_)) => Some("io"), }; if let Some(reason) = reason { @@ -1587,6 +1545,7 @@ where "connection-closed", RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => "unsupported", + RequestFailure::Network(OutboundFailure::Io(_)) => "io", }; metrics @@ -1756,15 +1715,6 @@ where }; let reason = match cause { Some(ConnectionError::IO(_)) => "transport-error", - Some(ConnectionError::Handler(Either::Left(Either::Left( - Either::Left(Either::Right( - NotifsHandlerError::SyncNotificationsClogged, - )), - )))) => "sync-notifications-clogged", - Some(ConnectionError::Handler(Either::Left(Either::Left( - Either::Right(Either::Left(_)), - )))) => "ping-timeout", - Some(ConnectionError::Handler(_)) => "protocol-error", Some(ConnectionError::KeepAliveTimeout) => "keep-alive-timeout", None => "actively-closed", }; @@ -1803,7 +1753,12 @@ where not_reported.then(|| self.boot_node_ids.get(&peer_id)).flatten() { if let DialError::WrongPeerId { obtained, endpoint } = &error { - if let ConnectedPoint::Dialer { address, role_override: _ } = endpoint { + if let ConnectedPoint::Dialer { + address, + role_override: _, + port_use: _, + } = endpoint + { let address_without_peer_id = parse_addr(address.clone().into()) .map_or_else(|_| address.clone(), |r| r.1.into()); @@ -1824,7 +1779,6 @@ where } if let Some(metrics) = self.metrics.as_ref() { - #[allow(deprecated)] let reason = match error { DialError::Denied { cause } => if cause.downcast::().is_ok() { @@ -1864,7 +1818,6 @@ where "Libp2p => IncomingConnectionError({local_addr},{send_back_addr} via {connection_id:?}): {error}" ); if let Some(metrics) = self.metrics.as_ref() { - #[allow(deprecated)] let reason = match error { ListenError::Denied { cause } => if cause.downcast::().is_ok() { @@ -1917,6 +1870,21 @@ where metrics.listeners_errors_total.inc(); } }, + SwarmEvent::NewExternalAddrCandidate { address } => { + trace!(target: "sub-libp2p", "Libp2p => NewExternalAddrCandidate: {address:?}"); + }, + SwarmEvent::ExternalAddrConfirmed { address } => { + trace!(target: "sub-libp2p", "Libp2p => ExternalAddrConfirmed: {address:?}"); + }, + SwarmEvent::ExternalAddrExpired { address } => { + trace!(target: "sub-libp2p", "Libp2p => ExternalAddrExpired: {address:?}"); + }, + SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => { + trace!(target: "sub-libp2p", "Libp2p => NewExternalAddrOfPeer({peer_id:?}): {address:?}") + }, + event => { + warn!(target: "sub-libp2p", "New unknown SwarmEvent libp2p event: {event:?}"); + }, } } } diff --git a/substrate/client/network/src/transport.rs b/substrate/client/network/src/transport.rs index ed7e7c574e16..2f6b7a643c48 100644 --- a/substrate/client/network/src/transport.rs +++ b/substrate/client/network/src/transport.rs @@ -29,6 +29,8 @@ use libp2p::{ }; use std::{sync::Arc, time::Duration}; +// TODO: Create a wrapper similar to upstream `BandwidthTransport` that tracks sent/received bytes +#[allow(deprecated)] pub use libp2p::bandwidth::BandwidthSinks; /// Builds the transport that serves as a common ground for all connections. @@ -36,21 +38,12 @@ pub use libp2p::bandwidth::BandwidthSinks; /// If `memory_only` is true, then only communication within the same process are allowed. Only /// addresses with the format `/memory/...` are allowed. /// -/// `yamux_window_size` is the maximum size of the Yamux receive windows. `None` to leave the -/// default (256kiB). -/// -/// `yamux_maximum_buffer_size` is the maximum allowed size of the Yamux buffer. This should be -/// set either to the maximum of all the maximum allowed sizes of messages frames of all -/// high-level protocols combined, or to some generously high value if you are sure that a maximum -/// size is enforced on all high-level protocols. -/// /// Returns a `BandwidthSinks` object that allows querying the average bandwidth produced by all /// the connections spawned with this transport. +#[allow(deprecated)] pub fn build_transport( keypair: identity::Keypair, memory_only: bool, - yamux_window_size: Option, - yamux_maximum_buffer_size: usize, ) -> (Boxed<(PeerId, StreamMuxerBox)>, Arc) { // Build the base layer of the transport. let transport = if !memory_only { @@ -81,19 +74,7 @@ pub fn build_transport( }; let authentication_config = noise::Config::new(&keypair).expect("Can create noise config. qed"); - let multiplexing_config = { - let mut yamux_config = libp2p::yamux::Config::default(); - // Enable proper flow-control: window updates are only sent when - // buffered data has been consumed. - yamux_config.set_window_update_mode(libp2p::yamux::WindowUpdateMode::on_read()); - yamux_config.set_max_buffer_size(yamux_maximum_buffer_size); - - if let Some(yamux_window_size) = yamux_window_size { - yamux_config.set_receive_window_size(yamux_window_size); - } - - yamux_config - }; + let multiplexing_config = libp2p::yamux::Config::default(); let transport = transport .upgrade(upgrade::Version::V1Lazy) diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 349c41ee1f4a..0c39ea0b93c0 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -100,6 +100,8 @@ mod rep { pub const REFUSED: Rep = Rep::new(-(1 << 10), "Request refused"); /// Reputation change when a peer doesn't respond in time to our messages. pub const TIMEOUT: Rep = Rep::new(-(1 << 10), "Request timeout"); + /// Reputation change when a peer connection failed with IO error. + pub const IO: Rep = Rep::new(-(1 << 10), "IO error during request"); } struct Metrics { @@ -1019,9 +1021,14 @@ where debug_assert!( false, "Can not receive `RequestFailure::Obsolete` after dropping the \ - response receiver.", + response receiver.", ); }, + RequestFailure::Network(OutboundFailure::Io(_)) => { + self.network_service.report_peer(peer_id, rep::IO); + self.network_service + .disconnect_peer(peer_id, self.block_announce_protocol_name.clone()); + }, } }, Err(oneshot::Canceled) => { diff --git a/substrate/client/network/types/Cargo.toml b/substrate/client/network/types/Cargo.toml index 7438eaeffcd2..67814f135d39 100644 --- a/substrate/client/network/types/Cargo.toml +++ b/substrate/client/network/types/Cargo.toml @@ -14,7 +14,7 @@ bs58 = { workspace = true, default-features = true } bytes = { version = "1.4.0", default-features = false } ed25519-dalek = { workspace = true, default-features = true } libp2p-identity = { features = ["ed25519", "peerid", "rand"], workspace = true } -libp2p-kad = { version = "0.44.6", default-features = false } +libp2p-kad = { version = "0.46.2", default-features = false } litep2p = { workspace = true } log = { workspace = true, default-features = true } multiaddr = { workspace = true } diff --git a/substrate/client/telemetry/Cargo.toml b/substrate/client/telemetry/Cargo.toml index f87e8b66f731..db325a94ab21 100644 --- a/substrate/client/telemetry/Cargo.toml +++ b/substrate/client/telemetry/Cargo.toml @@ -19,7 +19,7 @@ targets = ["x86_64-unknown-linux-gnu"] [dependencies] chrono = { workspace = true } futures = { workspace = true } -libp2p = { features = ["dns", "tcp", "tokio", "wasm-ext", "websocket"], workspace = true } +libp2p = { features = ["dns", "tcp", "tokio", "websocket"], workspace = true } log = { workspace = true, default-features = true } parking_lot = { workspace = true, default-features = true } pin-project = { workspace = true } diff --git a/substrate/client/telemetry/src/node.rs b/substrate/client/telemetry/src/node.rs index 0bbdbfb622ef..2c8d424c4340 100644 --- a/substrate/client/telemetry/src/node.rs +++ b/substrate/client/telemetry/src/node.rs @@ -18,7 +18,13 @@ use crate::TelemetryPayload; use futures::{channel::mpsc, prelude::*}; -use libp2p::{core::transport::Transport, Multiaddr}; +use libp2p::{ + core::{ + transport::{DialOpts, PortUse, Transport}, + Endpoint, + }, + Multiaddr, +}; use rand::Rng as _; use std::{ fmt, mem, @@ -229,7 +235,10 @@ where }, NodeSocket::ReconnectNow => { let addr = self.addr.clone(); - match self.transport.dial(addr) { + match self + .transport + .dial(addr, DialOpts { role: Endpoint::Dialer, port_use: PortUse::New }) + { Ok(d) => { log::trace!(target: "telemetry", "Re-dialing {}", self.addr); socket = NodeSocket::Dialing(d);