diff --git a/relays/messages/src/message_lane_loop.rs b/relays/messages/src/message_lane_loop.rs index 5e5085bbd5..2903435dda 100644 --- a/relays/messages/src/message_lane_loop.rs +++ b/relays/messages/src/message_lane_loop.rs @@ -1017,58 +1017,6 @@ pub(crate) mod tests { assert_eq!(result.submitted_messages_proofs, vec![(1..=1, None)],); } - #[test] - fn message_lane_loop_is_able_to_recover_from_race_stall() { - // with this configuration, both source and target clients will lose their transactions => - // reconnect will happen - let (source_exit_sender, exit_receiver) = unbounded(); - let target_exit_sender = source_exit_sender.clone(); - let result = run_loop_test( - Arc::new(Mutex::new(TestClientData { - source_state: ClientState { - best_self: HeaderId(0, 0), - best_finalized_self: HeaderId(0, 0), - best_finalized_peer_at_best_self: Some(HeaderId(0, 0)), - actual_best_finalized_peer_at_best_self: Some(HeaderId(0, 0)), - }, - source_latest_generated_nonce: 1, - source_tracked_transaction_status: TrackedTransactionStatus::Lost, - target_state: ClientState { - best_self: HeaderId(0, 0), - best_finalized_self: HeaderId(0, 0), - best_finalized_peer_at_best_self: Some(HeaderId(0, 0)), - actual_best_finalized_peer_at_best_self: Some(HeaderId(0, 0)), - }, - target_latest_received_nonce: 0, - target_tracked_transaction_status: TrackedTransactionStatus::Lost, - ..Default::default() - })), - Arc::new(move |data: &mut TestClientData| { - if data.is_source_reconnected { - data.source_tracked_transaction_status = - TrackedTransactionStatus::Finalized(Default::default()); - } - if data.is_source_reconnected && data.is_target_reconnected { - source_exit_sender.unbounded_send(()).unwrap(); - } - }), - Arc::new(|_| {}), - Arc::new(move |data: &mut TestClientData| { - if data.is_target_reconnected { - data.target_tracked_transaction_status = - TrackedTransactionStatus::Finalized(Default::default()); - } - if data.is_source_reconnected && data.is_target_reconnected { - target_exit_sender.unbounded_send(()).unwrap(); - } - }), - Arc::new(|_| {}), - exit_receiver.into_future().map(|(_, _)| ()), - ); - - assert!(result.is_source_reconnected); - } - #[test] fn message_lane_loop_is_able_to_recover_from_unsuccessful_transaction() { // with this configuration, both source and target clients will mine their transactions, but @@ -1146,7 +1094,6 @@ pub(crate) mod tests { exit_receiver.into_future().map(|(_, _)| ()), ); - assert!(result.is_source_reconnected); assert_eq!(result.submitted_messages_proofs.len(), 2); assert_eq!(result.submitted_messages_receiving_proofs.len(), 2); } diff --git a/relays/messages/src/message_race_delivery.rs b/relays/messages/src/message_race_delivery.rs index c704a7b561..8b8e690ec4 100644 --- a/relays/messages/src/message_race_delivery.rs +++ b/relays/messages/src/message_race_delivery.rs @@ -292,11 +292,16 @@ impl std::fmt::Debug for MessageDeliveryStrategy MessageDeliveryStrategy { /// Returns total weight of all undelivered messages. - fn total_queued_dispatch_weight(&self) -> Weight { + fn dispatch_weight_for_range(&self, range: &RangeInclusive) -> Weight { self.strategy .source_queue() .iter() - .flat_map(|(_, range)| range.values().map(|details| details.dispatch_weight)) + .flat_map(|(_, subrange)| { + subrange + .iter() + .filter(|(nonce, _)| range.contains(nonce)) + .map(|(_, details)| details.dispatch_weight) + }) .fold(Weight::zero(), |total, weight| total.saturating_add(weight)) } } @@ -424,7 +429,7 @@ where } async fn select_nonces_to_deliver( - &mut self, + &self, race_state: RaceState, TargetHeaderIdOf

