diff --git a/core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs b/core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs
index 4484307ecf45e0..65b3ed36531048 100644
--- a/core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs
+++ b/core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs
@@ -202,9 +202,7 @@ impl<Tx: TransactionWithMeta> Scheduler<Tx> for GreedyScheduler<Tx> {
         );
 
         // Push unschedulables back into the queue
-        for id in self.unschedulables.drain(..) {
-            container.push_id_into_queue(id);
-        }
+        container.push_ids_into_queue(self.unschedulables.drain(..));
 
         Ok(SchedulingSummary {
             num_scheduled,
diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs
index 54078166ebd1c8..9d5b3a08d1189c 100644
--- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs
+++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs
@@ -298,14 +298,13 @@ impl<Tx: TransactionWithMeta> Scheduler<Tx> for PrioGraphScheduler<Tx> {
         saturating_add_assign!(num_sent, self.send_batches(&mut batches)?);
 
         // Push unschedulable ids back into the container
-        for id in unschedulable_ids {
-            container.push_id_into_queue(id);
-        }
+        container.push_ids_into_queue(unschedulable_ids.into_iter());
 
         // Push remaining transactions back into the container
-        while let Some((id, _)) = self.prio_graph.pop_and_unblock() {
-            container.push_id_into_queue(id);
-        }
+        container.push_ids_into_queue(std::iter::from_fn(|| {
+            self.prio_graph.pop_and_unblock().map(|(id, _)| id)
+        }));
+        // No more remaining items in the queue.
 
         // Clear here to make sure the next scheduling pass starts fresh
         // without detecting any conflicts.
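// A self-contained sketch (not part of the patch): the call sites above rely
// on `std::iter::from_fn` to turn a "pop until None" loop into an iterator,
// so an entire drain can be handed to a single `push_ids_into_queue` call and
// eviction can be deferred until the whole batch is in the queue. The names
// below are illustrative only.
fn demo_from_fn_drain() {
    let mut pending = vec![3u64, 1, 2];
    // Each call to the closure pops one element; iteration ends at None.
    let drained: Vec<u64> = std::iter::from_fn(|| pending.pop()).collect();
    assert_eq!(drained, [2, 1, 3]);
    assert!(pending.is_empty());
}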
diff --git a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs
index aa59c97b33f90b..6aef38c473a5e0 100644
--- a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs
+++ b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs
@@ -1,9 +1,11 @@
 use {
     super::{
         scheduler_metrics::{SchedulerCountMetrics, SchedulerTimingMetrics},
+        transaction_priority_id::TransactionPriorityId,
         transaction_state::TransactionState,
         transaction_state_container::{
             SharedBytes, StateContainer, TransactionViewState, TransactionViewStateContainer,
+            EXTRA_CAPACITY,
         },
     },
     crate::banking_stage::{
@@ -406,8 +408,70 @@ impl TransactionViewReceiveAndBuffer {
         let mut num_received = 0usize;
         let mut num_buffered = 0usize;
+        let mut num_dropped_on_status_age_checks = 0usize;
         let mut num_dropped_on_capacity = 0usize;
         let mut num_dropped_on_receive = 0usize;
+
+        // Create temporary batches of transactions to be age-checked.
+        let mut transaction_priority_ids = ArrayVec::<_, EXTRA_CAPACITY>::new();
+        let lock_results: [_; EXTRA_CAPACITY] = core::array::from_fn(|_| Ok(()));
+        let mut error_counters = TransactionErrorMetrics::default();
+
+        let mut check_and_push_to_queue =
+            |container: &mut TransactionViewStateContainer,
+             transaction_priority_ids: &mut ArrayVec<TransactionPriorityId, EXTRA_CAPACITY>| {
+                // Temporary scope so that transaction references are immediately
+                // dropped and transactions not passing checks can be removed.
+                let mut check_results = {
+                    let mut transactions = ArrayVec::<_, EXTRA_CAPACITY>::new();
+                    transactions.extend(transaction_priority_ids.iter().map(|priority_id| {
+                        &container
+                            .get_transaction_ttl(priority_id.id)
+                            .expect("transaction must exist")
+                            .transaction
+                    }));
+                    working_bank.check_transactions::<RuntimeTransaction<_>>(
+                        &transactions,
+                        &lock_results[..transactions.len()],
+                        MAX_PROCESSING_AGE,
+                        &mut error_counters,
+                    )
+                };
+
+                // Remove errored transactions
+                for (result, priority_id) in check_results
+                    .iter_mut()
+                    .zip(transaction_priority_ids.iter())
+                {
+                    if result.is_err() {
+                        num_dropped_on_status_age_checks += 1;
+                        container.remove_by_id(priority_id.id);
+                        continue;
+                    }
+                    let transaction = &container
+                        .get_transaction_ttl(priority_id.id)
+                        .expect("transaction must exist")
+                        .transaction;
+                    if let Err(err) = Consumer::check_fee_payer_unlocked(
+                        working_bank,
+                        transaction,
+                        &mut error_counters,
+                    ) {
+                        *result = Err(err);
+                        num_dropped_on_status_age_checks += 1;
+                        container.remove_by_id(priority_id.id);
+                    }
+                }
+                // Push non-errored transactions into the queue.
+                num_dropped_on_capacity += container.push_ids_into_queue(
+                    check_results
+                        .into_iter()
+                        .zip(transaction_priority_ids.drain(..))
+                        .filter(|(r, _)| r.is_ok())
+                        .map(|(_, id)| id),
+                );
+            };
+
         for packet_batch in packet_batch_message.iter() {
             for packet in packet_batch.iter() {
                 let Some(packet_data) = packet.data(..) else {
@@ -417,31 +481,45 @@ impl TransactionViewReceiveAndBuffer {
                 num_received += 1;
 
                 // Reserve free-space to copy packet into, run sanitization checks, and insert.
-                if container.try_insert_with_data(
-                    packet_data,
-                    |bytes| match Self::try_handle_packet(
-                        bytes,
-                        root_bank,
-                        working_bank,
-                        alt_resolved_slot,
-                        sanitized_epoch,
-                        transaction_account_lock_limit,
-                    ) {
-                        Ok(state) => {
-                            num_buffered += 1;
-                            Ok(state)
+                if let Some(transaction_id) =
+                    container.try_insert_map_only_with_data(packet_data, |bytes| {
+                        match Self::try_handle_packet(
+                            bytes,
+                            root_bank,
+                            working_bank,
+                            alt_resolved_slot,
+                            sanitized_epoch,
+                            transaction_account_lock_limit,
+                        ) {
+                            Ok(state) => {
+                                num_buffered += 1;
+                                Ok(state)
+                            }
+                            Err(()) => {
+                                num_dropped_on_receive += 1;
+                                Err(())
+                            }
                         }
-                        Err(()) => {
-                            num_dropped_on_receive += 1;
-                            Err(())
-                        }
-                    },
-                ) {
-                    num_dropped_on_capacity += 1;
-                };
+                    })
+                {
+                    let priority = container
+                        .get_mut_transaction_state(transaction_id)
+                        .expect("transaction must exist")
+                        .priority();
+                    transaction_priority_ids
+                        .push(TransactionPriorityId::new(priority, transaction_id));
+
+                    // If at capacity, run checks and remove invalid transactions.
+                    if transaction_priority_ids.len() == EXTRA_CAPACITY {
+                        check_and_push_to_queue(container, &mut transaction_priority_ids);
+                    }
+                }
             }
         }
 
+        // Any remaining packets undergo status/age checks
+        check_and_push_to_queue(container, &mut transaction_priority_ids);
+
         let buffer_time_us = start.elapsed().as_micros() as u64;
         timing_metrics.update(|timing_metrics| {
             saturating_add_assign!(timing_metrics.buffer_time_us, buffer_time_us);
@@ -449,6 +527,10 @@ impl TransactionViewReceiveAndBuffer {
         count_metrics.update(|count_metrics| {
             saturating_add_assign!(count_metrics.num_received, num_received);
             saturating_add_assign!(count_metrics.num_buffered, num_buffered);
+            saturating_add_assign!(
+                count_metrics.num_dropped_on_age_and_status,
+                num_dropped_on_status_age_checks
+            );
             saturating_add_assign!(
                 count_metrics.num_dropped_on_capacity,
                 num_dropped_on_capacity
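// A self-contained sketch (not part of the patch) of the batching flow above:
// ids accumulate in a fixed-capacity buffer; once EXTRA_CAPACITY ids are
// collected (or the receive loop ends), the whole batch is checked and only
// the survivors are pushed, in one call. `check` stands in for the bank's
// age/status and fee-payer checks; all names here are hypothetical.
const EXTRA_CAPACITY: usize = 64;

fn buffer_and_check(
    incoming: impl Iterator<Item = u64>,
    check: impl Fn(u64) -> bool,
    mut push_batch: impl FnMut(&mut Vec<u64>),
) {
    let mut batch: Vec<u64> = Vec::with_capacity(EXTRA_CAPACITY);
    for id in incoming {
        batch.push(id);
        if batch.len() == EXTRA_CAPACITY {
            batch.retain(|&id| check(id)); // drop ids failing checks
            push_batch(&mut batch);        // push survivors in one call
            batch.clear();
        }
    }
    // Any remaining ids undergo the same checks after the loop.
    batch.retain(|&id| check(id));
    push_batch(&mut batch);
}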
diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs
index 3394a2f9cc71e2..d7e7061bb0c246 100644
--- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs
+++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs
@@ -327,9 +327,8 @@ where
         }
 
         if hold {
-            for priority_id in ids_to_add_back {
-                self.container.push_id_into_queue(priority_id);
-            }
+            self.container
+                .push_ids_into_queue(ids_to_add_back.into_iter());
         } else {
             for priority_id in ids_to_add_back {
                 self.container.remove_by_id(priority_id.id);
@@ -393,14 +392,22 @@ where
                 &mut error_counters,
             );
 
-            for (result, id) in check_results.into_iter().zip(chunk.iter()) {
+            // Remove errored transactions
+            for (result, id) in check_results.iter().zip(chunk.iter()) {
                 if result.is_err() {
                     saturating_add_assign!(num_dropped_on_age_and_status, 1);
                     self.container.remove_by_id(id.id);
-                } else {
-                    self.container.push_id_into_queue(*id);
                 }
             }
+
+            // Push non-errored transactions into the queue.
+            self.container.push_ids_into_queue(
+                check_results
+                    .into_iter()
+                    .zip(chunk.iter())
+                    .filter(|(r, _)| r.is_ok())
+                    .map(|(_, id)| *id),
+            );
         }
 
         self.count_metrics.update(|count_metrics| {
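// A self-contained sketch (not part of the patch) of the two-pass pattern now
// shared by both call sites: pass one removes ids whose checks failed, pass
// two feeds every passing id to a single batched push. `Container` is a
// hypothetical stand-in for the state container.
struct Container {
    queued: Vec<u64>,
}

impl Container {
    fn remove_by_id(&mut self, _id: u64) { /* drop state for a failed id */ }

    fn push_ids_into_queue(&mut self, ids: impl Iterator<Item = u64>) -> usize {
        self.queued.extend(ids);
        0 // this toy container never evicts
    }
}

fn apply_check_results(container: &mut Container, results: &[Result<(), ()>], ids: &[u64]) {
    for (result, id) in results.iter().zip(ids) {
        if result.is_err() {
            container.remove_by_id(*id);
        }
    }
    container.push_ids_into_queue(
        results
            .iter()
            .zip(ids)
            .filter(|(r, _)| r.is_ok())
            .map(|(_, id)| *id),
    );
}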
diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs
index c9c8ddbde751e5..ceca4b5a7e26a8 100644
--- a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs
+++ b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs
@@ -44,6 +44,7 @@ use {
 /// The container maintains a fixed capacity. If the queue is full when pushing
 /// a new transaction, the lowest priority transaction will be dropped.
 pub(crate) struct TransactionStateContainer<Tx: TransactionWithMeta> {
+    capacity: usize,
     priority_queue: MinMaxHeap<TransactionPriorityId>,
     id_to_transaction_state: Slab<TransactionState<Tx>>,
 }
@@ -55,9 +56,6 @@ pub(crate) trait StateContainer<Tx: TransactionWithMeta> {
     /// Returns true if the queue is empty.
     fn is_empty(&self) -> bool;
 
-    /// Returns the remaining capacity of the container
-    fn remaining_capacity(&self) -> usize;
-
     /// Get the top transaction id in the priority queue.
     fn pop(&mut self) -> Option<TransactionPriorityId>;
 
@@ -81,13 +79,19 @@
             .expect("transaction must exist");
         let priority_id = TransactionPriorityId::new(transaction_state.priority(), transaction_id);
         transaction_state.transition_to_unprocessed(transaction_ttl);
-        self.push_id_into_queue(priority_id);
+        self.push_ids_into_queue(std::iter::once(priority_id));
     }
 
-    /// Pushes a transaction id into the priority queue. If the queue is full, the lowest priority
-    /// transaction will be dropped (removed from the queue and map).
-    /// Returns `true` if a packet was dropped due to capacity limits.
-    fn push_id_into_queue(&mut self, priority_id: TransactionPriorityId) -> bool;
+    /// Pushes transaction ids into the priority queue. If the queue is full,
+    /// the lowest priority transactions will be dropped (removed from the
+    /// queue and map) **after** all ids have been pushed.
+    /// To avoid allocating, the caller should not push more than
+    /// [`EXTRA_CAPACITY`] ids in a call.
+    /// Returns the number of dropped transactions.
+    fn push_ids_into_queue(
+        &mut self,
+        priority_ids: impl Iterator<Item = TransactionPriorityId>,
+    ) -> usize;
 
     /// Remove transaction by id.
     fn remove_by_id(&mut self, id: TransactionId);
@@ -95,13 +99,15 @@
     fn get_min_max_priority(&self) -> MinMaxResult<u64>;
 }
 
+// Extra capacity is added because some additional space is needed when
+// pushing a new transaction into the container to avoid reallocation.
+pub(crate) const EXTRA_CAPACITY: usize = 64;
+
 impl<Tx: TransactionWithMeta> StateContainer<Tx> for TransactionStateContainer<Tx> {
     fn with_capacity(capacity: usize) -> Self {
-        // Extra capacity is added because some additional space is needed when
-        // pushing a new transaction into the container to avoid reallocation.
-        const EXTRA_CAPACITY: usize = 64;
         Self {
-            priority_queue: MinMaxHeap::with_capacity(capacity),
+            capacity,
+            priority_queue: MinMaxHeap::with_capacity(capacity + EXTRA_CAPACITY),
             id_to_transaction_state: Slab::with_capacity(capacity + EXTRA_CAPACITY),
         }
     }
@@ -110,12 +116,6 @@ impl<Tx: TransactionWithMeta> StateContainer<Tx> for TransactionStateContainer<Tx> {
         self.id_to_transaction_state.is_empty()
     }
 
-    fn remaining_capacity(&self) -> usize {
-        self.priority_queue
-            .capacity()
-            .saturating_sub(self.id_to_transaction_state.len())
-    }
-
     fn pop(&mut self) -> Option<TransactionPriorityId> {
         self.priority_queue.pop_max()
     }
@@ -133,8 +133,29 @@ impl<Tx: TransactionWithMeta> StateContainer<Tx> for TransactionStateContainer<Tx> {
             .map(|state| state.transaction_ttl())
     }
 
-    fn push_id_into_queue(&mut self, priority_id: TransactionPriorityId) -> bool {
-        self.push_id_into_queue_with_remaining_capacity(priority_id, self.remaining_capacity())
+    fn push_ids_into_queue(
+        &mut self,
+        priority_ids: impl Iterator<Item = TransactionPriorityId>,
+    ) -> usize {
+        for id in priority_ids {
+            self.priority_queue.push(id);
+        }
+
+        // The number of items in the `id_to_transaction_state` map is
+        // greater than or equal to the number of elements in the queue.
+        // To avoid the map going over capacity, we use the length of the
+        // map here instead of the queue.
+        let num_dropped = self
+            .id_to_transaction_state
+            .len()
+            .saturating_sub(self.capacity);
+
+        for _ in 0..num_dropped {
+            let priority_id = self.priority_queue.pop_min().expect("queue is not empty");
+            self.id_to_transaction_state.remove(priority_id.id);
+        }
+
+        num_dropped
     }
 
     fn remove_by_id(&mut self, id: TransactionId) {
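// A self-contained model (not part of the patch) of the new eviction rule:
// push everything first, then pop the minimum until the *map* length is back
// within `capacity`. Keying eviction off the map rather than the queue is
// what keeps popped-but-still-tracked transactions counted against capacity.
// `BTreeSet` stands in for the real min-max heap; all names are illustrative.
use std::collections::BTreeSet;

struct ToyContainer {
    capacity: usize,
    queue: BTreeSet<u64>, // priorities; gives cheap access to the minimum
    map_len: usize,       // entries in the id-to-state map, queued or not
}

impl ToyContainer {
    // Mirrors `try_insert_map_only_with_data`: the map grows, the queue does not.
    fn insert_map_only(&mut self) {
        self.map_len += 1;
    }

    fn push_ids_into_queue(&mut self, priority_ids: impl Iterator<Item = u64>) -> usize {
        for id in priority_ids {
            self.queue.insert(id);
        }
        // Evict lowest-priority entries only after the whole batch is in.
        let num_dropped = self.map_len.saturating_sub(self.capacity);
        for _ in 0..num_dropped {
            let min = *self.queue.iter().next().expect("queue is not empty");
            self.queue.remove(&min);
            self.map_len -= 1; // the entry leaves the map as well
        }
        num_dropped
    }
}

fn demo_eviction() {
    let mut c = ToyContainer { capacity: 2, queue: BTreeSet::new(), map_len: 0 };
    for _ in 0..3 {
        c.insert_map_only();
    }
    // Three ids pushed into a capacity-2 container: exactly one is dropped.
    assert_eq!(c.push_ids_into_queue([10, 11, 12].into_iter()), 1);
    assert_eq!(c.queue.iter().copied().collect::<Vec<_>>(), [11, 12]);
}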
@@ -162,9 +183,6 @@ impl<Tx: TransactionWithMeta> TransactionStateContainer<Tx> {
         priority: u64,
         cost: u64,
     ) -> bool {
-        // cache the remaining capacity **before** we take ownership of
-        // the next vacant entry. i.e. get the size before we insert.
-        let remaining_capacity = self.remaining_capacity();
         let priority_id = {
             let entry = self.get_vacant_map_entry();
             let transaction_id = entry.key();
@@ -177,22 +195,7 @@
             TransactionPriorityId::new(priority, transaction_id)
         };
 
-        self.push_id_into_queue_with_remaining_capacity(priority_id, remaining_capacity)
-    }
-
-    fn push_id_into_queue_with_remaining_capacity(
-        &mut self,
-        priority_id: TransactionPriorityId,
-        remaining_capacity: usize,
-    ) -> bool {
-        if remaining_capacity == 0 {
-            let popped_id = self.priority_queue.push_pop_min(priority_id);
-            self.remove_by_id(popped_id.id);
-            true
-        } else {
-            self.priority_queue.push(priority_id);
-            false
-        }
+        self.push_ids_into_queue(std::iter::once(priority_id)) > 0
     }
 
     fn get_vacant_map_entry(&mut self) -> VacantEntry<TransactionState<Tx>> {
@@ -214,15 +217,13 @@ pub struct TransactionViewStateContainer {
 }
 
 impl TransactionViewStateContainer {
-    /// Returns true if packet was dropped due to capacity limits.
-    pub(crate) fn try_insert_with_data(
+    /// Insert into the map, but NOT into the priority queue.
+    /// Returns the id of the transaction if it was inserted.
+    pub(crate) fn try_insert_map_only_with_data(
        &mut self,
         data: &[u8],
         f: impl FnOnce(SharedBytes) -> Result<TransactionViewState, ()>,
-    ) -> bool {
-        // Get remaining capacity before inserting.
-        let remaining_capacity = self.remaining_capacity();
-
+    ) -> Option<TransactionId> {
         // Get a vacant entry in the slab.
         let vacant_entry = self.inner.get_vacant_map_entry();
         let transaction_id = vacant_entry.key();
@@ -248,16 +249,11 @@ impl TransactionViewStateContainer {
         }
 
         // Attempt to insert the transaction.
-        match f(Arc::clone(bytes_entry)) {
-            Ok(state) => {
-                let priority_id = TransactionPriorityId::new(state.priority(), transaction_id);
-                vacant_entry.insert(state);
-
-                // Push the transaction into the queue.
-                self.inner
-                    .push_id_into_queue_with_remaining_capacity(priority_id, remaining_capacity)
-            }
-            Err(_) => false,
+        if let Ok(state) = f(Arc::clone(bytes_entry)) {
+            vacant_entry.insert(state);
+            Some(transaction_id)
+        } else {
+            None
         }
     }
 }
@@ -280,11 +276,6 @@ impl StateContainer<RuntimeTransaction<ResolvedTransactionView<SharedBytes>>> for TransactionViewStateContainer {
         self.inner.is_empty()
     }
 
-    #[inline]
-    fn remaining_capacity(&self) -> usize {
-        self.inner.remaining_capacity()
-    }
-
     #[inline]
     fn pop(&mut self) -> Option<TransactionPriorityId> {
         self.inner.pop()
     }
@@ -307,8 +298,11 @@ impl StateContainer<RuntimeTransaction<ResolvedTransactionView<SharedBytes>>> for TransactionViewStateContainer {
     }
 
     #[inline]
-    fn push_id_into_queue(&mut self, priority_id: TransactionPriorityId) -> bool {
-        self.inner.push_id_into_queue(priority_id)
+    fn push_ids_into_queue(
+        &mut self,
+        priority_ids: impl Iterator<Item = TransactionPriorityId>,
+    ) -> usize {
+        self.inner.push_ids_into_queue(priority_ids)
     }
 
     #[inline]
@@ -327,6 +321,7 @@ mod tests {
     use {
         super::*,
         crate::banking_stage::scheduler_messages::MaxAge,
+        agave_transaction_view::transaction_view::SanitizedTransactionView,
         solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
         solana_sdk::{
             compute_budget::ComputeBudgetInstruction,
@@ -336,8 +331,9 @@ mod tests {
             signature::Keypair,
             signer::Signer,
             system_instruction,
-            transaction::{SanitizedTransaction, Transaction},
+            transaction::{MessageHash, SanitizedTransaction, Transaction},
         },
+        std::collections::HashSet,
     };
 
     /// Returns (transaction_ttl, priority, cost)
@@ -424,4 +420,85 @@ mod tests {
             .get_mut_transaction_state(non_existing_id)
             .is_none());
     }
+
+    #[test]
+    fn test_view_push_ids_to_queue() {
+        let mut container = TransactionViewStateContainer::with_capacity(2);
+
+        let reserved_addresses = HashSet::default();
+        let packet_parser = |data, priority, cost| {
+            let view = SanitizedTransactionView::try_new_sanitized(data).unwrap();
+            let view = RuntimeTransaction::<SanitizedTransactionView<_>>::try_from(
+                view,
+                MessageHash::Compute,
+                None,
+            )
+            .unwrap();
+            let view = RuntimeTransaction::<ResolvedTransactionView<_>>::try_from(
+                view,
+                None,
+                &reserved_addresses,
+            )
+            .unwrap();
+
+            Ok(TransactionState::new(
+                SanitizedTransactionTTL {
+                    transaction: view,
+                    max_age: MaxAge::MAX,
+                },
+                None,
+                priority,
+                cost,
+            ))
+        };
+
+        // Push 2 transactions into the queue so buffer is full.
+        for priority in [4, 5] {
+            let (_transaction_ttl, packet, priority, cost) = test_transaction(priority);
+            let id = container
+                .try_insert_map_only_with_data(packet.original_packet().data(..).unwrap(), |data| {
+                    packet_parser(data, priority, cost)
+                })
+                .unwrap();
+            let priority_id = TransactionPriorityId::new(priority, id);
+            assert_eq!(
+                container.push_ids_into_queue(std::iter::once(priority_id)),
+                0
+            );
+        }
+
+        // Push 5 additional packets in. 5 should be dropped.
+        let mut priority_ids = Vec::with_capacity(5);
+        for priority in [10, 11, 12, 1, 2] {
+            let (_transaction_ttl, packet, priority, cost) = test_transaction(priority);
+            let id = container
+                .try_insert_map_only_with_data(packet.original_packet().data(..).unwrap(), |data| {
+                    packet_parser(data, priority, cost)
+                })
+                .unwrap();
+            let priority_id = TransactionPriorityId::new(priority, id);
+            priority_ids.push(priority_id);
+        }
+        assert_eq!(container.push_ids_into_queue(priority_ids.into_iter()), 5);
+        assert_eq!(container.pop().unwrap().priority, 12);
+        assert_eq!(container.pop().unwrap().priority, 11);
+        assert!(container.pop().is_none());
+
+        // Container now has no items in the queue, but still has 2 items in the map.
+        // If we attempt to push additional transactions to the queue, they
+        // are rejected regardless of their priority.
+        let priority = u64::MAX;
+        let (_transaction_ttl, packet, priority, cost) = test_transaction(priority);
+        let id = container
+            .try_insert_map_only_with_data(packet.original_packet().data(..).unwrap(), |data| {
+                packet_parser(data, priority, cost)
+            })
+            .unwrap();
+        let priority_id = TransactionPriorityId::new(priority, id);
+        assert_eq!(
+            container.push_ids_into_queue(std::iter::once(priority_id)),
+            1
+        );
+        assert!(container.pop().is_none());
+    }
 }
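// A closing note (not part of the patch) on why callers should push at most
// EXTRA_CAPACITY ids per call, as the new doc comment requires: both backing
// structures are allocated with `capacity + EXTRA_CAPACITY` slots, and each
// `push_ids_into_queue` call ends by evicting the map back down to `capacity`
// entries. Peak occupancy during a call is therefore `capacity + batch`,
// which stays within the allocation exactly when `batch <= EXTRA_CAPACITY`;
// the assumed arithmetic below just restates that bound.
fn stays_within_allocation(capacity: usize, batch: usize, extra_capacity: usize) -> bool {
    // Peak occupancy, assuming the previous call evicted down to `capacity`.
    let peak = capacity + batch;
    peak <= capacity + extra_capacity
}

fn main() {
    assert!(stays_within_allocation(700_000, 64, 64));
    assert!(!stays_within_allocation(700_000, 65, 64));
}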