Skip to content

Commit

Permalink
improve logic to collect indexer_shards to get shard_index from `…
Browse files Browse the repository at this point in the history
…shard_id` (#12579)

**Problem**
The current implementation incorrectly mixes `shard_index` values,
causing inconsistencies during the data collection process. This leads
to incorrect shard mapping and potential data inaccuracies.

**Solution**
This PR updates the logic to correctly collect `indexer_shards` and get
the `shard_index` directly from the `shard_id`. The updated approach
ensures accurate shard indexing and resolves the problem of mixed
`shard_index` values.
  • Loading branch information
kobayurii authored Dec 9, 2024
1 parent 2148dca commit 949d699
Showing 1 changed file with 8 additions and 3 deletions.
11 changes: 8 additions & 3 deletions chain/indexer/src/streamer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub async fn build_streamer_message(
})
.collect::<Vec<_>>();

for (shard_index, chunk) in chunks.into_iter().enumerate() {
for chunk in chunks.into_iter() {
let views::ChunkView { transactions, author, header, receipts: chunk_non_local_receipts } =
chunk;

Expand Down Expand Up @@ -232,7 +232,12 @@ pub async fn build_streamer_message(
}

chunk_receipts.extend(chunk_non_local_receipts);

// Find the shard index for the chunk by shard_id
let shard_index = protocol_config_view
.shard_layout
.get_shard_index(header.shard_id)
.map_err(|e| FailedToFetchData::String(e.to_string()))?;
// Add receipt_execution_outcomes into corresponding indexer shard
indexer_shards[shard_index].receipt_execution_outcomes = receipt_execution_outcomes;
// Put the chunk into corresponding indexer shard
indexer_shards[shard_index].chunk = Some(IndexerChunkView {
Expand All @@ -250,7 +255,7 @@ pub async fn build_streamer_message(
let shard_index = protocol_config_view
.shard_layout
.get_shard_index(shard_id)
.expect("Failed to get shard index");
.map_err(|e| FailedToFetchData::String(e.to_string()))?;
indexer_shards[shard_index].receipt_execution_outcomes.extend(outcomes.into_iter().map(
|outcome| IndexerExecutionOutcomeWithReceipt {
execution_outcome: outcome.execution_outcome,
Expand Down

0 comments on commit 949d699

Please sign in to comment.