slot-based-collator: Refactor some internals #6935

Merged (3 commits) on Dec 18, 2024
@@ -23,30 +23,32 @@ use cumulus_primitives_aura::AuraUnincludedSegmentApi;
use cumulus_primitives_core::{GetCoreSelectorApi, PersistedValidationData};
use cumulus_relay_chain_interface::RelayChainInterface;

use polkadot_primitives::{
vstaging::{ClaimQueueOffset, CoreSelector, DEFAULT_CLAIM_QUEUE_OFFSET},
BlockId, CoreIndex, Hash as RelayHash, Header as RelayHeader, Id as ParaId,
OccupiedCoreAssumption,
};
use polkadot_primitives::Id as ParaId;

use futures::prelude::*;
use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf, UsageProvider};
use sc_consensus::BlockImport;
use sp_api::{ApiExt, ProvideRuntimeApi};
use sp_api::ProvideRuntimeApi;
use sp_application_crypto::AppPublic;
use sp_blockchain::HeaderBackend;
use sp_consensus_aura::{AuraApi, Slot};
use sp_core::{crypto::Pair, U256};
use sp_core::crypto::Pair;
use sp_inherents::CreateInherentDataProviders;
use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member, One};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member};
use sp_timestamp::Timestamp;
use std::{collections::BTreeSet, sync::Arc, time::Duration};
use std::{sync::Arc, time::Duration};

use super::CollatorMessage;
use crate::{
collator::{self as collator_util},
collators::{check_validation_code_or_log, cores_scheduled_for_para},
collators::{
check_validation_code_or_log,
slot_based::{
core_selector,
relay_chain_data_cache::{RelayChainData, RelayChainDataCache},
},
},
LOG_TARGET,
};

@@ -218,7 +220,7 @@ where
collator_util::Collator::<Block, P, _, _, _, _, _>::new(params)
};

let mut relay_chain_fetcher = RelayChainCachingFetcher::new(relay_client.clone(), para_id);
let mut relay_chain_data_cache = RelayChainDataCache::new(relay_client.clone(), para_id);

loop {
// We wait here until the next slot arrives.
@@ -242,7 +244,7 @@

// Retrieve the core selector.
let (core_selector, claim_queue_offset) =
match core_selector(&*para_client, &parent).await {
match core_selector(&*para_client, parent.hash, *parent.header.number()) {
Ok(core_selector) => core_selector,
Err(err) => {
tracing::trace!(
@@ -259,7 +261,7 @@
max_pov_size,
scheduled_cores,
claimed_cores,
}) = relay_chain_fetcher
}) = relay_chain_data_cache
.get_mut_relay_chain_data(relay_parent, claim_queue_offset)
.await
else {
@@ -419,119 +421,3 @@ where
}
}
}

/// Contains relay chain data necessary for parachain block building.
#[derive(Clone)]
struct RelayChainData {
/// Current relay chain parent header.
pub relay_parent_header: RelayHeader,
/// The cores on which the para is scheduled at the configured claim queue offset.
pub scheduled_cores: Vec<CoreIndex>,
/// Maximum configured PoV size on the relay chain.
pub max_pov_size: u32,
/// The claimed cores at a relay parent.
pub claimed_cores: BTreeSet<CoreIndex>,
}

/// Simple helper to fetch relay chain data and cache it based on the current relay chain best block
/// hash.
struct RelayChainCachingFetcher<RI> {
relay_client: RI,
para_id: ParaId,
last_data: Option<(RelayHash, RelayChainData)>,
}

impl<RI> RelayChainCachingFetcher<RI>
where
RI: RelayChainInterface + Clone + 'static,
{
pub fn new(relay_client: RI, para_id: ParaId) -> Self {
Self { relay_client, para_id, last_data: None }
}

/// Fetch required [`RelayChainData`] from the relay chain.
/// If this data has been fetched in the past for the incoming hash, it will reuse
/// cached data.
pub async fn get_mut_relay_chain_data(
&mut self,
relay_parent: RelayHash,
claim_queue_offset: ClaimQueueOffset,
) -> Result<&mut RelayChainData, ()> {
match &self.last_data {
Some((last_seen_hash, _)) if *last_seen_hash == relay_parent => {
tracing::trace!(target: crate::LOG_TARGET, %relay_parent, "Using cached data for relay parent.");
Ok(&mut self.last_data.as_mut().expect("last_data is Some").1)
},
_ => {
tracing::trace!(target: crate::LOG_TARGET, %relay_parent, "Relay chain best block changed, fetching new data from relay chain.");
let data = self.update_for_relay_parent(relay_parent, claim_queue_offset).await?;
self.last_data = Some((relay_parent, data));
Ok(&mut self.last_data.as_mut().expect("last_data was just set above").1)
},
}
}

/// Fetch fresh data from the relay chain for the given relay parent hash.
async fn update_for_relay_parent(
&self,
relay_parent: RelayHash,
claim_queue_offset: ClaimQueueOffset,
) -> Result<RelayChainData, ()> {
let scheduled_cores = cores_scheduled_for_para(
relay_parent,
self.para_id,
&self.relay_client,
claim_queue_offset,
)
.await;

let Ok(Some(relay_parent_header)) =
self.relay_client.header(BlockId::Hash(relay_parent)).await
else {
tracing::warn!(target: crate::LOG_TARGET, "Unable to fetch latest relay chain block header.");
return Err(())
};

let max_pov_size = match self
.relay_client
.persisted_validation_data(relay_parent, self.para_id, OccupiedCoreAssumption::Included)
.await
{
Ok(None) => return Err(()),
Ok(Some(pvd)) => pvd.max_pov_size,
Err(err) => {
tracing::error!(target: crate::LOG_TARGET, ?err, "Failed to gather information from relay-client");
return Err(())
},
};

Ok(RelayChainData {
relay_parent_header,
scheduled_cores,
max_pov_size,
claimed_cores: BTreeSet::new(),
})
}
}

async fn core_selector<Block: BlockT, Client>(
para_client: &Client,
parent: &consensus_common::PotentialParent<Block>,
) -> Result<(CoreSelector, ClaimQueueOffset), sp_api::ApiError>
where
Client: ProvideRuntimeApi<Block> + Send + Sync,
Client::Api: GetCoreSelectorApi<Block>,
{
let block_hash = parent.hash;
let runtime_api = para_client.runtime_api();

if runtime_api.has_api::<dyn GetCoreSelectorApi<Block>>(block_hash)? {
Ok(runtime_api.core_selector(block_hash)?)
} else {
let next_block_number: U256 = (*parent.header.number() + One::one()).into();

// If the runtime API does not support the core selector API, fallback to some default
// values.
Ok((CoreSelector(next_block_number.byte(0)), ClaimQueueOffset(DEFAULT_CLAIM_QUEUE_OFFSET)))
}
}
35 changes: 30 additions & 5 deletions cumulus/client/consensus/aura/src/collators/slot_based/mod.rs
@@ -35,30 +35,32 @@ use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface
use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker};
use cumulus_client_consensus_proposer::ProposerInterface;
use cumulus_primitives_aura::AuraUnincludedSegmentApi;
use cumulus_primitives_core::GetCoreSelectorApi;
use cumulus_primitives_core::{ClaimQueueOffset, CoreSelector, GetCoreSelectorApi};
use cumulus_relay_chain_interface::RelayChainInterface;
use futures::FutureExt;
use polkadot_primitives::{
CollatorPair, CoreIndex, Hash as RelayHash, Id as ParaId, ValidationCodeHash,
vstaging::DEFAULT_CLAIM_QUEUE_OFFSET, CollatorPair, CoreIndex, Hash as RelayHash, Id as ParaId,
ValidationCodeHash,
};
use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf, UsageProvider};
use sc_consensus::BlockImport;
use sc_utils::mpsc::tracing_unbounded;
use sp_api::ProvideRuntimeApi;
use sp_api::{ApiExt, ProvideRuntimeApi};
use sp_application_crypto::AppPublic;
use sp_blockchain::HeaderBackend;
use sp_consensus_aura::AuraApi;
use sp_core::{crypto::Pair, traits::SpawnNamed};
use sp_core::{crypto::Pair, traits::SpawnNamed, U256};
use sp_inherents::CreateInherentDataProviders;
use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block as BlockT, Member};
use sp_runtime::traits::{Block as BlockT, Member, NumberFor, One};
use std::{sync::Arc, time::Duration};

pub use block_import::{SlotBasedBlockImport, SlotBasedBlockImportHandle};

mod block_builder_task;
mod block_import;
mod collation_task;
mod relay_chain_data_cache;

/// Parameters for [`run`].
pub struct Params<Block, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner> {
@@ -204,3 +206,26 @@ struct CollatorMessage<Block: BlockT> {
/// Core index that this block should be submitted on
pub core_index: CoreIndex,
}

/// Fetch the `CoreSelector` and `ClaimQueueOffset` for `parent_hash`.
fn core_selector<Block: BlockT, Client>(
para_client: &Client,
parent_hash: Block::Hash,
parent_number: NumberFor<Block>,
) -> Result<(CoreSelector, ClaimQueueOffset), sp_api::ApiError>
where
Client: ProvideRuntimeApi<Block> + Send + Sync,
Client::Api: GetCoreSelectorApi<Block>,
{
let runtime_api = para_client.runtime_api();

if runtime_api.has_api::<dyn GetCoreSelectorApi<Block>>(parent_hash)? {
Ok(runtime_api.core_selector(parent_hash)?)
} else {
let next_block_number: U256 = (parent_number + One::one()).into();

// If the runtime API does not support the core selector API, fall back to some default
// values.
Ok((CoreSelector(next_block_number.byte(0)), ClaimQueueOffset(DEFAULT_CLAIM_QUEUE_OFFSET)))
}
}
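
For context, the fallback branch above derives the selector from the least significant byte of the next block number (`U256::byte(0)`), so consecutive blocks rotate through selector values even when the runtime does not expose `GetCoreSelectorApi`. A minimal sketch of that arithmetic, not part of the diff, using a plain `u64` in place of the generic `NumberFor<Block>` and a hypothetical helper name:

// Hypothetical stand-in for the fallback above: `U256::byte(0)` is the least
// significant byte, i.e. `(parent_number + 1) % 256`.
fn fallback_core_selector(parent_number: u64) -> u8 {
    ((parent_number + 1) % 256) as u8
}

fn main() {
    assert_eq!(fallback_core_selector(0), 1);
    assert_eq!(fallback_core_selector(254), 255);
    assert_eq!(fallback_core_selector(255), 0); // wraps around every 256 blocks
}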
127 changes: 127 additions & 0 deletions cumulus/client/consensus/aura/src/collators/slot_based/relay_chain_data_cache.rs
@@ -0,0 +1,127 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Cumulus.

// Cumulus is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Cumulus is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.

//! Utility for caching [`RelayChainData`] for different relay blocks.

use crate::collators::cores_scheduled_for_para;
use cumulus_primitives_core::ClaimQueueOffset;
use cumulus_relay_chain_interface::RelayChainInterface;
use polkadot_primitives::{
CoreIndex, Hash as RelayHash, Header as RelayHeader, Id as ParaId, OccupiedCoreAssumption,
};
use sp_runtime::generic::BlockId;
use std::collections::BTreeSet;

/// Contains relay chain data necessary for parachain block building.
#[derive(Clone)]
pub struct RelayChainData {
/// Current relay chain parent header.
pub relay_parent_header: RelayHeader,
/// The cores on which the para is scheduled at the configured claim queue offset.
pub scheduled_cores: Vec<CoreIndex>,
/// Maximum configured PoV size on the relay chain.
pub max_pov_size: u32,
/// The claimed cores at a relay parent.
pub claimed_cores: BTreeSet<CoreIndex>,
}

/// Simple helper to fetch relay chain data and cache it based on the current relay chain best block
/// hash.
pub struct RelayChainDataCache<RI> {
relay_client: RI,
para_id: ParaId,
cached_data: schnellru::LruMap<RelayHash, RelayChainData>,
}

impl<RI> RelayChainDataCache<RI>
where
RI: RelayChainInterface + Clone + 'static,
{
pub fn new(relay_client: RI, para_id: ParaId) -> Self {
Self {
relay_client,
para_id,
// 50 cached relay chain blocks should be more than enough.
cached_data: schnellru::LruMap::new(schnellru::ByLength::new(50)),
}
}

/// Fetch required [`RelayChainData`] from the relay chain.
/// If this data has been fetched in the past for the incoming hash, it will reuse
/// cached data.
pub async fn get_mut_relay_chain_data(
&mut self,
relay_parent: RelayHash,
claim_queue_offset: ClaimQueueOffset,
) -> Result<&mut RelayChainData, ()> {
let insert_data = if self.cached_data.peek(&relay_parent).is_some() {
tracing::trace!(target: crate::LOG_TARGET, %relay_parent, "Using cached data for relay parent.");
None
} else {
tracing::trace!(target: crate::LOG_TARGET, %relay_parent, "Relay chain best block changed, fetching new data from relay chain.");
Some(self.update_for_relay_parent(relay_parent, claim_queue_offset).await?)
};

Ok(self
.cached_data
.get_or_insert(relay_parent, || {
insert_data.expect("`insert_data` exists if not cached yet; qed")
})
.expect("There is space for at least one element; qed"))
}

/// Fetch fresh data from the relay chain for the given relay parent hash.
async fn update_for_relay_parent(
&self,
relay_parent: RelayHash,
claim_queue_offset: ClaimQueueOffset,
) -> Result<RelayChainData, ()> {
let scheduled_cores = cores_scheduled_for_para(
relay_parent,
self.para_id,
&self.relay_client,
claim_queue_offset,
)
.await;

let Ok(Some(relay_parent_header)) =
self.relay_client.header(BlockId::Hash(relay_parent)).await
else {
tracing::warn!(target: crate::LOG_TARGET, "Unable to fetch latest relay chain block header.");
return Err(())
};

let max_pov_size = match self
.relay_client
.persisted_validation_data(relay_parent, self.para_id, OccupiedCoreAssumption::Included)
.await
{
Ok(None) => return Err(()),
Ok(Some(pvd)) => pvd.max_pov_size,
Err(err) => {
tracing::error!(target: crate::LOG_TARGET, ?err, "Failed to gather information from relay-client");
return Err(())
},
};

Ok(RelayChainData {
relay_parent_header,
scheduled_cores,
max_pov_size,
claimed_cores: BTreeSet::new(),
})
}
}
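
The `peek` plus `get_or_insert` pattern in `get_mut_relay_chain_data` only hits the relay chain when the entry is not already cached. A minimal, self-contained sketch of that pattern, not part of the diff, with `u64` keys and `String` values standing in for `RelayHash` and `RelayChainData` (requires the `schnellru` crate):

use schnellru::{ByLength, LruMap};

fn main() {
    // Same bound as the cache above: at most 50 entries, evicted LRU-first.
    let mut cache: LruMap<u64, String> = LruMap::new(ByLength::new(50));

    let relay_parent = 42u64;

    // Only perform the (potentially expensive) fetch when the key is not cached yet.
    let insert_data = if cache.peek(&relay_parent).is_some() {
        None
    } else {
        Some(format!("fetched data for relay parent {relay_parent}"))
    };

    // `get_or_insert` bumps the entry to most-recently-used and returns `&mut V`.
    let entry = cache
        .get_or_insert(relay_parent, || insert_data.expect("not cached yet; qed"))
        .expect("capacity allows at least one element; qed");

    println!("{entry}");
}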