Skip to content

Commit

Permalink
Feat(near-client): Chunk distribution via message bus (near#10480)
Browse files Browse the repository at this point in the history
This PR implements the changes to nearcore proposed in
near#10083

To summarize briefly here, the goal of this project is to reduce the
latency experienced by RPC nodes through directly distributing chunks
over a message bus. Validators eagerly push the chunks they produce on
to this bus in addition to the peer-to-peer messages they send. When any
node (validator or RPC) realizes it needs a chunk (e.g. because it is
present in a new block) then it can check the message bus to see if it
present there before trying a request over the peer-to-peer network.

Participation in chunk distribution via the message bus is entirely
optional and disabled by default. This PR has no impact on nodes that
are not participating in the new chunk distribution. To be clear: this
PR is not a replacement for the existing chunk distribution logic via
the peer-to-peer network; it is a secondary channel which should provide
faster chunk distribution (on the happy path).

If invalid chunks are published to the message bus then nodes which
receive them will request via the peer-to-peer network instead.

The details of how the chunks are posted to the message bus and what
service exists for querying from the message bus are left intentionally
abstract. These will be handled by separate a application (or
applications). The nearcore code only sends HTTP requests to endpoints
specified in config that is used by node operators to opt-in to this
feature.
  • Loading branch information
birchmd committed Mar 12, 2024
1 parent 78122a6 commit 5d65f81
Show file tree
Hide file tree
Showing 17 changed files with 744 additions and 36 deletions.
18 changes: 17 additions & 1 deletion chain/chunks/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use near_primitives::{
types::EpochId,
};

#[derive(Message, Debug, strum::IntoStaticStr)]
#[derive(Message, Debug, strum::IntoStaticStr, PartialEq)]
#[rtype(result = "()")]
pub enum ShardsManagerRequestFromClient {
/// Processes the header seen from a block we received, if we have not already received the
Expand Down Expand Up @@ -50,4 +50,20 @@ pub enum ShardsManagerRequestFromClient {
/// proofs, but cannot be marked as complete because the previous block isn't available),
/// and completes them if so.
CheckIncompleteChunks(CryptoHash),
/// Process a `PartialEncodedChunk` obtained via the Chunk Distribution Network feature.
/// If the chunk turns out to be invalid then request from the p2p network instead.
ProcessOrRequestChunk {
candidate_chunk: PartialEncodedChunk,
request_header: ShardChunkHeader,
prev_hash: CryptoHash,
},
/// Similar to `ProcessOrRequestChunk` but for orphan chunks.
/// Process a `PartialEncodedChunk` obtained via the Chunk Distribution Network feature.
/// If the chunk turns out to be invalid then request from the p2p network instead.
ProcessOrRequestChunkForOrphan {
candidate_chunk: PartialEncodedChunk,
request_header: ShardChunkHeader,
epoch_id: EpochId,
ancestor_hash: CryptoHash,
},
}
21 changes: 21 additions & 0 deletions chain/chunks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2020,6 +2020,27 @@ impl ShardsManager {
ShardsManagerRequestFromClient::CheckIncompleteChunks(prev_block_hash) => {
self.check_incomplete_chunks(&prev_block_hash)
}
ShardsManagerRequestFromClient::ProcessOrRequestChunk {
candidate_chunk,
request_header,
prev_hash,
} => {
if let Err(err) = self.process_partial_encoded_chunk(candidate_chunk.into()) {
warn!(target: "chunks", ?err, "Error processing partial encoded chunk");
self.request_chunk_single(&request_header, prev_hash, false);
}
}
ShardsManagerRequestFromClient::ProcessOrRequestChunkForOrphan {
candidate_chunk,
request_header,
ancestor_hash,
epoch_id,
} => {
if let Err(e) = self.process_partial_encoded_chunk(candidate_chunk.into()) {
warn!(target: "chunks", "Error processing partial encoded chunk: {:?}", e);
self.request_chunks_for_orphan(vec![request_header], &epoch_id, ancestor_hash);
}
}
}
}

Expand Down
Loading

0 comments on commit 5d65f81

Please sign in to comment.