Compute recent lightclient updates #4969

Merged · 13 commits · Jan 31, 2024
4 changes: 3 additions & 1 deletion beacon_node/beacon_chain/src/light_client_server_cache.rs
@@ -3,7 +3,9 @@ use crate::{metrics, BeaconChainTypes, BeaconStore};
use parking_lot::{Mutex, RwLock};
use slog::{debug, Logger};
use ssz_types::FixedVector;
use std::num::NonZeroUsize;
use types::light_client_update::{FinalizedRootProofLen, FINALIZED_ROOT_INDEX};
use types::non_zero_usize::new_non_zero_usize;
use types::{
BeaconBlockRef, BeaconState, ChainSpec, EthSpec, ForkName, Hash256, LightClientFinalityUpdate,
LightClientHeader, LightClientOptimisticUpdate, Slot, SyncAggregate,
@@ -12,7 +14,7 @@ use types::{
/// A prev block cache miss requires re-generating the state of the post-parent block. Items in the
/// prev block cache are very small: 32 * (6 + 1) = 224 bytes. 32 is an arbitrary number that
/// accommodates unlikely re-orgs while keeping the cache very small.
const PREV_BLOCK_CACHE_SIZE: usize = 32;
const PREV_BLOCK_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(32);

/// This cache computes light client messages ahead of time, required to satisfy p2p and API
/// requests. These messages include proofs on historical states, so on-demand computation is
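Aside: `PREV_BLOCK_CACHE_SIZE` is now a `NonZeroUsize`, the capacity type recent `lru::LruCache::new` versions expect. A hedged sketch of a const-friendly constructor like the `new_non_zero_usize` helper imported above (assumed implementation, not necessarily Lighthouse's exact code):

```rust
use std::num::NonZeroUsize;

// Hedged sketch of a `new_non_zero_usize`-style helper: used in a `const`
// context, a zero argument becomes a compile-time panic.
const fn new_non_zero_usize(n: usize) -> NonZeroUsize {
    match NonZeroUsize::new(n) {
        Some(non_zero) => non_zero,
        None => panic!("expected a non-zero usize"),
    }
}

const PREV_BLOCK_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(32);

fn main() {
    println!("prev block cache size: {}", PREV_BLOCK_CACHE_SIZE);
}
```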
7 changes: 6 additions & 1 deletion beacon_node/beacon_processor/src/work_reprocessing_queue.rs
@@ -82,12 +82,15 @@ pub enum ReprocessQueueMessage {
/// A gossip block for hash `X` is being imported, we should queue the rpc block for the same
/// hash until the gossip block is imported.
RpcBlock(QueuedRpcBlock),
/// A block that was successfully processed. We use this to handle attestations and light client updates
/// A block that was successfully processed. We use this to handle attestation updates
/// for unknown blocks.
BlockImported {
block_root: Hash256,
parent_root: Hash256,
},
/// A new `LightClientOptimisticUpdate` has been produced. We use this to handle light client
/// updates for unknown parent blocks.
NewLightClientOptimisticUpdate { parent_root: Hash256 },
/// An unaggregated attestation that references an unknown block.
UnknownBlockUnaggregate(QueuedUnaggregate),
/// An aggregated attestation that references an unknown block.
@@ -688,6 +691,8 @@ impl<S: SlotClock> ReprocessQueue<S> {
);
}
}
}
InboundEvent::Msg(NewLightClientOptimisticUpdate { parent_root }) => {
// Unqueue the light client optimistic updates we have for this root, if any.
if let Some(queued_lc_id) = self
.awaiting_lc_updates_per_parent_root
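The new handler drains `awaiting_lc_updates_per_parent_root` once a parent block becomes known. A self-contained sketch of this park-and-replay pattern, with illustrative stand-in types rather than Lighthouse's real ones:

```rust
use std::collections::HashMap;

// Illustrative stand-ins: a 32-byte root and a queued work item.
type Root = [u8; 32];

enum ReprocessMsg {
    NewLightClientOptimisticUpdate { parent_root: Root },
}

struct ReprocessQueue {
    // Optimistic updates parked until their parent block is imported.
    awaiting_lc_updates_per_parent_root: HashMap<Root, Vec<String>>,
}

impl ReprocessQueue {
    fn handle(&mut self, msg: ReprocessMsg) {
        match msg {
            ReprocessMsg::NewLightClientOptimisticUpdate { parent_root } => {
                // Unqueue every light client update waiting on this parent root.
                if let Some(queued) = self
                    .awaiting_lc_updates_per_parent_root
                    .remove(&parent_root)
                {
                    for work in queued {
                        println!("re-processing {work}");
                    }
                }
            }
        }
    }
}

fn main() {
    let mut queue = ReprocessQueue {
        awaiting_lc_updates_per_parent_root: HashMap::new(),
    };
    queue
        .awaiting_lc_updates_per_parent_root
        .insert([0; 32], vec!["optimistic update for slot 42".into()]);
    queue.handle(ReprocessMsg::NewLightClientOptimisticUpdate { parent_root: [0; 32] });
}
```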
35 changes: 20 additions & 15 deletions beacon_node/client/src/builder.rs
@@ -814,7 +814,7 @@ where
}
.spawn_manager(
beacon_processor_channels.beacon_processor_rx,
beacon_processor_channels.work_reprocessing_tx,
beacon_processor_channels.work_reprocessing_tx.clone(),
beacon_processor_channels.work_reprocessing_rx,
None,
beacon_chain.slot_clock.clone(),
@@ -894,21 +894,26 @@ where
"addr_broadcast",
);
}
}

// Spawn service to publish light_client updates at some interval into the slot
if let Some(light_client_server_rv) = self.light_client_server_rv {
let inner_chain = beacon_chain.clone();
let broadcast_context =
runtime_context.service_context("lcserv_bcast".to_string());
let log = broadcast_context.log().clone();
broadcast_context.executor.spawn(
async move {
compute_light_client_updates(&inner_chain, light_client_server_rv, &log)
.await
},
"lcserv_broadcast",
);
}
// Spawn service to publish light_client updates at some interval into the slot.
if let Some(light_client_server_rv) = self.light_client_server_rv {
let inner_chain = beacon_chain.clone();
let light_client_update_context =
runtime_context.service_context("lc_update".to_string());
let log = light_client_update_context.log().clone();
light_client_update_context.executor.spawn(
async move {
compute_light_client_updates(
&inner_chain,
light_client_server_rv,
beacon_processor_channels.work_reprocessing_tx,
&log,
)
.await
},
"lc_update",
);
}

start_proposer_prep_service(runtime_context.executor.clone(), beacon_chain.clone());
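Note the `.clone()` added at the `spawn_manager` call site above: the reprocessing channel now has a second producer, the light client update service. A minimal sketch of that multi-producer pattern with tokio's mpsc (names are illustrative, not the builder's real API):

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // One bounded channel, two producers: tokio mpsc senders are cheap to clone.
    let (work_reprocessing_tx, mut work_reprocessing_rx) = mpsc::channel::<&'static str>(32);
    let lc_service_tx = work_reprocessing_tx.clone();

    tokio::spawn(async move {
        work_reprocessing_tx.send("msg from beacon processor").await.unwrap();
    });
    tokio::spawn(async move {
        lc_service_tx.send("msg from lc_update service").await.unwrap();
    });

    for _ in 0..2 {
        println!("received: {}", work_reprocessing_rx.recv().await.unwrap());
    }
}
```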
12 changes: 11 additions & 1 deletion beacon_node/client/src/compute_light_client_updates.rs
@@ -1,7 +1,9 @@
use beacon_chain::{BeaconChain, BeaconChainTypes, LightClientProducerEvent};
use beacon_processor::work_reprocessing_queue::ReprocessQueueMessage;
use futures::channel::mpsc::Receiver;
use futures::StreamExt;
use slog::{error, Logger};
use tokio::sync::mpsc::Sender;

// Each `LightClientProducerEvent` is ~200 bytes. With the light_client server producing only recent
// updates it is okay to drop some events in case of overloading. In normal network conditions
@@ -12,6 +14,7 @@ pub(crate) const LIGHT_CLIENT_SERVER_CHANNEL_CAPACITY: usize = 32;
pub async fn compute_light_client_updates<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
mut light_client_server_rv: Receiver<LightClientProducerEvent<T::EthSpec>>,
reprocess_tx: Sender<ReprocessQueueMessage>,
log: &Logger,
) {
// Should only receive events for recent blocks, import_block filters by blocks close to clock.
@@ -20,10 +23,17 @@ pub async fn compute_light_client_updates<T: BeaconChainTypes>(
// Uses a bounded receiver, so may drop some SyncAggregates if very overloaded. This is okay
// since only the most recent updates have value.
while let Some(event) = light_client_server_rv.next().await {
let parent_root = event.0;

chain
.recompute_and_cache_light_client_updates(event)
.unwrap_or_else(|e| {
error!(log, "error computing light_client updates {:?}", e);
})
});

let msg = ReprocessQueueMessage::NewLightClientOptimisticUpdate { parent_root };
if reprocess_tx.try_send(msg).is_err() {
error!(log, "Failed to inform light client update"; "parent_root" => %parent_root)
};
}
}
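Both `try_send` call sites lean on bounded channels failing fast rather than blocking, so stale light client events are simply dropped under load. A minimal standalone sketch of that behaviour:

```rust
use tokio::sync::mpsc;

const LIGHT_CLIENT_SERVER_CHANNEL_CAPACITY: usize = 32;

fn main() {
    // `try_send` needs no async context, which keeps the sketch short.
    let (tx, _rx) = mpsc::channel::<u64>(LIGHT_CLIENT_SERVER_CHANNEL_CAPACITY);
    for event in 0..40u64 {
        if tx.try_send(event).is_err() {
            // Channel full: dropping is fine, only recent updates have value.
            eprintln!("dropped light client event {event}");
        }
    }
}
```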
1 change: 1 addition & 0 deletions consensus/types/src/beacon_state.rs
@@ -1866,6 +1866,7 @@ impl<T: EthSpec> BeaconState<T> {
};

// 2. Get all `BeaconState` leaves.
self.initialize_tree_hash_cache();
let mut cache = self
.tree_hash_cache_mut()
.take()
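The added `initialize_tree_hash_cache()` call guards the `take()` below it: taking from an uninitialized `Option`-backed cache would otherwise fail. A hedged sketch of the pattern with simplified stand-in types:

```rust
// Simplified stand-in for a state with an optional tree hash cache.
struct State {
    tree_hash_cache: Option<Vec<u8>>,
}

impl State {
    fn initialize_tree_hash_cache(&mut self) {
        // Only build the cache if it is missing.
        if self.tree_hash_cache.is_none() {
            self.tree_hash_cache = Some(Vec::new());
        }
    }

    fn compute_proof(&mut self) -> Result<(), &'static str> {
        // Without this call, `take()` on a fresh state would return None.
        self.initialize_tree_hash_cache();
        let cache = self.tree_hash_cache.take().ok_or("cache not initialized")?;
        // ... use the cache to build the proof, then put it back ...
        self.tree_hash_cache = Some(cache);
        Ok(())
    }
}

fn main() {
    let mut state = State { tree_hash_cache: None };
    assert!(state.compute_proof().is_ok());
}
```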
1 change: 1 addition & 0 deletions lighthouse/tests/beacon_node.rs
@@ -2372,6 +2372,7 @@ fn light_client_server_default() {
.run_with_zero_port()
.with_config(|config| {
assert_eq!(config.network.enable_light_client_server, false);
assert_eq!(config.chain.enable_light_client_server, false);
assert_eq!(config.http_api.enable_light_client_server, false);
});
}
92 changes: 91 additions & 1 deletion testing/simulator/src/checks.rs
@@ -1,5 +1,5 @@
use crate::local_network::LocalNetwork;
use node_test_rig::eth2::types::{BlockId, StateId};
use node_test_rig::eth2::types::{BlockId, FinalityCheckpointsData, StateId};
use std::time::Duration;
use types::{Epoch, EthSpec, ExecPayload, ExecutionBlockHash, Hash256, Slot, Unsigned};

@@ -243,3 +243,93 @@ pub async fn verify_transition_block_finalized<E: EthSpec>(
))
}
}

pub(crate) async fn verify_light_client_updates<E: EthSpec>(
network: LocalNetwork<E>,
start_slot: Slot,
end_slot: Slot,
slot_duration: Duration,
) -> Result<(), String> {
slot_delay(start_slot, slot_duration).await;

// Tolerance of 2 slots allows for a single missed slot.
let light_client_update_slot_tolerance = Slot::new(2);
let remote_nodes = network.remote_nodes()?;
let client = remote_nodes.first().unwrap();
let mut have_seen_block = false;
let mut have_achieved_finality = false;

for slot in start_slot.as_u64()..=end_slot.as_u64() {
slot_delay(Slot::new(1), slot_duration).await;
let slot = Slot::new(slot);
let previous_slot = slot - 1;

let previous_slot_block = client
.get_beacon_blocks::<E>(BlockId::Slot(previous_slot))
.await
.map_err(|e| {
format!("Unable to get beacon block for previous slot {previous_slot:?}: {e:?}")
})?;
let previous_slot_has_block = previous_slot_block.is_some();

if !have_seen_block {
// Wait until we have seen the first block after the Altair fork, so sync aggregates are available.
if previous_slot_has_block {
have_seen_block = true;
}
// Wait for another slot before we check the first update to avoid a race condition.
continue;
}

// Make sure the previous slot has a block; otherwise skip checking the signature slot distance.
if !previous_slot_has_block {
continue;
}

// Verify light client optimistic update. `signature_slot_distance` should be 1 in the ideal scenario.
let signature_slot = client
.get_beacon_light_client_optimistic_update::<E>()
.await
.map_err(|e| format!("Error while getting light client updates: {:?}", e))?
.ok_or(format!("Light client optimistic update not found {slot:?}"))?
.data
.signature_slot;
let signature_slot_distance = slot - signature_slot;
if signature_slot_distance > light_client_update_slot_tolerance {
return Err(format!("Existing optimistic update too old: signature slot {signature_slot}, current slot {slot:?}"));
}

// Verify light client finality update. `signature_slot_distance` should be 1 in the ideal scenario.
// NOTE: Currently finality updates are produced as long as the finalized block is known, even if the finalized header
// sync committee period does not match the signature slot committee period.
// TODO: This complies with the current spec, but we should check if this is a bug.
if !have_achieved_finality {
let FinalityCheckpointsData { finalized, .. } = client
.get_beacon_states_finality_checkpoints(StateId::Head)
.await
.map_err(|e| format!("Unable to get beacon state finality checkpoint: {e:?}"))?
.ok_or("Unable to get head state".to_string())?
.data;
if !finalized.root.is_zero() {
// Wait for another slot before we check the first finality update to avoid a race condition.
have_achieved_finality = true;
}
continue;
}
let signature_slot = client
.get_beacon_light_client_finality_update::<E>()
.await
.map_err(|e| format!("Error while getting light client updates: {:?}", e))?
.ok_or(format!("Light client finality update not found {slot:?}"))?
.data
.signature_slot;
let signature_slot_distance = slot - signature_slot;
if signature_slot_distance > light_client_update_slot_tolerance {
return Err(format!(
"Existing finality update too old: signature slot {signature_slot}, current slot {slot:?}"
));
}
}

Ok(())
}
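`slot_delay` above is a helper from the simulator's checks module; a hedged sketch of what such a helper might look like (the real one takes a `Slot`, so this signature is an assumption):

```rust
use std::time::Duration;

// Assumed sketch: sleep for `slots` slots given the per-slot duration.
async fn slot_delay(slots: u64, slot_duration: Duration) {
    tokio::time::sleep(slot_duration * slots as u32).await;
}

#[tokio::main]
async fn main() {
    // E.g. wait two (shortened, simulator-style) slots before polling APIs.
    slot_delay(2, Duration::from_millis(100)).await;
    println!("done waiting");
}
```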
12 changes: 12 additions & 0 deletions testing/simulator/src/eth1_sim.rs
@@ -220,6 +220,7 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> {
fork,
sync_aggregate,
transition,
light_client_update,
) = futures::join!(
// Check that the chain finalizes at the first given opportunity.
checks::verify_first_finalization(network.clone(), slot_duration),
@@ -272,6 +273,13 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> {
Epoch::new(TERMINAL_BLOCK / MinimalEthSpec::slots_per_epoch()),
slot_duration,
post_merge_sim
),
checks::verify_light_client_updates(
network.clone(),
// Sync aggregate available from slot 1 after Altair fork transition.
Epoch::new(ALTAIR_FORK_EPOCH).start_slot(MinimalEthSpec::slots_per_epoch()) + 1,
Epoch::new(END_EPOCH).start_slot(MinimalEthSpec::slots_per_epoch()),
slot_duration
)
);

@@ -282,6 +290,7 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> {
fork?;
sync_aggregate?;
transition?;
light_client_update?;

// The `final_future` either completes immediately or never completes, depending on the value
// of `continue_after_checks`.
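The simulator runs all checks concurrently with `futures::join!` and then propagates each result with `?`, as the hunks above show. A minimal standalone sketch of that join-then-propagate pattern (placeholder check bodies):

```rust
// Placeholder checks: each independent check runs concurrently, and any
// Err fails the simulation once the results are unpacked.
#[tokio::main]
async fn main() -> Result<(), String> {
    let (finalization, light_client_update) = futures::join!(
        async { Ok::<(), String>(()) },
        async { Ok::<(), String>(()) }
    );
    finalization?;
    light_client_update?;
    Ok(())
}
```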
@@ -380,6 +389,9 @@ async fn create_local_network<E: EthSpec>(
beacon_config.network.target_peers = node_count + proposer_nodes - 1;

beacon_config.network.enr_address = (Some(Ipv4Addr::LOCALHOST), None);
beacon_config.network.enable_light_client_server = true;
beacon_config.chain.enable_light_client_server = true;
beacon_config.http_api.enable_light_client_server = true;

if post_merge_sim {
let el_config = execution_layer::Config {