Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce size of futures in HTTP API to prevent stack overflows #5104

Merged
merged 3 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ impl<T: BeaconChainTypes> IntoGossipVerifiedBlockContents<T> for PublishBlockReq
Ok::<_, BlockContentsError<T::EthSpec>>(gossip_verified_blobs)
})
.transpose()?;
let gossip_verified_block = GossipVerifiedBlock::new(Arc::new(block), chain)?;
let gossip_verified_block = GossipVerifiedBlock::new(block, chain)?;

Ok((gossip_verified_block, gossip_verified_blobs))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -952,7 +952,7 @@ mod test {
};

let availability_pending_block = AvailabilityPendingExecutedBlock {
block: Arc::new(block),
block,
import_data,
payload_verification_outcome,
};
Expand Down
26 changes: 13 additions & 13 deletions beacon_node/beacon_chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -822,7 +822,7 @@ where
slot: Slot,
) -> (SignedBlindedBeaconBlock<E>, BeaconState<E>) {
let (unblinded, new_state) = self.make_block(state, slot).await;
(unblinded.0.into(), new_state)
((*unblinded.0).clone().into(), new_state)
}

/// Returns a newly created block, signed by the proposer for the given slot.
Expand Down Expand Up @@ -866,14 +866,14 @@ where
panic!("Should always be a full payload response");
};

let signed_block = block_response.block.sign(
let signed_block = Arc::new(block_response.block.sign(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this test only code? If so do we also have stack overflows during tests?
(Question applies to all the uses in this file)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah this is just for tests, I ended up having to make quite a few changes to make the types consistent. There's a bit of inefficient Arc-ing and de-Arc-ing, but it's just in tests so it shouldn't be too bad

Jimmy got some stack overflows in tests, but only in debug mode

&self.validator_keypairs[proposer_index].sk,
&block_response.state.fork(),
block_response.state.genesis_validators_root(),
&self.spec,
);
));

let block_contents: SignedBlockContentsTuple<E> = match &signed_block {
let block_contents: SignedBlockContentsTuple<E> = match *signed_block {
SignedBeaconBlock::Base(_)
| SignedBeaconBlock::Altair(_)
| SignedBeaconBlock::Merge(_)
Expand Down Expand Up @@ -928,14 +928,14 @@ where
panic!("Should always be a full payload response");
};

let signed_block = block_response.block.sign(
let signed_block = Arc::new(block_response.block.sign(
&self.validator_keypairs[proposer_index].sk,
&block_response.state.fork(),
block_response.state.genesis_validators_root(),
&self.spec,
);
));

let block_contents: SignedBlockContentsTuple<E> = match &signed_block {
let block_contents: SignedBlockContentsTuple<E> = match *signed_block {
SignedBeaconBlock::Base(_)
| SignedBeaconBlock::Altair(_)
| SignedBeaconBlock::Merge(_)
Expand Down Expand Up @@ -1742,7 +1742,7 @@ where

let ((block, blobs), state) = self.make_block_return_pre_state(state, slot).await;

let (mut block, _) = block.deconstruct();
let (mut block, _) = (*block).clone().deconstruct();

block_modifier(&mut block);

Expand All @@ -1754,7 +1754,7 @@ where
state.genesis_validators_root(),
&self.spec,
);
((signed_block, blobs), state)
((Arc::new(signed_block), blobs), state)
}

pub async fn make_blob_with_modifier(
Expand All @@ -1768,7 +1768,7 @@ where

let ((block, mut blobs), state) = self.make_block_return_pre_state(state, slot).await;

let (block, _) = block.deconstruct();
let (block, _) = (*block).clone().deconstruct();

blob_modifier(&mut blobs.as_mut().unwrap().1);

Expand All @@ -1780,7 +1780,7 @@ where
state.genesis_validators_root(),
&self.spec,
);
((signed_block, blobs), state)
((Arc::new(signed_block), blobs), state)
}

