Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make flashblock ws url as a flag and add more data #442

Merged
merged 6 commits into from
Mar 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/checks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,12 @@ jobs:
- name: Run integration tests
run: cargo test --package op-rbuilder --lib --features optimism,integration -- integration::integration_test::tests

- name: Build flashblocks rbuilder
run: cargo build -p op-rbuilder --bin op-rbuilder --features optimism,flashblocks

- name: Run flashblocks builder integration tests
run: cargo test --package op-rbuilder --lib --features optimism,integration,flashblocks -- integration::integration_test::tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you also add the features to the makefile?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually if you want the tests to pass you should remove flashblocks from the integration tests

Copy link
Contributor Author

@cody-wang-cb cody-wang-cb Mar 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I should add another CI check for the flashblocks builder tests? (added above)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if its guarded behind a feature flag you should need a separate command / CI test and everything should pass

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup the above is a separate CI test in addition to the original test


- name: Aggregate playground logs
# This steps fails if the test fails early and the playground logs dir has not been created
if: ${{ failure() }}
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ lint: ## Run the linters
test: ## Run the tests for rbuilder and op-rbuilder
cargo test --verbose --features "$(FEATURES)"
cargo test -p op-rbuilder --verbose --features "$(FEATURES),optimism"
cargo test -p op-rbuilder --verbose --features "$(FEATURES),optimism,flashblocks"

