Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bandwidth_scheduler) - include parent's receipts in bandwidth requests #12728

Merged
merged 8 commits into from
Jan 20, 2025
Merged
37 changes: 32 additions & 5 deletions runtime/runtime/src/congestion_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,14 @@ impl ReceiptSink {
pub(crate) fn generate_bandwidth_requests(
&self,
trie: &dyn TrieAccess,
shard_layout: &ShardLayout,
side_effects: bool,
) -> Result<Option<BandwidthRequests>, StorageError> {
match self {
ReceiptSink::V1(_) => Ok(None),
ReceiptSink::V2(inner) => inner.generate_bandwidth_requests(trie, side_effects),
ReceiptSink::V2(inner) => {
inner.generate_bandwidth_requests(trie, shard_layout, side_effects)
}
}
}
}
Expand Down Expand Up @@ -504,6 +507,7 @@ impl ReceiptSinkV2 {
fn generate_bandwidth_requests(
&self,
trie: &dyn TrieAccess,
shard_layout: &ShardLayout,
side_effects: bool,
) -> Result<Option<BandwidthRequests>, StorageError> {
if !ProtocolFeature::BandwidthScheduler.enabled(self.protocol_version) {
Expand All @@ -518,9 +522,13 @@ impl ReceiptSinkV2 {

let mut requests = Vec::new();
for shard_id in self.outgoing_buffers.shards() {
if let Some(request) =
self.generate_bandwidth_request(shard_id, trie, side_effects, &params)?
{
if let Some(request) = self.generate_bandwidth_request(
shard_id,
trie,
shard_layout,
side_effects,
&params,
)? {
requests.push(request);
}
}
Expand All @@ -532,6 +540,7 @@ impl ReceiptSinkV2 {
&self,
to_shard: ShardId,
trie: &dyn TrieAccess,
shard_layout: &ShardLayout,
side_effects: bool,
params: &BandwidthSchedulerParams,
) -> Result<Option<BandwidthRequest>, StorageError> {
Expand Down Expand Up @@ -564,7 +573,25 @@ impl ReceiptSinkV2 {
}

// Metadata is fully initialized, make a proper bandwidth request using it.
let receipt_sizes_iter = metadata.iter_receipt_group_sizes(trie, side_effects);
let mut receipt_sizes_iter: Box<dyn Iterator<Item = Result<u64, StorageError>>> =
Box::new(metadata.iter_receipt_group_sizes(trie, side_effects));

// When making a bandwidth request to a child shard which has been split from a parent
// shard, we have to include the receipts stored in the outgoing buffer to the parent shard
// in the bandwidth request for sending receipts to the child shard. Forwarding receipts
// from the buffer to parent uses bandwidth granted for sending receipts to one of the
// children. Not including the parent receipts in the bandwidth request could lead to a
// situation where a receipt can't be sent because the grant for sending receipts to a child
// is too small to send out a receipt from a buffer aimed at a parent.
if let Ok(parent_shard_id) = shard_layout.get_parent_shard_id(to_shard) {
if let Some(parent_metadata) =
self.outgoing_metadatas.get_metadata_for_shard(&parent_shard_id)
{
let parent_receipt_sizes_iter =
parent_metadata.iter_receipt_group_sizes(trie, side_effects);
receipt_sizes_iter = Box::new(receipt_sizes_iter.chain(parent_receipt_sizes_iter));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think resharding tries to empty the parent buffers first - (double check please) - so I would feel better to do it in the same order here. It may be important.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, might start mattering what exactly do we fill within the bandwidth request as it's based on the order.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah good catch, I wanted to have the parent buffer first but I did the opposite by mistake

}
}

// There's a bug which allows to create receipts above `max_receipt_size` (https://github.com/near/nearcore/issues/12606).
// This could cause problems with bandwidth scheduler which would generate requests for size above max size, and these
Expand Down
5 changes: 3 additions & 2 deletions runtime/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2053,6 +2053,7 @@ impl Runtime {
let pending_delayed_receipts = processing_state.delayed_receipts;
let processed_delayed_receipts = process_receipts_result.processed_delayed_receipts;
let promise_yield_result = process_receipts_result.promise_yield_result;
let shard_layout = epoch_info_provider.shard_layout(&apply_state.epoch_id)?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might break replayability, but AFAIU we now support only the last two versions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would it break replayability?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's fine to have this here as long as we are not doing something different for past protocol versions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would it break replayability?

Previously the call to epoch_info_provider.shard_layout() was gated by ProtocolFeature::SimpleNightshadeV4.enabled(protocol_version), I don't know what would happen on older protocol versions, would it throw an error?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I probably wouldn't be too worried 😅


if promise_yield_result.promise_yield_indices
!= promise_yield_result.initial_promise_yield_indices
Expand All @@ -2074,7 +2075,6 @@ impl Runtime {

let (all_shards, shard_seed) =
if ProtocolFeature::SimpleNightshadeV4.enabled(protocol_version) {
let shard_layout = epoch_info_provider.shard_layout(&apply_state.epoch_id)?;
let shard_ids = shard_layout.shard_ids().collect_vec();
let shard_index = shard_layout
.get_shard_index(apply_state.shard_id)
Expand All @@ -2095,7 +2095,8 @@ impl Runtime {
);
}

let bandwidth_requests = receipt_sink.generate_bandwidth_requests(&state_update, true)?;
let bandwidth_requests =
receipt_sink.generate_bandwidth_requests(&state_update, &shard_layout, true)?;

if cfg!(debug_assertions) {
if let Err(err) = check_balance(
Expand Down
Loading