Skip to content

Commit

Permalink
Feat(near-client): Chunk distribution via message bus (near#10480)
Browse files Browse the repository at this point in the history
This PR implements the changes to nearcore proposed in
near#10083

To summarize briefly here, the goal of this project is to reduce the
latency experienced by RPC nodes through directly distributing chunks
over a message bus. Validators eagerly push the chunks they produce on
to this bus in addition to the peer-to-peer messages they send. When any
node (validator or RPC) realizes it needs a chunk (e.g. because it is
present in a new block) then it can check the message bus to see if it
present there before trying a request over the peer-to-peer network.

Participation in chunk distribution via the message bus is entirely
optional and disabled by default. This PR has no impact on nodes that
are not participating in the new chunk distribution. To be clear: this
PR is not a replacement for the existing chunk distribution logic via
the peer-to-peer network; it is a secondary channel which should provide
faster chunk distribution (on the happy path).

If invalid chunks are published to the message bus then nodes which
receive them will request via the peer-to-peer network instead.

The details of how the chunks are posted to the message bus and what
service exists for querying from the message bus are left intentionally
abstract. These will be handled by separate a application (or
applications). The nearcore code only sends HTTP requests to endpoints
specified in config that is used by node operators to opt-in to this
feature.
  • Loading branch information
birchmd authored Feb 20, 2024
1 parent aa8d89a commit 911bd26
Show file tree
Hide file tree
Showing 16 changed files with 733 additions and 22 deletions.
18 changes: 17 additions & 1 deletion chain/chunks/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use near_primitives::{
types::EpochId,
};

#[derive(Message, Debug, strum::IntoStaticStr)]
#[derive(Message, Debug, strum::IntoStaticStr, PartialEq)]
#[rtype(result = "()")]
pub enum ShardsManagerRequestFromClient {
/// Processes the header seen from a block we received, if we have not already received the
Expand Down Expand Up @@ -50,4 +50,20 @@ pub enum ShardsManagerRequestFromClient {
/// proofs, but cannot be marked as complete because the previous block isn't available),
/// and completes them if so.
CheckIncompleteChunks(CryptoHash),
/// Process a `PartialEncodedChunk` obtained via the Chunk Distribution Network feature.
/// If the chunk turns out to be invalid then request from the p2p network instead.
ProcessOrRequestChunk {
candidate_chunk: PartialEncodedChunk,
request_header: ShardChunkHeader,
prev_hash: CryptoHash,
},
/// Similar to `ProcessOrRequestChunk` but for orphan chunks.
/// Process a `PartialEncodedChunk` obtained via the Chunk Distribution Network feature.
/// If the chunk turns out to be invalid then request from the p2p network instead.
ProcessOrRequestChunkForOrphan {
candidate_chunk: PartialEncodedChunk,
request_header: ShardChunkHeader,
epoch_id: EpochId,
ancestor_hash: CryptoHash,
},
}
21 changes: 21 additions & 0 deletions chain/chunks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2062,6 +2062,27 @@ impl ShardsManager {
ShardsManagerRequestFromClient::CheckIncompleteChunks(prev_block_hash) => {
self.check_incomplete_chunks(&prev_block_hash)
}
ShardsManagerRequestFromClient::ProcessOrRequestChunk {
candidate_chunk,
request_header,
prev_hash,
} => {
if let Err(err) = self.process_partial_encoded_chunk(candidate_chunk.into()) {
warn!(target: "chunks", ?err, "Error processing partial encoded chunk");
self.request_chunk_single(&request_header, prev_hash, false);
}
}
ShardsManagerRequestFromClient::ProcessOrRequestChunkForOrphan {
candidate_chunk,
request_header,
ancestor_hash,
epoch_id,
} => {
if let Err(e) = self.process_partial_encoded_chunk(candidate_chunk.into()) {
warn!(target: "chunks", "Error processing partial encoded chunk: {:?}", e);
self.request_chunks_for_orphan(vec![request_header], &epoch_id, ancestor_hash);
}
}
}
}

Expand Down
Loading

0 comments on commit 911bd26

Please sign in to comment.