
feat(bandwidth_scheduler) - include parent's receipts in bandwidth requests #12728

Merged: 8 commits, Jan 20, 2025
23 changes: 0 additions & 23 deletions core/primitives/src/bandwidth_scheduler.rs
@@ -131,29 +131,6 @@ impl BandwidthRequest {

Ok(Some(BandwidthRequest { to_shard: to_shard.into(), requested_values_bitmap: bitmap }))
}

/// Create a basic bandwidth request when receipt sizes are not available.
/// It'll request a single value - max_receipt_size. Bandwidth scheduler will
/// grant all the bandwidth to one of the shards that requests max_receipt_size.
/// The resulting behaviour will be similar to the previous approach with allowed shard.
/// It is used only during the protocol upgrade while the outgoing buffer metadata
/// is not built for receipts that were buffered before the upgrade.
pub fn make_max_receipt_size_request(
to_shard: ShardId,
params: &BandwidthSchedulerParams,
) -> BandwidthRequest {
let mut bitmap = BandwidthRequestBitmap::new();
let values = BandwidthRequestValues::new(params).values;

// Find the first value which allows to send a max size receipt
let max_receipt_size_value_pos = values
.iter()
.position(|&value| value >= params.max_receipt_size)
.expect("max_receipt_size is less than max_single_grant, a value should be found");
bitmap.set_bit(max_receipt_size_value_pos, true);

BandwidthRequest { to_shard: to_shard.into(), requested_values_bitmap: bitmap }
}
}
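The removed helper's core selection logic (find the first predefined bandwidth value large enough to fit a max-size receipt, and set only that bit) can be sketched in isolation. The names below are simplified stand-ins, not the actual nearcore types; the real value grid is derived from BandwidthSchedulerParams.

```rust
// Simplified sketch of the removed fallback: given the predefined
// requestable bandwidth values (sorted ascending), return the bit position
// of the first value that can fit a receipt of maximal size.
fn max_receipt_size_bit(values: &[u64], max_receipt_size: u64) -> usize {
    values
        .iter()
        .position(|&value| value >= max_receipt_size)
        .expect("max_receipt_size must not exceed the largest requestable value")
}

fn main() {
    // Hypothetical value grid, purely for illustration.
    let values = [100_000, 1_000_000, 4_500_000, 30_000_000];
    // A 4MB receipt first fits in the 4.5MB slot, i.e. bit position 2.
    assert_eq!(max_receipt_size_bit(&values, 4_000_000), 2);
}
```

With only that single bit set, the scheduler grants the full amount to one requesting shard at a time, which approximates the previous allowed-shard behaviour.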

/// There are this many predefined values of bandwidth that can be requested in a BandwidthRequest.
45 changes: 43 additions & 2 deletions integration-tests/src/test_loop/tests/resharding_v3.rs
@@ -22,7 +22,8 @@ use crate::test_loop::utils::receipts::{
use crate::test_loop::utils::resharding::fork_before_resharding_block;
use crate::test_loop::utils::resharding::{
call_burn_gas_contract, call_promise_yield, check_state_cleanup, execute_money_transfers,
execute_storage_operations, temporary_account_during_resharding, TrackedShardSchedule,
execute_storage_operations, send_large_cross_shard_receipts,
temporary_account_during_resharding, TrackedShardSchedule,
};
use crate::test_loop::utils::sharding::print_and_assert_shard_accounts;
use crate::test_loop::utils::transactions::{
@@ -468,7 +469,6 @@ fn test_resharding_v3_base(params: TestReshardingParameters) {
.loop_actions
.iter()
.for_each(|action| action.call(&env.datas, test_loop_data, client_account_id.clone()));

let clients =
client_handles.iter().map(|handle| &test_loop_data.get(handle).client).collect_vec();

@@ -1028,6 +1028,47 @@ fn test_resharding_v3_buffered_receipts_towards_splitted_shard_v2() {
test_resharding_v3_buffered_receipts_towards_splitted_shard_base(2);
}

/// This test sends large (3MB) receipts from a stable shard to a shard that will be split in two.
/// These large receipts are buffered, and at the resharding boundary the stable shard's outgoing
/// buffer contains receipts to the shard that was split. Bandwidth requests to the child where the
/// receipts will be sent must include the receipts stored in the outgoing buffer to the parent
/// shard, otherwise there will be no bandwidth grants to send them.
Comment on lines +1031 to +1035
Contributor: Just to further my understanding, can you confirm the following? The stable shard will contain a mix of receipts for both children. The parent buffer will be included in the request calculation for both child shards, and so the requests will be overestimated. That will continue until the parent buffer is emptied.
Author (jancionear): Yes, that's correct 👍
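The overestimation confirmed in this thread can be illustrated with a self-contained sketch. All names here are hypothetical simplifications, not the nearcore bandwidth-scheduler API.

```rust
// Receipts buffered towards one target shard, by size in bytes.
struct OutgoingBuffer {
    receipt_sizes: Vec<u64>,
}

impl OutgoingBuffer {
    fn total(&self) -> u64 {
        self.receipt_sizes.iter().sum()
    }
}

// After resharding, the request towards each child conservatively includes
// the entire parent buffer, because the parent buffer mixes receipts
// destined for both children and is not split per child.
fn requested_bytes(child: &OutgoingBuffer, parent: &OutgoingBuffer) -> u64 {
    child.total() + parent.total()
}

fn main() {
    // The parent buffer holds one 3MB receipt for each child.
    let parent = OutgoingBuffer { receipt_sizes: vec![3_000_000, 3_000_000] };
    let left = OutgoingBuffer { receipt_sizes: vec![] };
    let right = OutgoingBuffer { receipt_sizes: vec![] };

    // Each child's request covers the full 6MB parent buffer, so the two
    // requests sum to 12MB even though only 6MB is actually waiting.
    // The overestimate disappears once the parent buffer is drained.
    assert_eq!(requested_bytes(&left, &parent), 6_000_000);
    assert_eq!(requested_bytes(&right, &parent), 6_000_000);
}
```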

fn test_resharding_v3_large_receipts_towards_splitted_shard_base(base_shard_layout_version: u64) {
let account_in_left_child: AccountId = "account4".parse().unwrap();
let account_in_right_child: AccountId = "account6".parse().unwrap();
let account_in_stable_shard: AccountId = "account1".parse().unwrap();

let params = TestReshardingParametersBuilder::default()
.base_shard_layout_version(base_shard_layout_version)
.deploy_test_contract(account_in_left_child.clone())
.deploy_test_contract(account_in_right_child.clone())
.deploy_test_contract(account_in_stable_shard.clone())
.add_loop_action(send_large_cross_shard_receipts(
vec![account_in_stable_shard.clone()],
vec![account_in_left_child, account_in_right_child],
))
.add_loop_action(check_receipts_presence_at_resharding_block(
vec![account_in_stable_shard.clone()],
ReceiptKind::Buffered,
))
.add_loop_action(check_receipts_presence_after_resharding_block(
vec![account_in_stable_shard],
ReceiptKind::Buffered,
))
.build();
test_resharding_v3_base(params);
}

#[test]
fn slow_test_resharding_v3_large_receipts_towards_splitted_shard_v1() {
test_resharding_v3_large_receipts_towards_splitted_shard_base(1);
}

#[test]
fn slow_test_resharding_v3_large_receipts_towards_splitted_shard_v2() {
test_resharding_v3_large_receipts_towards_splitted_shard_base(2);
}

#[test]
#[cfg_attr(not(feature = "test_features"), ignore)]
fn test_resharding_v3_outgoing_receipts_towards_splitted_shard() {
149 changes: 145 additions & 4 deletions integration-tests/src/test_loop/utils/resharding.rs
@@ -1,9 +1,10 @@
use std::cell::Cell;
use std::collections::HashSet;
use std::collections::{BTreeMap, HashSet};
use std::num::NonZero;

use assert_matches::assert_matches;
use borsh::BorshDeserialize;
use bytesize::ByteSize;
use itertools::Itertools;
use near_async::test_loop::data::TestLoopData;
use near_chain::ChainStoreAccess;
@@ -12,6 +13,7 @@ use near_client::{Query, QueryError::GarbageCollectedBlock};
use near_crypto::Signer;
use near_primitives::action::{Action, FunctionCallAction};
use near_primitives::hash::CryptoHash;
use near_primitives::receipt::ReceiptOrStateStoredReceipt;
use near_primitives::test_utils::create_user_test_signer;
use near_primitives::transaction::SignedTransaction;
use near_primitives::types::{AccountId, BlockId, BlockReference, Gas, ShardId};
@@ -21,17 +23,19 @@ use near_primitives::views::{
use near_store::adapter::trie_store::get_shard_uid_mapping;
use near_store::adapter::StoreAdapter;
use near_store::db::refcount::decode_value_with_rc;
use near_store::trie::receipts_column_helper::{ShardsOutgoingReceiptBuffer, TrieQueue};
use near_store::{DBCol, ShardUId};
use rand::seq::SliceRandom;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaCha20Rng;

use super::sharding::this_block_has_new_shard_layout;
use super::sharding::{next_epoch_has_new_shard_layout, this_block_has_new_shard_layout};
use crate::test_loop::env::TestData;
use crate::test_loop::utils::loop_action::LoopAction;
use crate::test_loop::utils::sharding::next_block_has_new_shard_layout;
use crate::test_loop::utils::sharding::{get_memtrie_for_shard, next_block_has_new_shard_layout};
use crate::test_loop::utils::transactions::{
check_txs, delete_account, get_anchor_hash, get_next_nonce, store_and_submit_tx, submit_tx,
check_txs, check_txs_remove_successful, delete_account, get_anchor_hash, get_next_nonce,
store_and_submit_tx, submit_tx,
};
use crate::test_loop::utils::{get_node_data, retrieve_client_actor, ONE_NEAR, TGAS};

@@ -330,6 +334,143 @@ pub(crate) fn call_burn_gas_contract(
LoopAction::new(action_fn, succeeded)
}

/// Send 3MB receipts from `signer_ids` shards to `receiver_ids` shards.
/// Receipts are sent just before the resharding boundary.
pub(crate) fn send_large_cross_shard_receipts(
signer_ids: Vec<AccountId>,
receiver_ids: Vec<AccountId>,
) -> LoopAction {
let resharding_height = Cell::new(None);
let nonce = Cell::new(102);
let txs = Cell::new(vec![]);
let latest_height = Cell::new(0);
let (action_success_setter, succeeded) = LoopAction::shared_success_flag();

let action_fn = Box::new(
move |node_datas: &[TestData],
test_loop_data: &mut TestLoopData,
client_account_id: AccountId| {
let client_actor =
retrieve_client_actor(node_datas, test_loop_data, &client_account_id);
let tip = client_actor.client.chain.head().unwrap();
let epoch_manager = &client_actor.client.epoch_manager;

// Run this action only once at every block height.
if latest_height.get() == tip.height {
return;
}
latest_height.set(tip.height);

// Set resharding height once known
if resharding_height.get().is_none()
&& next_block_has_new_shard_layout(epoch_manager.as_ref(), &tip)
{
tracing::debug!(target: "test", height=tip.height, "resharding height set");
resharding_height.set(Some(tip.height));
}

for shard_uid in epoch_manager.get_shard_layout(&tip.epoch_id).unwrap().shard_uids() {
let mut outgoing_receipt_sizes: BTreeMap<ShardId, Vec<ByteSize>> = BTreeMap::new();

let memtrie =
get_memtrie_for_shard(&client_actor.client, &shard_uid, &tip.prev_block_hash);
Comment on lines +376 to +377
Contributor: Why get the memtrie for the prev block? Or is this the API? If the latter, IMO those methods should include something like "from_prev_block" in the name. It's all too easy to make an off-by-one error as is. Not related to your PR, just ranting ;)
Author (jancionear): The argument is called prev_block_hash:

pub fn get_memtrie_for_shard(
    client: &Client,
    shard_uid: &ShardUId,
    prev_block_hash: &CryptoHash,
) -> Trie {
    let state_root =
        *client.chain.get_chunk_extra(prev_block_hash, shard_uid).unwrap().state_root();

    // Here memtries will be used as long as client has memtries enabled.
    let memtrie = client
        .runtime_adapter
        .get_trie_for_shard(shard_uid.shard_id(), prev_block_hash, state_root, false)
        .unwrap();
    assert!(memtrie.has_memtries());
    memtrie
}

AFAIU we get the post-state root of the chunk included in the previous block, which means it's the pre-state root of the chunk at the current height. But tbh I just copied it from check_buffered_receipts_exist_in_memtrie; in this test being off by one doesn't really matter, what matters is that there are only receipts in the outgoing buffer to the parent shard.

let mut outgoing_buffers = ShardsOutgoingReceiptBuffer::load(&memtrie).unwrap();
for target_shard in outgoing_buffers.shards() {
let mut receipt_sizes = Vec::new();
for receipt in outgoing_buffers.to_shard(target_shard).iter(&memtrie, false) {
let receipt_size = match receipt {
Ok(ReceiptOrStateStoredReceipt::StateStoredReceipt(
state_stored_receipt,
)) => state_stored_receipt.metadata().congestion_size,
_ => panic!("receipt is {:?}", receipt),
};
Comment on lines +382 to +387
Contributor: mini nit: This could also be written as a `let <pattern> = receipt else { panic!(..) };`. Not sure if it's any better, but it may be a bit shorter.
Author (@jancionear, Jan 20, 2025): I feel like let-else works well with short patterns, but for longer ones I find it harder to read than a standard match. And here it would even be longer:

let receipt_size = if let Ok(
    ReceiptOrStateStoredReceipt::StateStoredReceipt(state_stored_receipt),
) = receipt
{
    state_stored_receipt.metadata().congestion_size
} else {
    panic!("receipt is {:?}", receipt)
};

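As a compilable toy comparison of the two styles discussed above (the enum and sizes are made up, not the receipt types from this diff), matching on a reference lets the else branch still print the scrutinee:

```rust
// Toy stand-in for the receipt result handled in the loop above.
#[derive(Debug)]
enum Stored {
    State(u64), // payload: congestion_size
    #[allow(dead_code)]
    Raw,
}

// match style: pattern and fallback read top to bottom.
fn size_via_match(r: &Result<Stored, String>) -> u64 {
    match r {
        Ok(Stored::State(size)) => *size,
        other => panic!("receipt is {:?}", other),
    }
}

// let-else style: shorter for short patterns; the happy path stays unindented.
fn size_via_let_else(r: &Result<Stored, String>) -> u64 {
    let Ok(Stored::State(size)) = r else {
        panic!("receipt is {:?}", r);
    };
    *size
}

fn main() {
    let receipt: Result<Stored, String> = Ok(Stored::State(3_000_000));
    assert_eq!(size_via_match(&receipt), 3_000_000);
    assert_eq!(size_via_let_else(&receipt), 3_000_000);
}
```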
receipt_sizes.push(ByteSize::b(receipt_size));
}
if !receipt_sizes.is_empty() {
outgoing_receipt_sizes.insert(target_shard, receipt_sizes);
}
}
tracing::info!(target: "test", "outgoing buffers from shard {}: {:?}", shard_uid.shard_id(), outgoing_receipt_sizes);
}

let is_epoch_before_resharding =
next_epoch_has_new_shard_layout(epoch_manager.as_ref(), &tip);

// Estimate the resharding boundary to know when to start sending transactions.
let estimated_resharding_height = match resharding_height.get() {
Some(h) => h, // Resharding boundary known, use it.
None if is_epoch_before_resharding => {
// Resharding boundary unknown, estimate it.
let cur_epoch_start =
epoch_manager.get_epoch_start_height(&tip.last_block_hash).unwrap();
let cur_epoch_length =
epoch_manager.get_epoch_config(&tip.epoch_id).unwrap().epoch_length;
let cur_epoch_estimated_end = cur_epoch_start + cur_epoch_length;
cur_epoch_estimated_end
Contributor: Are you following the convention of resharding_block being the last block of the old shard layout?
Author (@jancionear, Jan 20, 2025): Hmm, you're right. resharding_height is the last block of the old epoch, but cur_epoch_start + cur_epoch_length is actually the first block of the new one. I'll change it to match resharding_height.

}
_ => tip.height + 99999999999999, // Not in the next epoch, set to infinity into the future
Contributor: mini nit: maybe BlockHeight::MAX?
Author (jancionear): I initially did that, but then I had to add something to estimated_resharding_height and that would overflow, so I changed it to something smaller, but still very far into the future. I think there is no more adding now, but I'd prefer to keep it the way it is; it's less likely to cause trouble in the future.

};

// Send large cross-shard receipts a moment before the resharding happens.
if tip.height + 5 >= estimated_resharding_height
&& tip.height <= estimated_resharding_height - 3
{
for signer_id in &signer_ids {
for receiver_id in &receiver_ids {
// Send a 3MB cross-shard receipt from signer_id's shard to receiver_id's shard.
Contributor: nit: Maybe move the contents to a helper method?
Author (jancionear): It'd have to be a function with 8 arguments :/ call_burn_gas_contract also has the tx sending logic inlined; I think it's good enough.

let signer: Signer = create_user_test_signer(signer_id).into();
nonce.set(nonce.get() + 1);
let tx = SignedTransaction::call(
nonce.get(),
signer_id.clone(),
signer_id.clone(),
&signer,
1,
"generate_large_receipt".into(),
format!(
"{{\"account_id\": \"{}\", \"method_name\": \"noop\", \"total_args_size\": 3000000}}",
receiver_id
).into(),
300 * TGAS,
tip.last_block_hash,
);
tracing::info!(
target: "test",
"Sending 3MB receipt from {} to {}. tx_hash: {:?}",
signer_id,
receiver_id,
tx.get_hash()
);
store_and_submit_tx(
&node_datas,
&client_account_id,
&txs,
&signer_id,
&receiver_id,
tip.height,
tx,
);
}
}
}

// Check status of transactions, remove successful ones from the list.
check_txs_remove_successful(&txs, &client_actor.client);

// If the chain is past the resharding boundary and all transactions finished
// successfully, declare the action as successful.
if let Some(height) = resharding_height.get() {
let taken_txs = txs.take();
if tip.height > height + 2 && taken_txs.is_empty() {
action_success_setter.set(true);
}
txs.set(taken_txs);
}
},
);
LoopAction::new(action_fn, succeeded)
}

/// Sends a promise-yield transaction before resharding. Then, if `call_resume` is `true` also sends
/// a yield-resume transaction after resharding, otherwise it lets the promise-yield go into timeout.
///
9 changes: 5 additions & 4 deletions integration-tests/src/test_loop/utils/sharding.rs
@@ -103,11 +103,12 @@ pub fn next_block_has_new_shard_layout(epoch_manager: &dyn EpochManagerAdapter,
return false;
}

let this_epoch_id = tip.epoch_id;
let next_epoch_id = epoch_manager.get_next_epoch_id(&tip.last_block_hash).unwrap();
next_epoch_has_new_shard_layout(epoch_manager, tip)
}

let this_shard_layout = epoch_manager.get_shard_layout(&this_epoch_id).unwrap();
let next_shard_layout = epoch_manager.get_shard_layout(&next_epoch_id).unwrap();
pub fn next_epoch_has_new_shard_layout(epoch_manager: &dyn EpochManagerAdapter, tip: &Tip) -> bool {
let this_shard_layout = epoch_manager.get_shard_layout(&tip.epoch_id).unwrap();
let next_shard_layout = epoch_manager.get_shard_layout(&tip.next_epoch_id).unwrap();

this_shard_layout != next_shard_layout
}
23 changes: 23 additions & 0 deletions integration-tests/src/test_loop/utils/transactions.rs
@@ -7,6 +7,7 @@ use near_async::test_loop::futures::TestLoopFutureSpawner;
use near_async::test_loop::sender::TestLoopSender;
use near_async::test_loop::TestLoopV2;
use near_async::time::Duration;
use near_chain::Error;
use near_client::client_actor::ClientActorInner;
use near_client::test_utils::test_loop::ClientQueries;
use near_client::{Client, ProcessTxResponse};
@@ -670,3 +671,25 @@ pub fn store_and_submit_tx(
txs.set(txs_vec);
submit_tx(&node_datas, &rpc_id, tx);
}

/// Checks status of the provided transactions. Panics if transaction result is an error.
/// Removes transactions that finished successfully from the list.
pub fn check_txs_remove_successful(txs: &Cell<Vec<(CryptoHash, u64)>>, client: &Client) {
Contributor: nit: u64 -> BlockHeight

let mut unfinished_txs = Vec::new();
for (tx_hash, tx_height) in txs.take() {
let tx_outcome = client.chain.get_final_transaction_result(&tx_hash);
let status = tx_outcome.as_ref().map(|o| o.status.clone());
tracing::debug!(target: "test", ?tx_height, ?tx_hash, ?status, "transaction status");
match status {
Ok(FinalExecutionStatus::SuccessValue(_)) => continue, // Transaction finished successfully, remove it.
Ok(FinalExecutionStatus::NotStarted)
| Ok(FinalExecutionStatus::Started)
| Err(Error::DBNotFoundErr(_)) => unfinished_txs.push((tx_hash, tx_height)), // Transaction in progress
_ => panic!( // Transaction error
"remove_successful_txs: Transaction failed! tx_hash = {:?}, tx_height = {}, status = {:?}",
tx_hash, tx_height, status
),
};
}
txs.set(unfinished_txs);
}