From 7fbb67de46aada40c25957ca286e3877ef87d810 Mon Sep 17 00:00:00 2001 From: Serban Iorga Date: Wed, 23 Aug 2023 19:31:17 +0300 Subject: [PATCH] Backport: Implement basic equivocations detection loop (#2375) * Implement basic equivocations detection loop (#2367) * FinalityProofsBuf adjustments - store a Vec - transform prune `buf_limit` to Option * FinalityProof: add target_header_hash() * Target client: implement best_synced_header_hash() * Implement first version of the equivocations detection loop * Address code review comments * Leftover * polkadot-staging adjustments --- Cargo.lock | 5 + .../header-chain/src/justification/mod.rs | 6 +- primitives/header-chain/src/lib.rs | 7 +- relays/equivocation/Cargo.toml | 5 + relays/equivocation/src/equivocation_loop.rs | 336 ++++++++++++++++++ relays/equivocation/src/lib.rs | 29 +- relays/equivocation/src/reporter.rs | 83 +++++ relays/finality/src/base.rs | 2 +- relays/finality/src/finality_loop.rs | 6 +- relays/finality/src/finality_proofs.rs | 67 ++-- relays/finality/src/headers.rs | 21 +- relays/finality/src/lib.rs | 1 + relays/finality/src/mock.rs | 6 +- .../src/equivocation/target.rs | 28 +- .../src/finality/target.rs | 8 +- .../src/finality_base/engine.rs | 10 +- .../src/finality_base/mod.rs | 20 ++ .../src/messages_source.rs | 7 +- relays/utils/src/lib.rs | 9 +- 19 files changed, 577 insertions(+), 79 deletions(-) create mode 100644 relays/equivocation/src/equivocation_loop.rs create mode 100644 relays/equivocation/src/reporter.rs diff --git a/Cargo.lock b/Cargo.lock index 056058889f5d7..b60ceebbb6de9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3106,9 +3106,14 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" name = "equivocation-detector" version = "0.1.0" dependencies = [ + "async-std", "async-trait", "bp-header-chain", "finality-relay", + "frame-support", + "futures", + "log", + "num-traits", "relay-utils", ] diff --git a/primitives/header-chain/src/justification/mod.rs b/primitives/header-chain/src/justification/mod.rs index 5fa5d7d607c8a..fbb8af80b2cdd 100644 --- a/primitives/header-chain/src/justification/mod.rs +++ b/primitives/header-chain/src/justification/mod.rs @@ -97,7 +97,11 @@ impl GrandpaJustification { } } -impl crate::FinalityProof for GrandpaJustification { +impl crate::FinalityProof for GrandpaJustification { + fn target_header_hash(&self) -> H::Hash { + self.commit.target_hash + } + fn target_header_number(&self) -> H::Number { self.commit.target_number } diff --git a/primitives/header-chain/src/lib.rs b/primitives/header-chain/src/lib.rs index ea6c58f4c0979..b1eda8dc3f73c 100644 --- a/primitives/header-chain/src/lib.rs +++ b/primitives/header-chain/src/lib.rs @@ -127,7 +127,10 @@ pub struct InitializationData { } /// Abstract finality proof that is justifying block finality. -pub trait FinalityProof: Clone + Send + Sync + Debug { +pub trait FinalityProof: Clone + Send + Sync + Debug { + /// Return hash of header that this proof is generated for. + fn target_header_hash(&self) -> Hash; + /// Return number of header that this proof is generated for. fn target_header_number(&self) -> Number; } @@ -209,7 +212,7 @@ impl TryFrom> for HeaderGrandpa /// Helper trait for finding equivocations in finality proofs. pub trait FindEquivocations { /// The type returned when encountering an error while looking for equivocations. - type Error; + type Error: Debug; /// Find equivocations. fn find_equivocations( diff --git a/relays/equivocation/Cargo.toml b/relays/equivocation/Cargo.toml index 93a1470c6a978..637fa4feaf686 100644 --- a/relays/equivocation/Cargo.toml +++ b/relays/equivocation/Cargo.toml @@ -7,7 +7,12 @@ license = "GPL-3.0-or-later WITH Classpath-exception-2.0" description = "Equivocation detector" [dependencies] +async-std = "1.6.5" async-trait = "0.1" bp-header-chain = { path = "../../primitives/header-chain" } finality-relay = { path = "../finality" } +frame-support = { git = "https://github.com/paritytech/substrate", branch = "master" } +futures = "0.3.28" +log = "0.4.20" +num-traits = "0.2" relay-utils = { path = "../utils" } diff --git a/relays/equivocation/src/equivocation_loop.rs b/relays/equivocation/src/equivocation_loop.rs new file mode 100644 index 0000000000000..5b935786353cf --- /dev/null +++ b/relays/equivocation/src/equivocation_loop.rs @@ -0,0 +1,336 @@ +// Copyright 2019-2023 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +use crate::{ + reporter::EquivocationsReporter, EquivocationDetectionPipeline, HeaderFinalityInfo, + SourceClient, TargetClient, +}; + +use bp_header_chain::{FinalityProof, FindEquivocations}; +use finality_relay::{FinalityProofsBuf, FinalityProofsStream}; +use futures::{select, FutureExt}; +use num_traits::Saturating; +use relay_utils::{ + relay_loop::{reconnect_failed_client, RECONNECT_DELAY}, + FailedClient, MaybeConnectionError, +}; +use std::{future::Future, time::Duration}; + +/// The context needed for finding equivocations inside finality proofs and reporting them. +struct EquivocationReportingContext { + synced_header_hash: P::Hash, + synced_verification_context: P::FinalityVerificationContext, +} + +impl EquivocationReportingContext

{ + /// Try to get the `EquivocationReportingContext` used by the target chain + /// at the provided block. + async fn try_read_from_target>( + target_client: &TC, + at: P::TargetNumber, + ) -> Result, TC::Error> { + let maybe_best_synced_header_hash = target_client.best_synced_header_hash(at).await?; + Ok(match maybe_best_synced_header_hash { + Some(best_synced_header_hash) => Some(EquivocationReportingContext { + synced_header_hash: best_synced_header_hash, + synced_verification_context: target_client + .finality_verification_context(at) + .await?, + }), + None => None, + }) + } + + /// Update with the new context introduced by the `HeaderFinalityInfo

` if any. + fn update(&mut self, info: HeaderFinalityInfo

) { + if let Some(new_verification_context) = info.new_verification_context { + self.synced_header_hash = info.finality_proof.target_header_hash(); + self.synced_verification_context = new_verification_context; + } + } +} + +/// Equivocations detection loop state. +struct EquivocationDetectionLoop< + P: EquivocationDetectionPipeline, + SC: SourceClient

, + TC: TargetClient

, +> { + source_client: SC, + target_client: TC, + + from_block_num: Option, + until_block_num: Option, + + reporter: EquivocationsReporter, + + finality_proofs_stream: FinalityProofsStream, + finality_proofs_buf: FinalityProofsBuf

, +} + +impl, TC: TargetClient

> + EquivocationDetectionLoop +{ + async fn handle_source_error(&mut self, e: SC::Error) { + if e.is_connection_error() { + reconnect_failed_client( + FailedClient::Source, + RECONNECT_DELAY, + &mut self.source_client, + &mut self.target_client, + ) + .await; + } else { + async_std::task::sleep(RECONNECT_DELAY).await; + } + } + + async fn handle_target_error(&mut self, e: TC::Error) { + if e.is_connection_error() { + reconnect_failed_client( + FailedClient::Target, + RECONNECT_DELAY, + &mut self.source_client, + &mut self.target_client, + ) + .await; + } else { + async_std::task::sleep(RECONNECT_DELAY).await; + } + } + + async fn ensure_finality_proofs_stream(&mut self) { + match self.finality_proofs_stream.ensure_stream(&self.source_client).await { + Ok(_) => {}, + Err(e) => { + log::error!( + target: "bridge", + "Could not connect to the {} `FinalityProofsStream`: {e:?}", + P::SOURCE_NAME, + ); + + // Reconnect to the source client if needed + self.handle_source_error(e).await + }, + } + } + + async fn best_finalized_target_block_number(&mut self) -> Option { + match self.target_client.best_finalized_header_number().await { + Ok(block_num) => Some(block_num), + Err(e) => { + log::error!( + target: "bridge", + "Could not read best finalized header number from {}: {e:?}", + P::TARGET_NAME, + ); + + // Reconnect target client and move on + self.handle_target_error(e).await; + + None + }, + } + } + + async fn build_equivocation_reporting_context( + &mut self, + block_num: P::TargetNumber, + ) -> Option> { + match EquivocationReportingContext::try_read_from_target( + &self.target_client, + block_num.saturating_sub(1.into()), + ) + .await + { + Ok(Some(context)) => Some(context), + Ok(None) => None, + Err(e) => { + log::error!( + target: "bridge", + "Could not read {} `EquivocationReportingContext` from {} at block {block_num}: {e:?}", + P::SOURCE_NAME, + P::TARGET_NAME, + ); + + // Reconnect target client if needed and move on. + self.handle_target_error(e).await; + None + }, + } + } + + /// Try to get the finality info associated to the source headers synced with the target chain + /// at the specified block. + async fn synced_source_headers_at_target( + &mut self, + at: P::TargetNumber, + ) -> Vec> { + match self.target_client.synced_headers_finality_info(at).await { + Ok(synced_headers) => synced_headers, + Err(e) => { + log::error!( + target: "bridge", + "Could not get {} headers synced to {} at block {at:?}", + P::SOURCE_NAME, + P::TARGET_NAME + ); + + // Reconnect in case of a connection error. + self.handle_target_error(e).await; + // And move on to the next block. + vec![] + }, + } + } + + async fn report_equivocation(&mut self, at: P::Hash, equivocation: P::EquivocationProof) { + match self.reporter.submit_report(&self.source_client, at, equivocation.clone()).await { + Ok(_) => {}, + Err(e) => { + log::error!( + target: "bridge", + "Could not submit equivocation report to {} for {equivocation:?}: {e:?}", + P::SOURCE_NAME, + ); + + // Reconnect source client and move on + self.handle_source_error(e).await; + }, + } + } + + async fn check_block( + &mut self, + block_num: P::TargetNumber, + context: &mut EquivocationReportingContext

, + ) { + let synced_headers = self.synced_source_headers_at_target(block_num).await; + + for synced_header in synced_headers { + self.finality_proofs_buf.fill(&mut self.finality_proofs_stream); + + let equivocations = match P::EquivocationsFinder::find_equivocations( + &context.synced_verification_context, + &synced_header.finality_proof, + self.finality_proofs_buf.buf().as_slice(), + ) { + Ok(equivocations) => equivocations, + Err(e) => { + log::error!( + target: "bridge", + "Could not search for equivocations in the finality proof \ + for source header {:?} synced at target block {block_num:?}: {e:?}", + synced_header.finality_proof.target_header_hash() + ); + continue + }, + }; + for equivocation in equivocations { + self.report_equivocation(context.synced_header_hash, equivocation).await; + } + + self.finality_proofs_buf + .prune(synced_header.finality_proof.target_header_number(), None); + context.update(synced_header); + } + } + + async fn do_run(&mut self, tick: Duration, exit_signal: impl Future) { + let exit_signal = exit_signal.fuse(); + futures::pin_mut!(exit_signal); + + loop { + // Make sure that we are connected to the source finality proofs stream. + self.ensure_finality_proofs_stream().await; + // Check the status of the pending equivocation reports + self.reporter.process_pending_reports().await; + + // Update blocks range. + if let Some(block_number) = self.best_finalized_target_block_number().await { + self.from_block_num.get_or_insert(block_number); + self.until_block_num = Some(block_number); + } + let (from, until) = match (self.from_block_num, self.until_block_num) { + (Some(from), Some(until)) => (from, until), + _ => continue, + }; + + // Check the available blocks + let mut current_block_number = from; + while current_block_number <= until { + let mut context = + match self.build_equivocation_reporting_context(current_block_number).await { + Some(context) => context, + None => continue, + }; + self.check_block(current_block_number, &mut context).await; + current_block_number = current_block_number.saturating_add(1.into()); + } + self.until_block_num = Some(current_block_number); + + select! { + _ = async_std::task::sleep(tick).fuse() => {}, + _ = exit_signal => return, + } + } + } + + pub async fn run( + source_client: SC, + target_client: TC, + tick: Duration, + exit_signal: impl Future, + ) -> Result<(), FailedClient> { + let mut equivocation_detection_loop = Self { + source_client, + target_client, + from_block_num: None, + until_block_num: None, + reporter: EquivocationsReporter::::new(), + finality_proofs_stream: FinalityProofsStream::new(), + finality_proofs_buf: FinalityProofsBuf::new(vec![]), + }; + + equivocation_detection_loop.do_run(tick, exit_signal).await; + Ok(()) + } +} + +/// Spawn the equivocations detection loop. +/// TODO: remove `#[allow(dead_code)]` +#[allow(dead_code)] +pub async fn run( + source_client: impl SourceClient

, + target_client: impl TargetClient

, + tick: Duration, + exit_signal: impl Future + 'static + Send, +) -> Result<(), relay_utils::Error> { + let exit_signal = exit_signal.shared(); + relay_utils::relay_loop(source_client, target_client) + .run( + format!("{}_to_{}_EquivocationDetection", P::SOURCE_NAME, P::TARGET_NAME), + move |source_client, target_client, _metrics| { + EquivocationDetectionLoop::run( + source_client, + target_client, + tick, + exit_signal.clone(), + ) + }, + ) + .await +} diff --git a/relays/equivocation/src/lib.rs b/relays/equivocation/src/lib.rs index 9019c3cf69f6c..bbae97502f579 100644 --- a/relays/equivocation/src/lib.rs +++ b/relays/equivocation/src/lib.rs @@ -14,18 +14,22 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . +mod equivocation_loop; +mod reporter; + use async_trait::async_trait; -use bp_header_chain::{FindEquivocations, HeaderFinalityInfo}; +use bp_header_chain::FindEquivocations; use finality_relay::{FinalityPipeline, SourceClientBase}; use relay_utils::{relay_loop::Client as RelayClient, TransactionTracker}; +use std::fmt::Debug; pub trait EquivocationDetectionPipeline: FinalityPipeline { /// Block number of the target chain. type TargetNumber: relay_utils::BlockNumberBase; /// The context needed for validating finality proofs. - type FinalityVerificationContext; + type FinalityVerificationContext: Send; /// The type of the equivocation proof. - type EquivocationProof; + type EquivocationProof: Clone + Debug + Send + Sync; /// The equivocations finder. type EquivocationsFinder: FindEquivocations< Self::FinalityProof, @@ -34,6 +38,11 @@ pub trait EquivocationDetectionPipeline: FinalityPipeline { >; } +type HeaderFinalityInfo

= bp_header_chain::HeaderFinalityInfo< +

::FinalityProof, +

::FinalityVerificationContext, +>; + /// Source client used in equivocation detection loop. #[async_trait] pub trait SourceClient: SourceClientBase

{ @@ -51,6 +60,15 @@ pub trait SourceClient: SourceClientBase

{ /// Target client used in equivocation detection loop. #[async_trait] pub trait TargetClient: RelayClient { + /// Get the best finalized header number. + async fn best_finalized_header_number(&self) -> Result; + + /// Get the hash of the best source header known by the target at the provided block number. + async fn best_synced_header_hash( + &self, + at: P::TargetNumber, + ) -> Result, Self::Error>; + /// Get the data stored by the target at the specified block for validating source finality /// proofs. async fn finality_verification_context( @@ -63,8 +81,5 @@ pub trait TargetClient: RelayClient { async fn synced_headers_finality_info( &self, at: P::TargetNumber, - ) -> Result< - Vec>, - Self::Error, - >; + ) -> Result>, Self::Error>; } diff --git a/relays/equivocation/src/reporter.rs b/relays/equivocation/src/reporter.rs new file mode 100644 index 0000000000000..27b4d71beb01b --- /dev/null +++ b/relays/equivocation/src/reporter.rs @@ -0,0 +1,83 @@ +// Copyright 2019-2023 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! Helper struct used for submitting finality reports and tracking their status. + +use crate::{EquivocationDetectionPipeline, SourceClient}; + +use futures::FutureExt; +use relay_utils::{TrackedTransactionFuture, TrackedTransactionStatus, TransactionTracker}; +use std::{ + future::poll_fn, + task::{Context, Poll}, +}; + +pub struct EquivocationsReporter> { + pending_reports: Vec>, +} + +impl> EquivocationsReporter { + pub fn new() -> Self { + Self { pending_reports: vec![] } + } + + /// Submit a `report_equivocation()` transaction to the source chain. + /// + /// We store the transaction tracker for future monitoring. + pub async fn submit_report( + &mut self, + source_client: &SC, + at: P::Hash, + equivocation: P::EquivocationProof, + ) -> Result<(), SC::Error> { + let pending_report = source_client.report_equivocation(at, equivocation).await?; + self.pending_reports.push(pending_report.wait()); + + Ok(()) + } + + fn do_process_pending_reports(&mut self, cx: &mut Context<'_>) -> Poll<()> { + self.pending_reports.retain_mut(|pending_report| { + match pending_report.poll_unpin(cx) { + Poll::Ready(tx_status) => { + match tx_status { + TrackedTransactionStatus::Lost => { + log::error!(target: "bridge", "Equivocation report tx was lost"); + }, + TrackedTransactionStatus::Finalized(id) => { + log::error!(target: "bridge", "Equivocation report tx was finalized in source block {id:?}"); + }, + } + + // The future was processed. Drop it. + false + }, + Poll::Pending => { + // The future is still pending. Retain it. + true + }, + } + }); + + Poll::Ready(()) + } + + /// Iterate through all the pending `report_equivocation()` transactions + /// and log the ones that finished. + pub async fn process_pending_reports(&mut self) { + poll_fn(|cx| self.do_process_pending_reports(cx)).await + } +} diff --git a/relays/finality/src/base.rs b/relays/finality/src/base.rs index bf9acbdf98210..4253468eaace1 100644 --- a/relays/finality/src/base.rs +++ b/relays/finality/src/base.rs @@ -32,7 +32,7 @@ pub trait FinalityPipeline: 'static + Clone + Debug + Send + Sync { /// Synced headers are identified by this number. type Number: relay_utils::BlockNumberBase; /// Finality proof type. - type FinalityProof: FinalityProof; + type FinalityProof: FinalityProof; } /// Source client used in finality related loops. diff --git a/relays/finality/src/finality_loop.rs b/relays/finality/src/finality_loop.rs index b1f1f018c0edf..e31d8a708122d 100644 --- a/relays/finality/src/finality_loop.rs +++ b/relays/finality/src/finality_loop.rs @@ -319,8 +319,10 @@ impl, TC: TargetClient

> Finality .as_ref() .map(|justified_header| justified_header.number()) .unwrap_or(info.best_number_at_target); - self.finality_proofs_buf - .prune(oldest_finality_proof_to_keep, self.sync_params.recent_finality_proofs_limit); + self.finality_proofs_buf.prune( + oldest_finality_proof_to_keep, + Some(self.sync_params.recent_finality_proofs_limit), + ); Ok(maybe_justified_header) } diff --git a/relays/finality/src/finality_proofs.rs b/relays/finality/src/finality_proofs.rs index d457c0693bf3e..cd6d12938ce42 100644 --- a/relays/finality/src/finality_proofs.rs +++ b/relays/finality/src/finality_proofs.rs @@ -20,11 +20,8 @@ use bp_header_chain::FinalityProof; use futures::{FutureExt, Stream, StreamExt}; use std::pin::Pin; -/// Finality proofs container. Ordered by target header number. -pub type FinalityProofs

= - Vec<(

::Number,

::FinalityProof)>; - /// Source finality proofs stream that may be restarted. +#[derive(Default)] pub struct FinalityProofsStream> { /// The underlying stream. stream: Option>>, @@ -75,16 +72,16 @@ impl> FinalityProofsStream { /// Source finality proofs buffer. pub struct FinalityProofsBuf { - /// Proofs buffer. - buf: FinalityProofs

, + /// Proofs buffer. Ordered by target header number. + buf: Vec, } impl FinalityProofsBuf

{ - pub fn new(buf: FinalityProofs

) -> Self { + pub fn new(buf: Vec) -> Self { Self { buf } } - pub fn buf(&self) -> &FinalityProofs

{ + pub fn buf(&self) -> &Vec { &self.buf } @@ -98,7 +95,7 @@ impl FinalityProofsBuf

{ last_header_number = Some(target_header_number); proofs_count += 1; - self.buf.push((target_header_number, finality_proof)); + self.buf.push(finality_proof); } if proofs_count != 0 { @@ -113,15 +110,19 @@ impl FinalityProofsBuf

{ } } - pub fn prune(&mut self, until_hdr_num: P::Number, buf_limit: usize) { - let kept_hdr_idx = self + /// Prune all finality proofs that target header numbers older than `first_to_keep`. + pub fn prune(&mut self, first_to_keep: P::Number, maybe_buf_limit: Option) { + let first_to_keep_idx = self .buf - .binary_search_by_key(&until_hdr_num, |(hdr_num, _)| *hdr_num) + .binary_search_by_key(&first_to_keep, |hdr| hdr.target_header_number()) .map(|idx| idx + 1) .unwrap_or_else(|idx| idx); - let buf_limit_idx = self.buf.len().saturating_sub(buf_limit); + let buf_limit_idx = match maybe_buf_limit { + Some(buf_limit) => self.buf.len().saturating_sub(buf_limit), + None => 0, + }; - self.buf = self.buf.split_off(std::cmp::max(kept_hdr_idx, buf_limit_idx)); + self.buf = self.buf.split_off(std::cmp::max(first_to_keep_idx, buf_limit_idx)); } } @@ -140,13 +141,13 @@ mod tests { fn finality_proofs_buf_fill_works() { // when stream is currently empty, nothing is changed let mut finality_proofs_buf = - FinalityProofsBuf:: { buf: vec![(1, TestFinalityProof(1))] }; + FinalityProofsBuf:: { buf: vec![TestFinalityProof(1)] }; let mut stream = FinalityProofsStream::::from_stream( Box::pin(futures::stream::pending()), ); finality_proofs_buf.fill(&mut stream); - assert_eq!(finality_proofs_buf.buf, vec![(1, TestFinalityProof(1))]); + assert_eq!(finality_proofs_buf.buf, vec![TestFinalityProof(1)]); assert!(stream.stream.is_some()); // when stream has entry with target, it is added to the recent proofs container @@ -158,10 +159,7 @@ mod tests { ), ); finality_proofs_buf.fill(&mut stream); - assert_eq!( - finality_proofs_buf.buf, - vec![(1, TestFinalityProof(1)), (4, TestFinalityProof(4))] - ); + assert_eq!(finality_proofs_buf.buf, vec![TestFinalityProof(1), TestFinalityProof(4)]); assert!(stream.stream.is_some()); // when stream has ended, we'll need to restart it @@ -170,21 +168,20 @@ mod tests { Box::pin(futures::stream::empty()), ); finality_proofs_buf.fill(&mut stream); - assert_eq!( - finality_proofs_buf.buf, - vec![(1, TestFinalityProof(1)), (4, TestFinalityProof(4))] - ); + assert_eq!(finality_proofs_buf.buf, vec![TestFinalityProof(1), TestFinalityProof(4)]); assert!(stream.stream.is_none()); } #[test] fn finality_proofs_buf_prune_works() { - let original_finality_proofs_buf: FinalityProofs = vec![ - (10, TestFinalityProof(10)), - (13, TestFinalityProof(13)), - (15, TestFinalityProof(15)), - (17, TestFinalityProof(17)), - (19, TestFinalityProof(19)), + let original_finality_proofs_buf: Vec< + ::FinalityProof, + > = vec![ + TestFinalityProof(10), + TestFinalityProof(13), + TestFinalityProof(15), + TestFinalityProof(17), + TestFinalityProof(19), ] .into_iter() .collect(); @@ -193,35 +190,35 @@ mod tests { let mut finality_proofs_buf = FinalityProofsBuf:: { buf: original_finality_proofs_buf.clone(), }; - finality_proofs_buf.prune(10, 1024); + finality_proofs_buf.prune(10, None); assert_eq!(&original_finality_proofs_buf[1..], finality_proofs_buf.buf,); // when there are no proof for justified header in the vec let mut finality_proofs_buf = FinalityProofsBuf:: { buf: original_finality_proofs_buf.clone(), }; - finality_proofs_buf.prune(11, 1024); + finality_proofs_buf.prune(11, None); assert_eq!(&original_finality_proofs_buf[1..], finality_proofs_buf.buf,); // when there are too many entries after initial prune && they also need to be pruned let mut finality_proofs_buf = FinalityProofsBuf:: { buf: original_finality_proofs_buf.clone(), }; - finality_proofs_buf.prune(10, 2); + finality_proofs_buf.prune(10, Some(2)); assert_eq!(&original_finality_proofs_buf[3..], finality_proofs_buf.buf,); // when last entry is pruned let mut finality_proofs_buf = FinalityProofsBuf:: { buf: original_finality_proofs_buf.clone(), }; - finality_proofs_buf.prune(19, 2); + finality_proofs_buf.prune(19, Some(2)); assert_eq!(&original_finality_proofs_buf[5..], finality_proofs_buf.buf,); // when post-last entry is pruned let mut finality_proofs_buf = FinalityProofsBuf:: { buf: original_finality_proofs_buf.clone(), }; - finality_proofs_buf.prune(20, 2); + finality_proofs_buf.prune(20, Some(2)); assert_eq!(&original_finality_proofs_buf[5..], finality_proofs_buf.buf,); } } diff --git a/relays/finality/src/headers.rs b/relays/finality/src/headers.rs index bdb05c9d9b72f..91f7cd0378ecd 100644 --- a/relays/finality/src/headers.rs +++ b/relays/finality/src/headers.rs @@ -19,6 +19,7 @@ use crate::{ SourceClient, SourceHeader, TargetClient, }; +use bp_header_chain::FinalityProof; use std::cmp::Ordering; /// Unjustified headers container. Ordered by header number. @@ -120,18 +121,18 @@ impl JustifiedHeaderSelector

{ while let (Some(finality_proof), Some(unjustified_header)) = (maybe_finality_proof, maybe_unjustified_header) { - match finality_proof.0.cmp(&unjustified_header.number()) { + match finality_proof.target_header_number().cmp(&unjustified_header.number()) { Ordering::Equal => { log::trace!( target: "bridge", "Managed to improve selected {} finality proof {:?} to {:?}.", P::SOURCE_NAME, maybe_justified_header.as_ref().map(|justified_header| justified_header.number()), - finality_proof.0 + finality_proof.target_header_number() ); return Some(JustifiedHeader { header: unjustified_header.clone(), - proof: finality_proof.1.clone(), + proof: finality_proof.clone(), }) }, Ordering::Less => maybe_unjustified_header = unjustified_headers_iter.next(), @@ -160,7 +161,7 @@ mod tests { fn select_better_recent_finality_proof_works() { // if there are no unjustified headers, nothing is changed let finality_proofs_buf = - FinalityProofsBuf::::new(vec![(5, TestFinalityProof(5))]); + FinalityProofsBuf::::new(vec![TestFinalityProof(5)]); let justified_header = JustifiedHeader { header: TestSourceHeader(false, 2, 2), proof: TestFinalityProof(2) }; let selector = JustifiedHeaderSelector::Regular(vec![], justified_header.clone()); @@ -179,8 +180,8 @@ mod tests { // if there's no intersection between recent finality proofs and unjustified headers, // nothing is changed let finality_proofs_buf = FinalityProofsBuf::::new(vec![ - (1, TestFinalityProof(1)), - (4, TestFinalityProof(4)), + TestFinalityProof(1), + TestFinalityProof(4), ]); let justified_header = JustifiedHeader { header: TestSourceHeader(false, 2, 2), proof: TestFinalityProof(2) }; @@ -193,8 +194,8 @@ mod tests { // if there's intersection between recent finality proofs and unjustified headers, but there // are no proofs in this intersection, nothing is changed let finality_proofs_buf = FinalityProofsBuf::::new(vec![ - (7, TestFinalityProof(7)), - (11, TestFinalityProof(11)), + TestFinalityProof(7), + TestFinalityProof(11), ]); let justified_header = JustifiedHeader { header: TestSourceHeader(false, 2, 2), proof: TestFinalityProof(2) }; @@ -213,8 +214,8 @@ mod tests { // - this better (last from intersection) proof is selected; // - 'obsolete' unjustified headers are pruned. let finality_proofs_buf = FinalityProofsBuf::::new(vec![ - (7, TestFinalityProof(7)), - (9, TestFinalityProof(9)), + TestFinalityProof(7), + TestFinalityProof(9), ]); let justified_header = JustifiedHeader { header: TestSourceHeader(false, 2, 2), proof: TestFinalityProof(2) }; diff --git a/relays/finality/src/lib.rs b/relays/finality/src/lib.rs index 51cd9a0935535..3579e68e1ef9c 100644 --- a/relays/finality/src/lib.rs +++ b/relays/finality/src/lib.rs @@ -22,6 +22,7 @@ pub use crate::{ base::{FinalityPipeline, SourceClientBase}, finality_loop::{metrics_prefix, run, FinalitySyncParams, SourceClient, TargetClient}, + finality_proofs::{FinalityProofsBuf, FinalityProofsStream}, sync_loop_metrics::SyncLoopMetrics, }; diff --git a/relays/finality/src/mock.rs b/relays/finality/src/mock.rs index 181504ce26070..e3ec4e4d0d47a 100644 --- a/relays/finality/src/mock.rs +++ b/relays/finality/src/mock.rs @@ -106,7 +106,11 @@ impl SourceHeader> #[derive(Debug, Clone, PartialEq, Eq)] pub struct TestFinalityProof(pub TestNumber); -impl FinalityProof for TestFinalityProof { +impl FinalityProof for TestFinalityProof { + fn target_header_hash(&self) -> TestHash { + Default::default() + } + fn target_header_number(&self) -> TestNumber { self.0 } diff --git a/relays/lib-substrate-relay/src/equivocation/target.rs b/relays/lib-substrate-relay/src/equivocation/target.rs index 36343a6eddb8e..f7e63dd943af5 100644 --- a/relays/lib-substrate-relay/src/equivocation/target.rs +++ b/relays/lib-substrate-relay/src/equivocation/target.rs @@ -17,14 +17,16 @@ //! Default generic implementation of equivocation source for basic Substrate client. use crate::{ - equivocation::{EquivocationDetectionPipelineAdapter, SubstrateEquivocationDetectionPipeline}, - finality_base::engine::Engine, + equivocation::{ + EquivocationDetectionPipelineAdapter, FinalityProoffOf, FinalityVerificationContextfOf, + SubstrateEquivocationDetectionPipeline, + }, + finality_base::{best_synced_header_id, engine::Engine}, }; -use crate::equivocation::{FinalityProoffOf, FinalityVerificationContextfOf}; use async_trait::async_trait; use bp_header_chain::HeaderFinalityInfo; -use bp_runtime::BlockNumberOf; +use bp_runtime::{BlockNumberOf, HashOf}; use equivocation_detector::TargetClient; use relay_substrate_client::{Client, Error}; use relay_utils::relay_loop::Client as RelayClient; @@ -59,6 +61,24 @@ impl RelayClient for SubstrateEquivoc impl TargetClient> for SubstrateEquivocationTarget

{ + async fn best_finalized_header_number( + &self, + ) -> Result, Self::Error> { + self.client.best_finalized_header_number().await + } + + async fn best_synced_header_hash( + &self, + at: BlockNumberOf, + ) -> Result>, Self::Error> { + Ok(best_synced_header_id::( + &self.client, + self.client.header_by_number(at).await?.hash(), + ) + .await? + .map(|id| id.hash())) + } + async fn finality_verification_context( &self, at: BlockNumberOf, diff --git a/relays/lib-substrate-relay/src/finality/target.rs b/relays/lib-substrate-relay/src/finality/target.rs index 1be668392dd36..930f0360311c3 100644 --- a/relays/lib-substrate-relay/src/finality/target.rs +++ b/relays/lib-substrate-relay/src/finality/target.rs @@ -20,7 +20,7 @@ use crate::{ finality::{ FinalitySyncPipelineAdapter, SubmitFinalityProofCallBuilder, SubstrateFinalitySyncPipeline, }, - finality_base::{engine::Engine, SubstrateFinalityProof}, + finality_base::{best_synced_header_id, engine::Engine, SubstrateFinalityProof}, TransactionParams, }; @@ -31,6 +31,7 @@ use relay_substrate_client::{ TransactionTracker, UnsignedTransaction, }; use relay_utils::relay_loop::Client as RelayClient; +use sp_runtime::traits::Header; /// Substrate client as Substrate finality target. pub struct SubstrateFinalityTarget { @@ -94,12 +95,11 @@ impl TargetClient( + Ok(best_synced_header_id::( &self.client, - None, + self.client.best_header().await?.hash(), ) .await? - .best_finalized_peer_at_best_self .ok_or(Error::BridgePalletIsNotInitialized)?) } diff --git a/relays/lib-substrate-relay/src/finality_base/engine.rs b/relays/lib-substrate-relay/src/finality_base/engine.rs index 49614638f89e6..afb2229fc4cf3 100644 --- a/relays/lib-substrate-relay/src/finality_base/engine.rs +++ b/relays/lib-substrate-relay/src/finality_base/engine.rs @@ -36,7 +36,7 @@ use relay_substrate_client::{ use sp_consensus_grandpa::{AuthorityList as GrandpaAuthoritiesSet, GRANDPA_ENGINE_ID}; use sp_core::{storage::StorageKey, Bytes}; use sp_runtime::{scale_info::TypeInfo, traits::Header, ConsensusEngineId}; -use std::marker::PhantomData; +use std::{fmt::Debug, marker::PhantomData}; /// Finality engine, used by the Substrate chain. #[async_trait] @@ -48,11 +48,11 @@ pub trait Engine: Send { /// Type of Finality RPC client used by this engine. type FinalityClient: SubstrateFinalityClient; /// Type of finality proofs, used by consensus engine. - type FinalityProof: FinalityProof> + Decode + Encode; + type FinalityProof: FinalityProof, BlockNumberOf> + Decode + Encode; /// The context needed for verifying finality proofs. - type FinalityVerificationContext; + type FinalityVerificationContext: Send; /// The type of the equivocation proof used by the consensus engine. - type EquivocationProof: Send + Sync; + type EquivocationProof: Clone + Debug + Send + Sync; /// The equivocations finder. type EquivocationsFinder: FindEquivocations< Self::FinalityProof, @@ -62,7 +62,7 @@ pub trait Engine: Send { /// The type of the key owner proof used by the consensus engine. type KeyOwnerProof: Send; /// Type of bridge pallet initialization data. - type InitializationData: std::fmt::Debug + Send + Sync + 'static; + type InitializationData: Debug + Send + Sync + 'static; /// Type of bridge pallet operating mode. type OperatingMode: OperatingMode + 'static; diff --git a/relays/lib-substrate-relay/src/finality_base/mod.rs b/relays/lib-substrate-relay/src/finality_base/mod.rs index bcd3f008d67c9..825960b1b3ef2 100644 --- a/relays/lib-substrate-relay/src/finality_base/mod.rs +++ b/relays/lib-substrate-relay/src/finality_base/mod.rs @@ -20,7 +20,9 @@ pub mod engine; use crate::finality_base::engine::Engine; + use async_trait::async_trait; +use bp_runtime::{HashOf, HeaderIdOf}; use codec::Decode; use futures::{stream::unfold, Stream, StreamExt}; use relay_substrate_client::{Chain, Client, Error}; @@ -85,3 +87,21 @@ pub async fn finality_proofs( ) .boxed()) } + +/// Get the id of the best `SourceChain` header known to the `TargetChain` at the provided +/// target block using the exposed runtime API method. +/// +/// The runtime API method should be `FinalityApi::best_finalized()`. +pub async fn best_synced_header_id( + target_client: &Client, + at: HashOf, +) -> Result>, Error> +where + SourceChain: Chain, + TargetChain: Chain, +{ + // now let's read id of best finalized peer header at our best finalized block + target_client + .typed_state_call(SourceChain::BEST_FINALIZED_HEADER_ID_METHOD.into(), (), Some(at)) + .await +} diff --git a/relays/lib-substrate-relay/src/messages_source.rs b/relays/lib-substrate-relay/src/messages_source.rs index de795beb7870c..4c49d76bdf357 100644 --- a/relays/lib-substrate-relay/src/messages_source.rs +++ b/relays/lib-substrate-relay/src/messages_source.rs @@ -19,6 +19,7 @@ //! `` chain. use crate::{ + finality_base::best_synced_header_id, messages_lane::{ BatchProofTransaction, MessageLaneAdapter, ReceiveMessagesDeliveryProofCallBuilder, SubstrateMessageLane, @@ -428,11 +429,7 @@ where // now let's read id of best finalized peer header at our best finalized block let peer_on_self_best_finalized_id = - best_finalized_peer_header_at_self::( - self_client, - self_best_id.hash(), - ) - .await?; + best_synced_header_id::(self_client, self_best_id.hash()).await?; // read actual header, matching the `peer_on_self_best_finalized_id` from the peer chain let actual_peer_on_self_best_finalized_id = diff --git a/relays/utils/src/lib.rs b/relays/utils/src/lib.rs index 428ee33494ca4..f23357bfed709 100644 --- a/relays/utils/src/lib.rs +++ b/relays/utils/src/lib.rs @@ -20,10 +20,11 @@ pub use bp_runtime::HeaderId; pub use error::Error; pub use relay_loop::{relay_loop, relay_metrics}; pub use sp_runtime::traits::{UniqueSaturatedFrom, UniqueSaturatedInto}; +use std::fmt::Debug; use async_trait::async_trait; use backoff::{backoff::Backoff, ExponentialBackoff}; -use futures::future::FutureExt; +use futures::future::{BoxFuture, FutureExt}; use std::time::Duration; use thiserror::Error; @@ -134,12 +135,16 @@ pub enum TrackedTransactionStatus { #[async_trait] pub trait TransactionTracker: Send { /// Header id, used by the chain. - type HeaderId: Clone + Send; + type HeaderId: Clone + Debug + Send; /// Wait until transaction is either finalized or invalidated/lost. async fn wait(self) -> TrackedTransactionStatus; } +/// Future associated with `TransactionTracker`, monitoring the transaction status. +pub type TrackedTransactionFuture = + BoxFuture<'static, TrackedTransactionStatus<::HeaderId>>; + /// Stringified error that may be either connection-related or not. #[derive(Error, Debug)] pub enum StringifiedMaybeConnectionError {