.PHONY: lt
lt: lint test ## Run "lint" and "test"
Expand Down
7 changes: 7 additions & 0 deletions crates/op-rbuilder/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,11 @@ pub struct OpRbuilderArgs {
/// Builder secret key for signing last transaction in block
#[arg(long = "rollup.builder-secret-key", env = "BUILDER_SECRET_KEY")]
pub builder_signer: Option<Signer>,
/// Websocket port for flashblock payload builder
#[arg(
long = "rollup.flashblocks-ws-url",
env = "FLASHBLOCKS_WS_URL",
default_value = "127.0.0.1:1111"
)]
pub flashblocks_ws_url: String,
}
131 changes: 131 additions & 0 deletions crates/op-rbuilder/src/integration/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,21 @@ mod tests {
use alloy_provider::Identity;
use alloy_provider::{Provider, ProviderBuilder};
use alloy_rpc_types_eth::BlockTransactionsKind;
use futures_util::StreamExt;
use op_alloy_consensus::OpTypedTransaction;
use op_alloy_network::Optimism;
use std::cmp::max;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio_tungstenite::connect_async;
use uuid::Uuid;

const BUILDER_PRIVATE_KEY: &str =
"0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d";

#[tokio::test]
#[cfg(not(feature = "flashblocks"))]
async fn integration_test_chain_produces_blocks() -> eyre::Result<()> {
// This is a simple test using the integration framework to test that the chain
// produces blocks.
Expand Down Expand Up @@ -93,6 +98,7 @@ mod tests {
}

#[tokio::test]
#[cfg(not(feature = "flashblocks"))]
async fn integration_test_revert_protection() -> eyre::Result<()> {
// This is a simple test using the integration framework to test that the chain
// produces blocks.
Expand Down Expand Up @@ -218,6 +224,7 @@ mod tests {
}

#[tokio::test]
#[cfg(not(feature = "flashblocks"))]
async fn integration_test_fee_priority_ordering() -> eyre::Result<()> {
// This test validates that transactions are ordered by fee priority in blocks
let mut framework =
Expand Down Expand Up @@ -340,4 +347,128 @@ mod tests {

Ok(())
}

#[tokio::test]
#[cfg(feature = "flashblocks")]
async fn integration_test_chain_produces_blocks() -> eyre::Result<()> {
// This is a simple test using the integration framework to test that the chain
// produces blocks.
let mut framework =
IntegrationFramework::new("integration_test_chain_produces_blocks").unwrap();

// we are going to use a genesis file pre-generated before the test
let mut genesis_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
genesis_path.push("../../genesis.json");
assert!(genesis_path.exists());

// create the builder
let builder_data_dir = std::env::temp_dir().join(Uuid::new_v4().to_string());
let op_rbuilder_config = OpRbuilderConfig::new()
.chain_config_path(genesis_path.clone())
.data_dir(builder_data_dir)
.auth_rpc_port(1234)
.network_port(1235)
.http_port(1238)
.with_builder_private_key(BUILDER_PRIVATE_KEY)
.with_flashblocks_ws_url("localhost:1239");

// create the validation reth node
let reth_data_dir = std::env::temp_dir().join(Uuid::new_v4().to_string());
let reth = OpRethConfig::new()
.chain_config_path(genesis_path)
.data_dir(reth_data_dir)
.auth_rpc_port(1236)
.network_port(1237);

framework.start("op-reth", &reth).await.unwrap();

let op_rbuilder = framework
.start("op-rbuilder", &op_rbuilder_config)
.await
.unwrap();

// Create a struct to hold received messages
let received_messages = Arc::new(Mutex::new(Vec::new()));
let messages_clone = received_messages.clone();

// Spawn WebSocket listener task
let ws_handle = tokio::spawn(async move {
let (ws_stream, _) = connect_async("ws://localhost:1239").await?;
let (_, mut read) = ws_stream.split();

while let Some(Ok(msg)) = read.next().await {
if let Ok(text) = msg.into_text() {
messages_clone.lock().unwrap().push(text);
}
}
Ok::<_, eyre::Error>(())
});

let engine_api = EngineApi::new("http://localhost:1234").unwrap();
let validation_api = EngineApi::new("http://localhost:1236").unwrap();

let mut generator = BlockGenerator::new(&engine_api, Some(&validation_api), false, 1, None);
generator.init().await?;

let provider = ProviderBuilder::<Identity, Identity, Optimism>::default()
.on_http("http://localhost:1238".parse()?);

for _ in 0..10 {
let block_hash = generator.generate_block().await?;

// query the block and the transactions inside the block
let block = provider
.get_block_by_hash(block_hash, BlockTransactionsKind::Hashes)
.await?
.expect("block");

for hash in block.transactions.hashes() {
let _ = provider
.get_transaction_receipt(hash)
.await?
.expect("receipt");
}
}
// there must be a line logging the monitoring transaction
op_rbuilder
.find_log_line("Processing new chain commit") // no builder tx for flashblocks builder
.await?;

// Process websocket messages
let timeout_duration = Duration::from_secs(10);
tokio::time::timeout(timeout_duration, async {
let mut message_count = 0;
loop {
if message_count >= 10 {
break;
}
let messages = received_messages.lock().unwrap();
let messages_json: Vec<serde_json::Value> = messages
.iter()
.map(|msg| serde_json::from_str(msg).unwrap())
.collect();
for msg in messages_json.iter() {
let metadata = msg.get("metadata");
assert!(metadata.is_some(), "metadata field missing");
let metadata = metadata.unwrap();
assert!(
metadata.get("block_number").is_some(),
"block_number missing"
);
assert!(
metadata.get("new_account_balances").is_some(),
"new_account_balances missing"
);
assert!(metadata.get("receipts").is_some(), "receipts missing");
message_count += 1;
}
drop(messages);
tokio::time::sleep(Duration::from_millis(100)).await;
}
})
.await?;
ws_handle.abort();

Ok(())
}
}
11 changes: 11 additions & 0 deletions crates/op-rbuilder/src/integration/op_rbuilder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub struct OpRbuilderConfig {
http_port: Option<u16>,
network_port: Option<u16>,
builder_private_key: Option<String>,
flashblocks_ws_url: Option<String>,
}

impl OpRbuilderConfig {
Expand Down Expand Up @@ -60,6 +61,11 @@ impl OpRbuilderConfig {
self.builder_private_key = Some(private_key.to_string());
self
}

pub fn with_flashblocks_ws_url(mut self, url: &str) -> Self {
self.flashblocks_ws_url = Some(url.to_string());
self
}
}

impl Service for OpRbuilderConfig {
Expand Down Expand Up @@ -107,6 +113,11 @@ impl Service for OpRbuilderConfig {
.arg(http_port.to_string());
}

if let Some(flashblocks_ws_url) = &self.flashblocks_ws_url {
cmd.arg("--rollup.flashblocks-ws-url")
.arg(flashblocks_ws_url);
}

cmd
}

Expand Down
9 changes: 4 additions & 5 deletions crates/op-rbuilder/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,10 @@ fn main() {
let op_node = OpNode::new(rollup_args.clone());
let handle = builder
.with_types::<OpNode>()
.with_components(
op_node
.components()
.payload(CustomOpPayloadBuilder::new(builder_args.builder_signer)),
)
.with_components(op_node.components().payload(CustomOpPayloadBuilder::new(
builder_args.builder_signer,
builder_args.flashblocks_ws_url,
)))
.with_add_ons(
OpAddOnsBuilder::default()
.with_sequencer(rollup_args.sequencer_http.clone())
Expand Down
48 changes: 41 additions & 7 deletions crates/op-rbuilder/src/payload_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::tx_signer::Signer;
use alloy_consensus::{Eip658Value, Header, Transaction, Typed2718, EMPTY_OMMER_ROOT_HASH};
use alloy_eips::merge::BEACON_NONCE;
use alloy_eips::Encodable2718;
use alloy_primitives::{Address, Bytes, B256, U256};
use alloy_primitives::{map::HashMap, Address, Bytes, B256, U256};
use alloy_rpc_types_engine::PayloadId;
use alloy_rpc_types_eth::Withdrawals;
use op_alloy_consensus::OpDepositReceipt;
Expand Down Expand Up @@ -45,6 +45,7 @@ use reth_payload_util::PayloadTransactions;
use reth_primitives::{transaction::SignedTransactionIntoRecoveredExt, BlockBody, SealedHeader};
use reth_primitives_traits::proofs;
use reth_primitives_traits::Block as _;
use reth_primitives_traits::SignedTransaction;
use reth_provider::CanonStateSubscriptions;
use reth_provider::StorageRootProvider;
use reth_provider::{
Expand All @@ -62,7 +63,6 @@ use revm::{
use rollup_boost::{
ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, FlashblocksPayloadV1,
};
use serde_json::Value;
use std::error::Error as StdError;
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};
Expand All @@ -74,16 +74,29 @@ use tokio::sync::mpsc;
use tokio_tungstenite::accept_async;
use tokio_tungstenite::WebSocketStream;

#[derive(Debug, Clone, Copy, Default)]
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct FlashblocksMetadata<N: NodePrimitives> {
receipts: HashMap<B256, N::Receipt>,
new_account_balances: HashMap<Address, U256>,
block_number: u64,
}

#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct CustomOpPayloadBuilder {
#[allow(dead_code)]
builder_signer: Option<Signer>,
flashblocks_ws_url: String,
}

impl CustomOpPayloadBuilder {
pub fn new(builder_signer: Option<Signer>) -> Self {
Self { builder_signer }
pub fn new(builder_signer: Option<Signer>, flashblocks_ws_url: String) -> Self {
Self {
builder_signer,
flashblocks_ws_url,
}
}
}

Expand Down Expand Up @@ -112,6 +125,7 @@ where
pool,
ctx.provider().clone(),
Arc::new(BasicOpReceiptBuilder::default()),
self.flashblocks_ws_url.clone(),
))
}

Expand Down Expand Up @@ -194,14 +208,15 @@ impl<Pool, Client, EvmConfig, N: NodePrimitives> OpPayloadBuilder<Pool, Client,
pool: Pool,
client: Client,
receipt_builder: Arc<dyn OpReceiptBuilder<N::SignedTx, Receipt = N::Receipt>>,
flashblocks_ws_url: String,
) -> Self {
let (tx, rx) = mpsc::unbounded_channel();
let subscribers = Arc::new(Mutex::new(Vec::new()));

Self::publish_task(rx, subscribers.clone());

tokio::spawn(async move {
Self::start_ws(subscribers, "127.0.0.1:1111").await;
Self::start_ws(subscribers, &flashblocks_ws_url).await;
});

Self {
Expand Down Expand Up @@ -548,10 +563,29 @@ where
info.last_flashblock_index = info.executed_transactions.len();

let new_transactions_encoded = new_transactions
.clone()
.into_iter()
.map(|tx| tx.encoded_2718().into())
.collect::<Vec<_>>();

let new_receipts = info.receipts[info.last_flashblock_index..].to_vec();
let receipts_with_hash = new_transactions
.iter()
.zip(new_receipts.iter())
.map(|(tx, receipt)| (*tx.tx_hash(), receipt.clone()))
.collect::<HashMap<B256, N::Receipt>>();
let new_account_balances = new_bundle
.state
.iter()
.filter_map(|(address, account)| account.info.as_ref().map(|info| (*address, info.balance)))
.collect::<HashMap<Address, U256>>();

let metadata: FlashblocksMetadata<N> = FlashblocksMetadata {
receipts: receipts_with_hash,
new_account_balances,
block_number: ctx.parent().number + 1,
};

// Prepare the flashblocks message
let fb_payload = FlashblocksPayloadV1 {
payload_id: ctx.payload_id(),
Expand Down Expand Up @@ -580,7 +614,7 @@ where
transactions: new_transactions_encoded,
withdrawals: ctx.withdrawals().cloned().unwrap_or_default().to_vec(),
},
metadata: Value::Null,
metadata: serde_json::to_value(&metadata).unwrap_or_default(),
};

Ok((
Expand Down
15 changes: 13 additions & 2 deletions crates/op-rbuilder/src/payload_builder_vanilla.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,25 @@ use std::{fmt::Display, sync::Arc, time::Instant};
use tokio_util::sync::CancellationToken;
use tracing::{info, trace, warn};

#[derive(Debug, Clone, Copy, Default)]
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct CustomOpPayloadBuilder {
builder_signer: Option<Signer>,
#[cfg(feature = "flashblocks")]
flashblocks_ws_url: String,
}

impl CustomOpPayloadBuilder {
pub fn new(builder_signer: Option<Signer>) -> Self {
#[cfg(feature = "flashblocks")]
pub fn new(builder_signer: Option<Signer>, flashblocks_ws_url: String) -> Self {
Self {
builder_signer,
flashblocks_ws_url,
}
}

#[cfg(not(feature = "flashblocks"))]
pub fn new(builder_signer: Option<Signer>, _flashblocks_ws_url: String) -> Self {
Self { builder_signer }
}
}
Expand Down
Loading