, P::MessagesProof>, ) -> Option<(RangeInclusive, Self::ProofParameters)> { let best_finalized_source_header_id_at_best_target = @@ -526,7 +531,6 @@ where let maximal_source_queue_index = self.strategy.maximal_available_source_queue_index(race_state)?; - let previous_total_dispatch_weight = self.total_queued_dispatch_weight(); let source_queue = self.strategy.source_queue(); let reference = RelayMessagesBatchReference { @@ -544,10 +548,7 @@ where let range_begin = source_queue[0].1.begin(); let selected_nonces = range_begin..=range_end; - self.strategy.remove_le_nonces_from_source_queue(range_end); - - let new_total_dispatch_weight = self.total_queued_dispatch_weight(); - let dispatch_weight = previous_total_dispatch_weight - new_total_dispatch_weight; + let dispatch_weight = self.dispatch_weight_for_range(&selected_nonces); Some(( selected_nonces, @@ -707,7 +708,7 @@ mod tests { #[async_std::test] async fn message_delivery_strategy_selects_messages_to_deliver() { - let (state, mut strategy) = prepare_strategy(); + let (state, strategy) = prepare_strategy(); // both sides are ready to relay new messages assert_eq!( diff --git a/relays/messages/src/message_race_loop.rs b/relays/messages/src/message_race_loop.rs index 3d995a0a36..50f71ea050 100644 --- a/relays/messages/src/message_race_loop.rs +++ b/relays/messages/src/message_race_loop.rs @@ -211,7 +211,7 @@ pub trait RaceStrategy: Debug { /// data) from source to target node. /// Additionally, parameters required to generate proof are returned. async fn select_nonces_to_deliver( - &mut self, + &self, race_state: RaceState, ) -> Option<(RangeInclusive, Self::ProofParameters)>; } @@ -234,6 +234,13 @@ pub struct RaceState { pub nonces_submitted: Option>, } +impl RaceState { + /// Reset `nonces_submitted` to `None`. + fn reset_submitted(&mut self) { + self.nonces_submitted = None; + } +} + /// Run race loop until connection with target or source node is lost. pub async fn run, TC: TargetClient

>( race_source: SC, @@ -460,7 +467,7 @@ pub async fn run, TC: TargetClient

>( (TrackedTransactionStatus::Finalized(at_block), Some(nonces_submitted)) => { // our transaction has been mined, but was it successful or not? let's check the best // nonce at the target node. - race_target.nonces(at_block, false) + let _ = race_target.nonces(at_block, false) .await .map_err(|e| format!("failed to read nonces from target node: {e:?}")) .and_then(|(_, nonces_at_target)| { @@ -477,26 +484,26 @@ pub async fn run, TC: TargetClient

>( .map_err(|e| { log::error!( target: "bridge", - "{} -> {} race has stalled. Transaction failed: {}. Going to restart", + "{} -> {} race transaction failed: {}", P::source_name(), P::target_name(), e, ); - FailedClient::Both - })?; + race_state.reset_submitted(); + }); }, (TrackedTransactionStatus::Lost, _) => { log::warn!( target: "bridge", - "{} -> {} race has stalled. State: {:?}. Strategy: {:?}", + "{} -> {} race transaction has been lost. State: {:?}. Strategy: {:?}", P::source_name(), P::target_name(), race_state, strategy, ); - return Err(FailedClient::Both); + race_state.reset_submitted(); }, _ => (), } @@ -531,8 +538,7 @@ pub async fn run, TC: TargetClient

>( race_state.clone() }; - let nonces_to_deliver = - select_nonces_to_deliver(expected_race_state, &mut strategy).await; + let nonces_to_deliver = select_nonces_to_deliver(expected_race_state, &strategy).await; let best_at_source = strategy.best_at_source(); if let Some((at_block, nonces_range, proof_parameters)) = nonces_to_deliver { @@ -665,7 +671,7 @@ where async fn select_nonces_to_deliver( race_state: RaceState, - strategy: &mut Strategy, + strategy: &Strategy, ) -> Option<(SourceHeaderId, RangeInclusive, Strategy::ProofParameters)> where SourceHeaderId: Clone, @@ -723,7 +729,7 @@ mod tests { // the proof will be generated on source, but using BEST_AT_TARGET block assert_eq!( - select_nonces_to_deliver(race_state, &mut strategy).await, + select_nonces_to_deliver(race_state, &strategy).await, Some((HeaderId(BEST_AT_TARGET, BEST_AT_TARGET), 6..=10, (),)) ); } diff --git a/relays/messages/src/message_race_strategy.rs b/relays/messages/src/message_race_strategy.rs index 9b9091b979..9a53a487d9 100644 --- a/relays/messages/src/message_race_strategy.rs +++ b/relays/messages/src/message_race_strategy.rs @@ -136,7 +136,7 @@ impl< } /// Remove all nonces that are less than or equal to given nonce from the source queue. - pub fn remove_le_nonces_from_source_queue(&mut self, nonce: MessageNonce) { + fn remove_le_nonces_from_source_queue(&mut self, nonce: MessageNonce) { while let Some((queued_at, queued_range)) = self.source_queue.pop_front() { if let Some(range_to_requeue) = queued_range.greater_than(nonce) { self.source_queue.push_front((queued_at, range_to_requeue)); @@ -168,12 +168,12 @@ impl< SourceNoncesRange, Proof, > where - SourceHeaderHash: Clone + Debug + Send, - SourceHeaderNumber: Clone + Ord + Debug + Send, - SourceNoncesRange: NoncesRange + Debug + Send, - TargetHeaderHash: Debug + Send, - TargetHeaderNumber: Debug + Send, - Proof: Debug + Send, + SourceHeaderHash: Clone + Debug + Send + Sync, + SourceHeaderNumber: Clone + Ord + Debug + Send + Sync, + SourceNoncesRange: NoncesRange + Debug + Send + Sync, + TargetHeaderHash: Debug + Send + Sync, + TargetHeaderNumber: Debug + Send + Sync, + Proof: Debug + Send + Sync, { type SourceNoncesRange = SourceNoncesRange; type ProofParameters = (); @@ -284,6 +284,7 @@ impl< Proof, >, ) { + self.remove_le_nonces_from_source_queue(nonces.latest_nonce); // TODO: does it means that we'll try to submit old nonces in next tx??? self.best_target_nonce = Some(std::cmp::max( self.best_target_nonce.unwrap_or(nonces.latest_nonce), nonces.latest_nonce, @@ -291,7 +292,7 @@ impl< } async fn select_nonces_to_deliver( - &mut self, + &self, race_state: RaceState< HeaderId, HeaderId, @@ -301,7 +302,6 @@ impl< let maximal_source_queue_index = self.maximal_available_source_queue_index(race_state)?; let range_begin = self.source_queue[0].1.begin(); let range_end = self.source_queue[maximal_source_queue_index].1.end(); - self.remove_le_nonces_from_source_queue(range_end); Some((range_begin..=range_end, ())) } }