feat(bandwidth_scheduler) - bandwidth scheduler (#12533)
This PR adds an implementation of the core bandwidth scheduler algorithm.
It takes the bandwidth requests generated by shards at the previous
height and decides how many bytes of receipts can be sent between each
pair of shards. The bandwidth grants generated by the bandwidth
scheduler are then used in `ReceiptSink` as outgoing size limits.
The exact algorithm is described in more detail in the module-level
comment.
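
As a rough mental model of the data flow (the types and names below are simplified placeholders, not the exact structs touched in this PR):

```rust
// Simplified, hypothetical sketch of the scheduler's input and output.
// The real types live in core/primitives and the runtime.
type ShardId = u64;
type Bandwidth = u64;

/// A shard's request, generated at the previous height, to send up to
/// `requested_bytes` of receipts to `to_shard`.
pub struct BandwidthRequestSketch {
    pub from_shard: ShardId,
    pub to_shard: ShardId,
    pub requested_bytes: Bandwidth,
}

/// The scheduler's decision for one (sender, receiver) link. `ReceiptSink`
/// stops forwarding receipts to `to_shard` once `granted_bytes` have been
/// sent at the current height.
pub struct BandwidthGrantSketch {
    pub from_shard: ShardId,
    pub to_shard: ShardId,
    pub granted_bytes: Bandwidth,
}
```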

This almost completes the basic bandwidth scheduler feature: shards
generate bandwidth requests and the bandwidth scheduler decides how much
they are allowed to send. There is one more piece missing - the
algorithm for distributing the remaining bandwidth after all requests
have been processed. I'll add it in another PR; this one is big enough
already. It's not needed for correctness, it just improves typical
throughput.

### Missing chunks

One thing I'm still not 100% sure about is the handling of missing
chunks. I'm starting to think that the rule "don't send anything to a
shard that had a missing chunk" might be enough to make sure that a
shard never has more than `max_shard_bandwidth` bytes of incoming
receipts. Maybe there was no fatal flaw after all? I need to think more
about it, or just write a test for it.
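
To illustrate what that rule would look like in code (a minimal sketch with made-up names, not the actual implementation):

```rust
/// Sketch of the "don't send anything to a shard that had a missing chunk" rule.
/// `receiver_had_missing_chunk` would come from the chunk mask of the previous block.
fn grant_for_link(
    requested_bytes: u64,
    receiver_had_missing_chunk: bool,
    remaining_receiver_budget: u64,
) -> u64 {
    if receiver_had_missing_chunk {
        // The receiver didn't apply its last chunk, so we don't know how many
        // receipts are already waiting for it. The hope is that granting nothing
        // here keeps its total incoming receipts bounded by `max_shard_bandwidth`.
        return 0;
    }
    requested_bytes.min(remaining_receiver_budget)
}
```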

### Performance

The complexity of the scheduler algorithm is `O(num_shards^2 *
log(num_shards))`. It's pretty much impossible to avoid the quadratic
factor because we have to consider every pair of shards; the logarithmic
factor comes from sorting the requests by allowance.
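
For intuition, the core loop looks roughly like this (a simplified sketch with hypothetical names; the real scheduler also updates allowances, handles missing chunks, etc.):

```rust
use std::cmp::Reverse;
use std::collections::HashMap;

type ShardId = u64;
type Bandwidth = u64;

/// One (sender, receiver) link with the bytes it asked for and its current allowance.
struct LinkRequest {
    sender: ShardId,
    receiver: ShardId,
    requested: Bandwidth,
    allowance: Bandwidth,
}

/// Grants bandwidth link by link in order of decreasing allowance, respecting a
/// per-shard sending and receiving budget. With up to num_shards^2 links the
/// sort dominates: O(num_shards^2 * log(num_shards^2)) = O(num_shards^2 * log(num_shards)).
fn grant_bandwidth(
    mut requests: Vec<LinkRequest>,
    max_shard_bandwidth: Bandwidth,
) -> HashMap<(ShardId, ShardId), Bandwidth> {
    let mut send_budget: HashMap<ShardId, Bandwidth> = HashMap::new();
    let mut recv_budget: HashMap<ShardId, Bandwidth> = HashMap::new();
    let mut grants = HashMap::new();

    // Higher allowance means higher priority (see the module-level comment).
    requests.sort_by_key(|r| Reverse(r.allowance));

    for req in requests {
        let s = send_budget.entry(req.sender).or_insert(max_shard_bandwidth);
        let r = recv_budget.entry(req.receiver).or_insert(max_shard_bandwidth);
        // Grant as much as requested, limited by what the sender may still send
        // and what the receiver may still receive at this height.
        let granted = req.requested.min(*s).min(*r);
        *s -= granted;
        *r -= granted;
        grants.insert((req.sender, req.receiver), granted);
    }
    grants
}
```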

I measured the worst-case performance on a typical n2d-standard-8 GCP
VM, and I think it should work fine up to ~128 shards:
```
Running scheduler with 6 shards: 0.10 ms
Running scheduler with 10 shards: 0.16 ms
Running scheduler with 32 shards: 1.76 ms
Running scheduler with 64 shards: 5.74 ms
Running scheduler with 128 shards: 23.63 ms
Running scheduler with 256 shards: 93.15 ms
Running scheduler with 512 shards: 371.76 ms
```

Once we reach 100+ shards, we might have to revisit the design. Maybe we
could choose only a random subset of shards/links that are allowed to
send receipts at each height? I think we can worry about it when we get
there.

---

The code is basically ready, but untested. I'll add the tests next week.

The PR is meant to be reviewed commit-by-commit. (Note that the last
commit fixes the initial implementation of the scheduler!)
jancionear authored Dec 11, 2024
1 parent c2d7028 commit 8a4e813
Showing 20 changed files with 2,588 additions and 279 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock


36 changes: 23 additions & 13 deletions core/primitives/src/bandwidth_scheduler.rs
@@ -5,6 +5,7 @@ use bitvec::order::Lsb0;
use bitvec::slice::BitSlice;
use borsh::{BorshDeserialize, BorshSerialize};
use near_parameters::RuntimeConfig;
use near_primitives_core::hash::CryptoHash;
use near_primitives_core::types::{ProtocolVersion, ShardId};
use near_primitives_core::version::ProtocolFeature;
use near_schema_checker_lib::ProtocolSchema;
@@ -301,20 +302,29 @@ impl BlockBandwidthRequests {
/// The state should be the same on all shards. All shards start with the same state
/// and apply the same bandwidth scheduler algorithm at the same heights, so the resulting
/// scheduler state stays the same.
#[derive(
BorshSerialize,
BorshDeserialize,
serde::Serialize,
serde::Deserialize,
Debug,
Clone,
PartialEq,
Eq,
ProtocolSchema,
)]
/// TODO(bandwidth_scheduler) - make this struct versioned.
#[derive(BorshSerialize, BorshDeserialize, Debug, Clone, PartialEq, Eq, ProtocolSchema)]
pub struct BandwidthSchedulerState {
/// Random data for now
pub mock_data: [u8; 32],
/// Allowance for every pair of (sender, receiver). Used in the scheduler algorithm.
/// Bandwidth scheduler updates the allowances on every run.
pub link_allowances: Vec<LinkAllowance>,
/// Sanity check hash to assert that all shards run bandwidth scheduler in the exact same way.
/// Hash of previous scheduler state and (some) scheduler inputs.
pub sanity_check_hash: CryptoHash,
}

/// Allowance for a (sender, receiver) pair of shards.
/// Used in bandwidth scheduler.
#[derive(BorshSerialize, BorshDeserialize, Debug, Clone, PartialEq, Eq, ProtocolSchema)]
pub struct LinkAllowance {
/// Sender shard
pub sender: ShardId,
/// Receiver shard
pub receiver: ShardId,
/// Link allowance, determines priority for granting bandwidth.
/// See the bandwidth scheduler module-level comment for a more
/// detailed description.
pub allowance: Bandwidth,
}

/// Parameters used in the bandwidth scheduler algorithm.
11 changes: 8 additions & 3 deletions core/primitives/src/congestion_info.rs
@@ -81,9 +81,7 @@ impl CongestionControl {
pub fn outgoing_gas_limit(&self, sender_shard: ShardId) -> Gas {
let congestion = self.congestion_level();

// note: using float equality is okay here because
// `clamped_f64_fraction` clamps to exactly 1.0.
if congestion == 1.0 {
if Self::is_fully_congested(congestion) {
// Red traffic light: reduce to minimum speed
if sender_shard == ShardId::from(self.info.allowed_shard()) {
self.config.allowed_shard_outgoing_gas
@@ -95,6 +93,13 @@
}
}

pub fn is_fully_congested(congestion_level: f64) -> bool {
// note: using float equality is okay here because
// `clamped_f64_fraction` clamps to exactly 1.0.
debug_assert!(congestion_level <= 1.0);
congestion_level == 1.0
}

/// How much data another shard can send to us in the next block.
pub fn outgoing_size_limit(&self, sender_shard: ShardId) -> Gas {
if sender_shard == ShardId::from(self.info.allowed_shard()) {
12 changes: 10 additions & 2 deletions core/primitives/src/shard_layout.rs
@@ -739,6 +739,14 @@ impl ShardLayout {
self.shard_ids().map(|shard_id| ShardUId::from_shard_id_and_layout(shard_id, self))
}

pub fn shard_indexes(&self) -> impl Iterator<Item = ShardIndex> + 'static {
let num_shards: usize =
self.num_shards().try_into().expect("Number of shards doesn't fit in usize");
match self {
Self::V0(_) | Self::V1(_) | Self::V2(_) => (0..num_shards).into_iter(),
}
}

/// Returns an iterator that returns the ShardInfos for every shard in
/// this shard layout. This method should be preferred over calling
/// shard_ids().enumerate(). Today the result of shard_ids() is sorted but
@@ -778,8 +786,8 @@ impl ShardLayout {
Ok(ShardId::new(shard_index as u64))
}
Self::V2(v2) => v2
.index_to_id_map
.get(&shard_index)
.shard_ids
.get(shard_index)
.copied()
.ok_or(ShardLayoutError::InvalidShardIndexError { shard_index }),
}
