Skip to content

Commit

Permalink
Update light client optimistic update re-processing logic. (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
jimmygchen authored Jan 20, 2024
1 parent b41ed07 commit 31acd0c
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 5 deletions.
7 changes: 6 additions & 1 deletion beacon_node/beacon_processor/src/work_reprocessing_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,15 @@ pub enum ReprocessQueueMessage {
/// A gossip block for hash `X` is being imported, we should queue the rpc block for the same
/// hash until the gossip block is imported.
RpcBlock(QueuedRpcBlock),
/// A block that was successfully processed. We use this to handle attestations and light client updates
/// A block that was successfully processed. We use this to handle attestations updates
/// for unknown blocks.
BlockImported {
block_root: Hash256,
parent_root: Hash256,
},
/// A new `LightClientOptimisticUpdate` has been produced. We use this to handle light client
/// updates for unknown parent blocks.
NewLightClientOptimisticUpdate { parent_root: Hash256 },
/// An unaggregated attestation that references an unknown block.
UnknownBlockUnaggregate(QueuedUnaggregate),
/// An aggregated attestation that references an unknown block.
Expand Down Expand Up @@ -688,6 +691,8 @@ impl<S: SlotClock> ReprocessQueue<S> {
);
}
}
}
InboundEvent::Msg(NewLightClientOptimisticUpdate { parent_root }) => {
// Unqueue the light client optimistic updates we have for this root, if any.
if let Some(queued_lc_id) = self
.awaiting_lc_updates_per_parent_root
Expand Down
11 changes: 8 additions & 3 deletions beacon_node/client/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,7 @@ where
}
.spawn_manager(
beacon_processor_channels.beacon_processor_rx,
beacon_processor_channels.work_reprocessing_tx,
beacon_processor_channels.work_reprocessing_tx.clone(),
beacon_processor_channels.work_reprocessing_rx,
None,
beacon_chain.slot_clock.clone(),
Expand Down Expand Up @@ -857,8 +857,13 @@ where
let log = broadcast_context.log().clone();
broadcast_context.executor.spawn(
async move {
compute_light_client_updates(&inner_chain, light_client_server_rv, &log)
.await
compute_light_client_updates(
&inner_chain,
light_client_server_rv,
beacon_processor_channels.work_reprocessing_tx,
&log,
)
.await
},
"lcserv_broadcast",
);
Expand Down
12 changes: 11 additions & 1 deletion beacon_node/client/src/compute_light_client_updates.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use beacon_chain::{BeaconChain, BeaconChainTypes, LightClientProducerEvent};
use beacon_processor::work_reprocessing_queue::ReprocessQueueMessage;
use futures::channel::mpsc::Receiver;
use futures::StreamExt;
use slog::{error, Logger};
use tokio::sync::mpsc::Sender;

// Each `LightClientProducerEvent` is ~200 bytes. With the light_client server producing only recent
// updates it is okay to drop some events in case of overloading. In normal network conditions
Expand All @@ -12,6 +14,7 @@ pub(crate) const LIGHT_CLIENT_SERVER_CHANNEL_CAPACITY: usize = 32;
pub async fn compute_light_client_updates<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
mut light_client_server_rv: Receiver<LightClientProducerEvent<T::EthSpec>>,
reprocess_tx: Sender<ReprocessQueueMessage>,
log: &Logger,
) {
// Should only receive events for recent blocks, import_block filters by blocks close to clock.
Expand All @@ -20,10 +23,17 @@ pub async fn compute_light_client_updates<T: BeaconChainTypes>(
// Uses a bounded receiver, so may drop some SyncAggregates if very overloaded. This is okay
// since only the most recent updates have value.
while let Some(event) = light_client_server_rv.next().await {
let parent_root = event.0;

chain
.recompute_and_cache_light_client_updates(event)
.unwrap_or_else(|e| {
error!(log, "error computing light_client updates {:?}", e);
})
});

let msg = ReprocessQueueMessage::NewLightClientOptimisticUpdate { parent_root };
if reprocess_tx.try_send(msg).is_err() {
error!(log, "Failed to inform light client update"; "parent_root" => %parent_root)
};
}
}

0 comments on commit 31acd0c

Please sign in to comment.