pub fn make_deposits<'a>(
Expand Down Expand Up @@ -1873,7 +1873,7 @@ where
.chain
.process_block(
block_root,
RpcBlock::new(Some(block_root), Arc::new(block), sidecars).unwrap(),
RpcBlock::new(Some(block_root), block, sidecars).unwrap(),
NotifyExecutionLayer::Yes,
|| Ok(()),
)
Expand All @@ -1899,7 +1899,7 @@ where
.chain
.process_block(
block_root,
RpcBlock::new(Some(block_root), Arc::new(block), sidecars).unwrap(),
RpcBlock::new(Some(block_root), block, sidecars).unwrap(),
NotifyExecutionLayer::Yes,
|| Ok(()),
)
Expand Down
20 changes: 3 additions & 17 deletions beacon_node/beacon_chain/tests/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1142,11 +1142,7 @@ async fn verify_block_for_gossip_slashing_detection() {
let ((block1, blobs1), _) = harness.make_block(state.clone(), Slot::new(1)).await;
let ((block2, _blobs2), _) = harness.make_block(state, Slot::new(1)).await;

let verified_block = harness
.chain
.verify_block_for_gossip(Arc::new(block1))
.await
.unwrap();
let verified_block = harness.chain.verify_block_for_gossip(block1).await.unwrap();

if let Some((kzg_proofs, blobs)) = blobs1 {
let sidecars =
Expand Down Expand Up @@ -1174,12 +1170,7 @@ async fn verify_block_for_gossip_slashing_detection() {
)
.await
.unwrap();
unwrap_err(
harness
.chain
.verify_block_for_gossip(Arc::new(block2))
.await,
);
unwrap_err(harness.chain.verify_block_for_gossip(block2).await);

// Slasher should have been handed the two conflicting blocks and crafted a slashing.
slasher.process_queued(Epoch::new(0)).unwrap();
Expand All @@ -1198,11 +1189,7 @@ async fn verify_block_for_gossip_doppelganger_detection() {
let state = harness.get_current_state();
let ((block, _), _) = harness.make_block(state.clone(), Slot::new(1)).await;

let verified_block = harness
.chain
.verify_block_for_gossip(Arc::new(block))
.await
.unwrap();
let verified_block = harness.chain.verify_block_for_gossip(block).await.unwrap();
let attestations = verified_block.block.message().body().attestations().clone();
harness
.chain
Expand Down Expand Up @@ -1564,7 +1551,6 @@ async fn import_duplicate_block_unrealized_justification() {
let slot = harness.get_current_slot();
let (block_contents, _) = harness.make_block(state.clone(), slot).await;
let (block, _) = block_contents;
let block = Arc::new(block);
let block_root = block.canonical_root();

// Create two verified variants of the block, representing the same block being processed in
Expand Down
11 changes: 5 additions & 6 deletions beacon_node/beacon_chain/tests/payload_invalidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ impl InvalidPayloadRig {
.get_full_block(&block_root)
.unwrap()
.unwrap(),
block,
*block,
"block from db must match block imported"
);
}
Expand Down Expand Up @@ -700,7 +700,7 @@ async fn invalidates_all_descendants() {
.chain
.process_block(
fork_block.canonical_root(),
Arc::new(fork_block),
fork_block,
NotifyExecutionLayer::Yes,
|| Ok(()),
)
Expand Down Expand Up @@ -800,7 +800,7 @@ async fn switches_heads() {
.chain
.process_block(
fork_block.canonical_root(),
Arc::new(fork_block),
fork_block,
NotifyExecutionLayer::Yes,
|| Ok(()),
)
Expand Down Expand Up @@ -1044,8 +1044,7 @@ async fn invalid_parent() {
// Produce another block atop the parent, but don't import yet.
let slot = parent_block.slot() + 1;
rig.harness.set_current_slot(slot);
let (block_tuple, state) = rig.harness.make_block(parent_state, slot).await;
let block = Arc::new(block_tuple.0);
let ((block, _), state) = rig.harness.make_block(parent_state, slot).await;
let block_root = block.canonical_root();
assert_eq!(block.parent_root(), parent_root);

Expand Down Expand Up @@ -1865,7 +1864,7 @@ impl InvalidHeadSetup {
.state_at_slot(slot - 1, StateSkipConfig::WithStateRoots)
.unwrap();
let (fork_block_tuple, _) = rig.harness.make_block(parent_state, slot).await;
opt_fork_block = Some(Arc::new(fork_block_tuple.0));
opt_fork_block = Some(fork_block_tuple.0);
} else {
// Skipped slot.
};
Expand Down
10 changes: 5 additions & 5 deletions beacon_node/beacon_chain/tests/store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2268,17 +2268,17 @@ async fn garbage_collect_temp_states_from_failed_block() {
let block_slot = Slot::new(2 * slots_per_epoch);
let ((signed_block, _), state) = harness.make_block(genesis_state, block_slot).await;

let (mut block, _) = signed_block.deconstruct();
let (mut block, _) = (*signed_block).clone().deconstruct();

// Mutate the block to make it invalid, and re-sign it.
*block.state_root_mut() = Hash256::repeat_byte(0xff);
let proposer_index = block.proposer_index() as usize;
let block = block.sign(
let block = Arc::new(block.sign(
&harness.validator_keypairs[proposer_index].sk,
&state.fork(),
state.genesis_validators_root(),
&harness.spec,
);
));

// The block should be rejected, but should store a bunch of temporary states.
harness.set_current_slot(block_slot);
Expand Down Expand Up @@ -2677,7 +2677,7 @@ async fn process_blocks_and_attestations_for_unaligned_checkpoint() {
.chain
.process_block(
invalid_fork_block.canonical_root(),
Arc::new(invalid_fork_block.clone()),
invalid_fork_block.clone(),
NotifyExecutionLayer::Yes,
|| Ok(()),
)
Expand All @@ -2690,7 +2690,7 @@ async fn process_blocks_and_attestations_for_unaligned_checkpoint() {
.chain
.process_block(
valid_fork_block.canonical_root(),
Arc::new(valid_fork_block.clone()),
valid_fork_block.clone(),
NotifyExecutionLayer::Yes,
|| Ok(()),
)
Expand Down
6 changes: 4 additions & 2 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1410,7 +1410,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
move |block_contents: SignedBlindedBeaconBlock<T::EthSpec>,
move |block_contents: Arc<SignedBlindedBeaconBlock<T::EthSpec>>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
Expand Down Expand Up @@ -1450,6 +1450,7 @@ pub fn serve<T: BeaconChainTypes>(
&block_bytes,
&chain.spec,
)
.map(Arc::new)
.map_err(|e| {
warp_utils::reject::custom_bad_request(format!("invalid SSZ: {e:?}"))
})?;
Expand Down Expand Up @@ -1478,7 +1479,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(log_filter.clone())
.then(
move |validation_level: api_types::BroadcastValidationQuery,
blinded_block: SignedBlindedBeaconBlock<T::EthSpec>,
blinded_block: Arc<SignedBlindedBeaconBlock<T::EthSpec>>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
Expand Down Expand Up @@ -1519,6 +1520,7 @@ pub fn serve<T: BeaconChainTypes>(
&block_bytes,
&chain.spec,
)
.map(Arc::new)
.map_err(|e| {
warp_utils::reject::custom_bad_request(format!("invalid SSZ: {e:?}"))
})?;
Expand Down
25 changes: 14 additions & 11 deletions beacon_node/http_api/src/publish_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten

if let Some(gossip_verified_blobs) = gossip_verified_blobs {
for blob in gossip_verified_blobs {
if let Err(e) = chain.process_gossip_blob(blob).await {
if let Err(e) = Box::pin(chain.process_gossip_blob(blob)).await {
let msg = format!("Invalid blob: {e}");
return if let BroadcastValidation::Gossip = validation_level {
Err(warp_utils::reject::broadcast_without_import(msg))
Expand All @@ -210,14 +210,13 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
}
}

match chain
.process_block(
block_root,
gossip_verified_block,
NotifyExecutionLayer::Yes,
publish_fn,
)
.await
match Box::pin(chain.process_block(
block_root,
gossip_verified_block,
NotifyExecutionLayer::Yes,
publish_fn,
))
.await
{
Ok(AvailabilityProcessingStatus::Imported(root)) => {
info!(
Expand Down Expand Up @@ -291,7 +290,7 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
/// Handles a request from the HTTP API for blinded blocks. This converts blinded blocks into full
/// blocks before publishing.
pub async fn publish_blinded_block<T: BeaconChainTypes>(
blinded_block: SignedBlindedBeaconBlock<T::EthSpec>,
blinded_block: Arc<SignedBlindedBeaconBlock<T::EthSpec>>,
chain: Arc<BeaconChain<T>>,
network_tx: &UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger,
Expand Down Expand Up @@ -319,7 +318,7 @@ pub async fn publish_blinded_block<T: BeaconChainTypes>(
pub async fn reconstruct_block<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
block_root: Hash256,
block: SignedBlindedBeaconBlock<T::EthSpec>,
block: Arc<SignedBlindedBeaconBlock<T::EthSpec>>,
log: Logger,
) -> Result<ProvenancedBlock<T, PublishBlockRequest<T::EthSpec>>, Rejection> {
let full_payload_opt = if let Ok(payload_header) = block.message().body().execution_payload() {
Expand Down Expand Up @@ -380,6 +379,10 @@ pub async fn reconstruct_block<T: BeaconChainTypes>(
None
};

// Perf: cloning the block here to unblind it is a little sub-optimal. This is considered an
// acceptable tradeoff to avoid passing blocks around on the stack (unarced), which blows up
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// acceptable tradeoff to avoid passing blocks around on the stack (unarced), which blows up
// acceptable tradeoff to avoid passing blocks around on the stack (unArc'ed), which blows up

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kinda like the lowercase spelling

// the size of futures.
let block = (*block).clone();
match full_payload_opt {
// A block without a payload is pre-merge and we consider it locally
// built.
Expand Down
Loading
Loading