Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make indexer support single shard tracking. #12607

Merged
merged 2 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion chain/indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ nearcore.workspace = true
near-client.workspace = true
near-chain-configs.workspace = true
near-config-utils.workspace = true
near-dyn-configs.workspace = true
near-crypto.workspace = true
near-dyn-configs.workspace = true
near-epoch-manager.workspace = true
near-indexer-primitives.workspace = true
near-o11y.workspace = true
near-parameters.workspace = true
Expand All @@ -39,6 +40,7 @@ nightly_protocol = [
"near-chain-configs/nightly_protocol",
"near-client/nightly_protocol",
"near-dyn-configs/nightly_protocol",
"near-epoch-manager/nightly_protocol",
"near-indexer-primitives/nightly_protocol",
"near-o11y/nightly_protocol",
"near-parameters/nightly_protocol",
Expand All @@ -52,6 +54,7 @@ nightly = [
"near-chain-configs/nightly",
"near-client/nightly",
"near-dyn-configs/nightly",
"near-epoch-manager/nightly",
"near-indexer-primitives/nightly",
"near-o11y/nightly",
"near-parameters/nightly",
Expand Down
15 changes: 9 additions & 6 deletions chain/indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub use near_indexer_primitives::{
StreamerMessage,
};

use near_epoch_manager::shard_tracker::ShardTracker;
pub use streamer::build_streamer_message;

mod streamer;
Expand Down Expand Up @@ -92,6 +93,7 @@ pub struct Indexer {
near_config: nearcore::NearConfig,
view_client: actix::Addr<near_client::ViewClientActor>,
client: actix::Addr<near_client::ClientActor>,
shard_tracker: ShardTracker,
}

impl Indexer {
Expand All @@ -113,16 +115,16 @@ impl Indexer {
.unwrap_or_else(|e| panic!("Error loading config: {:#}", e));

assert!(
!&near_config.client_config.tracked_shards.is_empty(),
"Indexer should track at least one shard. \n\
Tip: You may want to update {} with `\"tracked_shards\": [0]`
",
!&near_config.client_config.tracked_shards.is_empty() || !&near_config.client_config.tracked_accounts.is_empty(),
"Indexer should either track at least one shard or track at least one account. \n\
Tip: You may want to update {} with `\"tracked_shards\": [0]` (which tracks all shards)
or `\"tracked_accounts\": [\"some_account.near\"]` (which tracks whatever shard the account is on)",
indexer_config.home_dir.join("config.json").display()
);
let nearcore::NearNode { client, view_client, .. } =
let nearcore::NearNode { client, view_client, shard_tracker, .. } =
nearcore::start_with_config(&indexer_config.home_dir, near_config.clone())
.with_context(|| "start_with_config")?;
Ok(Self { view_client, client, near_config, indexer_config })
Ok(Self { view_client, client, near_config, indexer_config, shard_tracker })
}

/// Boots up `near_indexer::streamer`, so it monitors the new blocks with chunks, transactions, receipts, and execution outcomes inside. The returned stream handler should be drained and handled on the user side.
Expand All @@ -131,6 +133,7 @@ impl Indexer {
actix::spawn(streamer::start(
self.view_client.clone(),
self.client.clone(),
self.shard_tracker.clone(),
self.indexer_config.clone(),
self.near_config.config.store.clone(),
sender,
Expand Down
7 changes: 6 additions & 1 deletion chain/indexer/src/streamer/fetchers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use near_primitives::{types, views};

use super::errors::FailedToFetchData;
use super::INDEXER;
use near_epoch_manager::shard_tracker::ShardTracker;

pub(crate) async fn fetch_status(
client: &Addr<near_client::ClientActor>,
Expand Down Expand Up @@ -161,12 +162,16 @@ async fn fetch_single_chunk(
pub(crate) async fn fetch_block_chunks(
client: &Addr<near_client::ViewClientActor>,
block: &views::BlockView,
shard_tracker: &ShardTracker,
) -> Result<Vec<views::ChunkView>, FailedToFetchData> {
tracing::debug!(target: INDEXER, "Fetching chunks for block #{}", block.header.height);
let mut futures: futures::stream::FuturesUnordered<_> = block
.chunks
.iter()
.filter(|chunk| chunk.height_included == block.header.height)
.filter(|chunk| {
shard_tracker.care_about_shard(None, &block.header.prev_hash, chunk.shard_id, false)
&& chunk.height_included == block.header.height
})
.map(|chunk| fetch_single_chunk(&client, chunk.chunk_hash))
.collect();
let mut chunks = Vec::<views::ChunkView>::with_capacity(futures.len());
Expand Down
23 changes: 17 additions & 6 deletions chain/indexer/src/streamer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use self::utils::convert_transactions_sir_into_local_receipts;
use crate::streamer::fetchers::fetch_protocol_config;
use crate::INDEXER;
use crate::{AwaitForNodeSyncedEnum, IndexerConfig};
use near_epoch_manager::shard_tracker::ShardTracker;

mod errors;
mod fetchers;
Expand Down Expand Up @@ -74,9 +75,10 @@ fn test_problematic_blocks_hash() {
pub async fn build_streamer_message(
client: &Addr<near_client::ViewClientActor>,
block: views::BlockView,
shard_tracker: &ShardTracker,
) -> Result<StreamerMessage, FailedToFetchData> {
let _timer = metrics::BUILD_STREAMER_MESSAGE_TIME.start_timer();
let chunks = fetch_block_chunks(&client, &block).await?;
let chunks = fetch_block_chunks(&client, &block, shard_tracker).await?;

let protocol_config_view = fetch_protocol_config(&client, block.header.hash).await?;
let shard_ids = protocol_config_view.shard_layout.shard_ids();
Expand Down Expand Up @@ -200,6 +202,7 @@ pub async fn build_streamer_message(
&runtime_config,
block.clone(),
execution_outcome.id,
shard_tracker,
)
.await?
}
Expand Down Expand Up @@ -276,6 +279,7 @@ async fn lookup_delayed_local_receipt_in_previous_blocks(
runtime_config: &RuntimeConfig,
block: views::BlockView,
receipt_id: CryptoHash,
shard_tracker: &ShardTracker,
) -> Result<views::ReceiptView, FailedToFetchData> {
let mut prev_block_tried = 0u16;
let mut prev_block_hash = block.header.prev_hash;
Expand All @@ -299,9 +303,14 @@ async fn lookup_delayed_local_receipt_in_previous_blocks(

prev_block_hash = prev_block.header.prev_hash;

if let Some(receipt) =
find_local_receipt_by_id_in_block(&client, &runtime_config, prev_block, receipt_id)
.await?
if let Some(receipt) = find_local_receipt_by_id_in_block(
&client,
&runtime_config,
prev_block,
receipt_id,
shard_tracker,
)
.await?
{
tracing::debug!(
target: INDEXER,
Expand All @@ -324,8 +333,9 @@ async fn find_local_receipt_by_id_in_block(
runtime_config: &RuntimeConfig,
block: views::BlockView,
receipt_id: near_primitives::hash::CryptoHash,
shard_tracker: &ShardTracker,
) -> Result<Option<views::ReceiptView>, FailedToFetchData> {
let chunks = fetch_block_chunks(&client, &block).await?;
let chunks = fetch_block_chunks(&client, &block, shard_tracker).await?;

let protocol_config_view = fetch_protocol_config(&client, block.header.hash).await?;
let mut shards_outcomes = fetch_outcomes(&client, block.header.hash).await?;
Expand Down Expand Up @@ -371,6 +381,7 @@ async fn find_local_receipt_by_id_in_block(
pub(crate) async fn start(
view_client: Addr<near_client::ViewClientActor>,
client: Addr<near_client::ClientActor>,
shard_tracker: ShardTracker,
indexer_config: IndexerConfig,
store_config: near_store::StoreConfig,
blocks_sink: mpsc::Sender<StreamerMessage>,
Expand Down Expand Up @@ -437,7 +448,7 @@ pub(crate) async fn start(
for block_height in start_syncing_block_height..=latest_block_height {
metrics::CURRENT_BLOCK_HEIGHT.set(block_height as i64);
if let Ok(block) = fetch_block_by_height(&view_client, block_height).await {
let response = build_streamer_message(&view_client, block).await;
let response = build_streamer_message(&view_client, block, &shard_tracker).await;

match response {
Ok(streamer_message) => {
Expand Down
5 changes: 4 additions & 1 deletion nearcore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ pub struct NearNode {
pub resharding_handle: ReshardingHandle,
// The threads that state sync runs in.
pub state_sync_runtime: Arc<tokio::runtime::Runtime>,
/// Shard tracker, allows querying of which shards are tracked by this node.
pub shard_tracker: ShardTracker,
}

pub fn start_with_config(home_dir: &Path, config: NearConfig) -> anyhow::Result<NearNode> {
Expand Down Expand Up @@ -428,7 +430,7 @@ pub fn start_with_config_and_synchronization(
client_config: config.client_config.clone(),
chain_genesis,
epoch_manager,
shard_tracker,
shard_tracker: shard_tracker.clone(),
runtime,
validator: config.validator_signer.clone(),
dump_future_runner: StateSyncDumper::arbiter_dump_future_runner(),
Expand Down Expand Up @@ -511,5 +513,6 @@ pub fn start_with_config_and_synchronization(
state_sync_dumper,
resharding_handle,
state_sync_runtime,
shard_tracker,
})
}
Loading