Skip to content

Commit

Permalink
revert dropping reprocessing queue
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Dec 13, 2023
1 parent 9a2a351 commit b41ed07
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ pub enum Error {
SigSlotStartIsNone,
/// Failed to construct a LightClientOptimisticUpdate from state.
FailedConstructingUpdate,
/// Unknown block with parent root.
UnknownBlockParentRoot(Hash256),
}

/// Wraps a `LightClientOptimisticUpdate` that has been verified for propagation on the gossip network.
Expand Down Expand Up @@ -59,6 +61,19 @@ impl<T: BeaconChainTypes> VerifiedLightClientOptimisticUpdate<T> {
return Err(Error::TooEarly);
}

let head = chain.canonical_head.cached_head();
let head_block = &head.snapshot.beacon_block;
// check if we can process the optimistic update immediately
// otherwise queue
let canonical_root = rcv_optimistic_update
.attested_header
.beacon
.canonical_root();

if canonical_root != head_block.message().parent_root() {
return Err(Error::UnknownBlockParentRoot(canonical_root));
}

let latest_optimistic_update = chain
.light_client_server_cache
.get_latest_optimistic_update()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ use types::{

use beacon_processor::{
work_reprocessing_queue::{
QueuedAggregate, QueuedGossipBlock, QueuedUnaggregate, ReprocessQueueMessage,
QueuedAggregate, QueuedGossipBlock, QueuedLightClientUpdate, QueuedUnaggregate,
ReprocessQueueMessage,
},
DuplicateCache, GossipAggregatePackage, GossipAttestationPackage,
};
Expand Down Expand Up @@ -1692,6 +1693,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
message_id: MessageId,
peer_id: PeerId,
light_client_optimistic_update: LightClientOptimisticUpdate<T::EthSpec>,
reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage>>,
seen_timestamp: Duration,
) {
match self.chain.verify_optimistic_update_for_gossip(
Expand All @@ -1710,6 +1712,56 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
Err(e) => {
match e {
LightClientOptimisticUpdateError::UnknownBlockParentRoot(parent_root) => {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_SENT_OPTIMISTIC_UPDATES,
);
debug!(
self.log,
"Optimistic update for unknown block";
"peer_id" => %peer_id,
"parent_root" => ?parent_root
);

if let Some(sender) = reprocess_tx {
let processor = self.clone();
let msg = ReprocessQueueMessage::UnknownLightClientOptimisticUpdate(
QueuedLightClientUpdate {
parent_root,
process_fn: Box::new(move || {
processor.process_gossip_optimistic_update(
message_id,
peer_id,
light_client_optimistic_update,
None, // Do not reprocess this message again.
seen_timestamp,
)
}),
},
);

if sender.try_send(msg).is_err() {
error!(
self.log,
"Failed to send optimistic update for re-processing";
)
}
} else {
debug!(
self.log,
"Not sending light client update because it had been reprocessed";
"peer_id" => %peer_id,
"parent_root" => ?parent_root
);

self.propagate_validation_result(
message_id,
peer_id,
MessageAcceptance::Ignore,
);
}
return;
}
LightClientOptimisticUpdateError::InvalidLightClientOptimisticUpdate => {
metrics::register_optimistic_update_error(&e);

Expand Down
2 changes: 2 additions & 0 deletions beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,10 +348,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn = move || {
let reprocess_tx = processor.reprocess_tx.clone();
processor.process_gossip_optimistic_update(
message_id,
peer_id,
light_client_optimistic_update,
Some(reprocess_tx),
seen_timestamp,
)
};
Expand Down

0 comments on commit b41ed07

Please sign in to comment.