From d63eacab80c9ae02e74589bf249fc6dcd19848f9 Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Wed, 5 Mar 2025 17:26:57 -0500 Subject: [PATCH 1/5] custom url + additional metadata --- crates/op-rbuilder/src/args.rs | 7 ++++ crates/op-rbuilder/src/main.rs | 9 +++-- crates/op-rbuilder/src/payload_builder.rs | 40 +++++++++++++++++++---- 3 files changed, 44 insertions(+), 12 deletions(-) diff --git a/crates/op-rbuilder/src/args.rs b/crates/op-rbuilder/src/args.rs index dd465c1cd..ee2b6e2aa 100644 --- a/crates/op-rbuilder/src/args.rs +++ b/crates/op-rbuilder/src/args.rs @@ -17,4 +17,11 @@ pub struct OpRbuilderArgs { /// Builder secret key for signing last transaction in block #[arg(long = "rollup.builder-secret-key", env = "BUILDER_SECRET_KEY")] pub builder_signer: Option, + /// Websocket port for flashblock payload builder + #[arg( + long = "rollup.flashblocks-ws-url", + env = "FLASHBLOCKS_WS_URL", + default_value = "127.0.0.1:1111" + )] + pub flashblocks_ws_url: String, } diff --git a/crates/op-rbuilder/src/main.rs b/crates/op-rbuilder/src/main.rs index 770c36102..fb9b3401c 100644 --- a/crates/op-rbuilder/src/main.rs +++ b/crates/op-rbuilder/src/main.rs @@ -33,11 +33,10 @@ fn main() { let op_node = OpNode::new(rollup_args.clone()); let handle = builder .with_types::() - .with_components( - op_node - .components() - .payload(CustomOpPayloadBuilder::new(builder_args.builder_signer)), - ) + .with_components(op_node.components().payload(CustomOpPayloadBuilder::new( + builder_args.builder_signer, + builder_args.flashblocks_ws_url, + ))) .with_add_ons( OpAddOnsBuilder::default() .with_sequencer(rollup_args.sequencer_http.clone()) diff --git a/crates/op-rbuilder/src/payload_builder.rs b/crates/op-rbuilder/src/payload_builder.rs index 9fba8e215..2b3719ffc 100644 --- a/crates/op-rbuilder/src/payload_builder.rs +++ b/crates/op-rbuilder/src/payload_builder.rs @@ -6,7 +6,7 @@ use crate::tx_signer::Signer; use alloy_consensus::{Eip658Value, Header, Transaction, Typed2718, EMPTY_OMMER_ROOT_HASH}; use alloy_eips::merge::BEACON_NONCE; use alloy_eips::Encodable2718; -use alloy_primitives::{Address, Bytes, B256, U256}; +use alloy_primitives::{map::HashMap, Address, Bytes, B256, U256}; use alloy_rpc_types_engine::PayloadId; use alloy_rpc_types_eth::Withdrawals; use op_alloy_consensus::OpDepositReceipt; @@ -45,6 +45,7 @@ use reth_payload_util::PayloadTransactions; use reth_primitives::{transaction::SignedTransactionIntoRecoveredExt, BlockBody, SealedHeader}; use reth_primitives_traits::proofs; use reth_primitives_traits::Block as _; +use reth_primitives_traits::SignedTransaction; use reth_provider::CanonStateSubscriptions; use reth_provider::StorageRootProvider; use reth_provider::{ @@ -62,7 +63,6 @@ use revm::{ use rollup_boost::{ ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, FlashblocksPayloadV1, }; -use serde_json::Value; use std::error::Error as StdError; use tokio_util::sync::CancellationToken; use tracing::{debug, trace, warn}; @@ -74,16 +74,20 @@ use tokio::sync::mpsc; use tokio_tungstenite::accept_async; use tokio_tungstenite::WebSocketStream; -#[derive(Debug, Clone, Copy, Default)] +#[derive(Debug, Clone, Default)] #[non_exhaustive] pub struct CustomOpPayloadBuilder { #[allow(dead_code)] builder_signer: Option, + flashblocks_ws_url: String, } impl CustomOpPayloadBuilder { - pub fn new(builder_signer: Option) -> Self { - Self { builder_signer } + pub fn new(builder_signer: Option, flashblocks_ws_url: String) -> Self { + Self { + builder_signer, + flashblocks_ws_url, + } } } @@ -112,6 +116,7 @@ where pool, ctx.provider().clone(), Arc::new(BasicOpReceiptBuilder::default()), + self.flashblocks_ws_url.clone(), )) } @@ -194,6 +199,7 @@ impl OpPayloadBuilder>, + flashblocks_ws_url: String, ) -> Self { let (tx, rx) = mpsc::unbounded_channel(); let subscribers = Arc::new(Mutex::new(Vec::new())); @@ -201,7 +207,7 @@ impl OpPayloadBuilder>(); + let new_receipts = info.receipts[info.last_flashblock_index..].to_vec(); + + let receipts_with_hash = new_transactions + .iter() + .zip(new_receipts.iter()) + .map(|(tx, receipt)| (*tx.tx_hash(), receipt.clone())) + .collect::>(); + let new_account_balances = new_bundle + .state + .iter() + .filter_map(|(address, account)| account.info.as_ref().map(|info| (*address, info.balance))) + .collect::>(); + + let metadata = serde_json::json!({ + "receipts": receipts_with_hash, + "new_account_balances": new_account_balances, + "block_number": ctx.parent().number + 1, + }); + // Prepare the flashblocks message let fb_payload = FlashblocksPayloadV1 { payload_id: ctx.payload_id(), @@ -580,7 +606,7 @@ where transactions: new_transactions_encoded, withdrawals: ctx.withdrawals().cloned().unwrap_or_default().to_vec(), }, - metadata: Value::Null, + metadata, }; Ok(( From efc6f475bdd6626fa1984a1a9e6254df8ca26a2b Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Wed, 5 Mar 2025 19:06:00 -0500 Subject: [PATCH 2/5] add test --- .github/workflows/checks.yaml | 4 +- .../src/integration/integration_test.rs | 60 ++++++++++++++++++- crates/op-rbuilder/src/integration/mod.rs | 2 + .../src/integration/op_rbuilder.rs | 11 ++++ .../src/payload_builder_vanilla.rs | 10 +++- 5 files changed, 80 insertions(+), 7 deletions(-) diff --git a/.github/workflows/checks.yaml b/.github/workflows/checks.yaml index b6dff023a..2804184b3 100644 --- a/.github/workflows/checks.yaml +++ b/.github/workflows/checks.yaml @@ -171,13 +171,13 @@ jobs: echo "$(pwd)" >> $GITHUB_PATH - name: Build the rbuilder - run: cargo build -p op-rbuilder --bin op-rbuilder --features optimism + run: cargo build -p op-rbuilder --bin op-rbuilder --features optimism,flashblocks - name: Generate test genesis file run: cargo run -p op-rbuilder --bin tester --features optimism -- genesis --output genesis.json - name: Run integration tests - run: cargo test --package op-rbuilder --lib --features optimism,integration -- integration::integration_test::tests + run: cargo test --package op-rbuilder --lib --features optimism,integration,flashblocks -- integration::integration_test::tests - name: Aggregate playground logs # This steps fails if the test fails early and the playground logs dir has not been created diff --git a/crates/op-rbuilder/src/integration/integration_test.rs b/crates/op-rbuilder/src/integration/integration_test.rs index 5cdc49980..1599fbf87 100644 --- a/crates/op-rbuilder/src/integration/integration_test.rs +++ b/crates/op-rbuilder/src/integration/integration_test.rs @@ -12,10 +12,14 @@ mod tests { use alloy_provider::Identity; use alloy_provider::{Provider, ProviderBuilder}; use alloy_rpc_types_eth::BlockTransactionsKind; + use futures_util::StreamExt; use op_alloy_consensus::OpTypedTransaction; use op_alloy_network::Optimism; use std::cmp::max; use std::path::PathBuf; + use std::sync::{Arc, Mutex}; + use std::time::Duration; + use tokio_tungstenite::connect_async; use uuid::Uuid; const BUILDER_PRIVATE_KEY: &str = @@ -41,7 +45,8 @@ mod tests { .auth_rpc_port(1234) .network_port(1235) .http_port(1238) - .with_builder_private_key(BUILDER_PRIVATE_KEY); + .with_builder_private_key(BUILDER_PRIVATE_KEY) + .with_flashblocks_ws_url("localhost:1239"); // create the validation reth node let reth_data_dir = std::env::temp_dir().join(Uuid::new_v4().to_string()); @@ -58,6 +63,23 @@ mod tests { .await .unwrap(); + // Create a struct to hold received messages + let received_messages = Arc::new(Mutex::new(Vec::new())); + let messages_clone = received_messages.clone(); + + // Spawn WebSocket listener task + let ws_handle = tokio::spawn(async move { + let (ws_stream, _) = connect_async("ws://localhost:1239").await?; + let (_, mut read) = ws_stream.split(); + + while let Some(Ok(msg)) = read.next().await { + if let Ok(text) = msg.into_text() { + messages_clone.lock().unwrap().push(text); + } + } + Ok::<_, eyre::Error>(()) + }); + let engine_api = EngineApi::new("http://localhost:1234").unwrap(); let validation_api = EngineApi::new("http://localhost:1236").unwrap(); @@ -83,12 +105,46 @@ mod tests { .expect("receipt"); } } - // there must be a line logging the monitoring transaction op_rbuilder .find_log_line("Committed block built by builder") .await?; + // Wait for specific messages or timeout + let timeout_duration = Duration::from_secs(10); + tokio::time::timeout(timeout_duration, async { + let mut message_count = 0; + loop { + if message_count >= 10 { + break; + } + let messages = received_messages.lock().unwrap(); + let messages_json: Vec = messages + .iter() + .map(|msg| serde_json::from_str(msg).unwrap()) + .collect(); + for msg in messages_json.iter() { + let metadata = msg.get("metadata"); + assert!(metadata.is_some(), "metadata field missing"); + let metadata = metadata.unwrap(); + assert!( + metadata.get("block_number").is_some(), + "block_number missing" + ); + assert!( + metadata.get("new_account_balances").is_some(), + "new_account_balances missing" + ); + assert!(metadata.get("receipts").is_some(), "receipts missing"); + message_count += 1; + } + drop(messages); + tokio::time::sleep(Duration::from_millis(100)).await; + } + }) + .await?; + ws_handle.abort(); + Ok(()) } diff --git a/crates/op-rbuilder/src/integration/mod.rs b/crates/op-rbuilder/src/integration/mod.rs index 26a86d354..f40ee03e8 100644 --- a/crates/op-rbuilder/src/integration/mod.rs +++ b/crates/op-rbuilder/src/integration/mod.rs @@ -130,6 +130,8 @@ impl ServiceInstance { file.read_to_string(&mut contents) .map_err(|_| eyre::eyre!("Failed to read log file"))?; + println!("contents: {:?}", contents); + if contents.contains(pattern) { Ok(()) } else { diff --git a/crates/op-rbuilder/src/integration/op_rbuilder.rs b/crates/op-rbuilder/src/integration/op_rbuilder.rs index 319f22547..e201365df 100644 --- a/crates/op-rbuilder/src/integration/op_rbuilder.rs +++ b/crates/op-rbuilder/src/integration/op_rbuilder.rs @@ -24,6 +24,7 @@ pub struct OpRbuilderConfig { http_port: Option, network_port: Option, builder_private_key: Option, + flashblocks_ws_url: Option, } impl OpRbuilderConfig { @@ -60,6 +61,11 @@ impl OpRbuilderConfig { self.builder_private_key = Some(private_key.to_string()); self } + + pub fn with_flashblocks_ws_url(mut self, url: &str) -> Self { + self.flashblocks_ws_url = Some(url.to_string()); + self + } } impl Service for OpRbuilderConfig { @@ -107,6 +113,11 @@ impl Service for OpRbuilderConfig { .arg(http_port.to_string()); } + if let Some(flashblocks_ws_url) = &self.flashblocks_ws_url { + cmd.arg("--rollup.flashblocks-ws-url") + .arg(flashblocks_ws_url); + } + cmd } diff --git a/crates/op-rbuilder/src/payload_builder_vanilla.rs b/crates/op-rbuilder/src/payload_builder_vanilla.rs index f983b1513..c54213033 100644 --- a/crates/op-rbuilder/src/payload_builder_vanilla.rs +++ b/crates/op-rbuilder/src/payload_builder_vanilla.rs @@ -76,15 +76,19 @@ use std::{fmt::Display, sync::Arc, time::Instant}; use tokio_util::sync::CancellationToken; use tracing::{info, trace, warn}; -#[derive(Debug, Clone, Copy, Default)] +#[derive(Debug, Clone, Default)] #[non_exhaustive] pub struct CustomOpPayloadBuilder { builder_signer: Option, + flashblocks_ws_url: String, } impl CustomOpPayloadBuilder { - pub fn new(builder_signer: Option) -> Self { - Self { builder_signer } + pub fn new(builder_signer: Option, flashblocks_ws_url: String) -> Self { + Self { + builder_signer, + flashblocks_ws_url, + } } } From 07038ea217d40f984d06b6b25ae7578747d6c1d7 Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Thu, 6 Mar 2025 10:48:34 -0500 Subject: [PATCH 3/5] add notes --- crates/op-rbuilder/README.md | 2 +- crates/op-rbuilder/src/integration/integration_test.rs | 4 ++-- crates/op-rbuilder/src/integration/mod.rs | 2 -- crates/op-rbuilder/src/payload_builder_vanilla.rs | 1 + 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/crates/op-rbuilder/README.md b/crates/op-rbuilder/README.md index a7e0606b6..ba46147ba 100644 --- a/crates/op-rbuilder/README.md +++ b/crates/op-rbuilder/README.md @@ -53,7 +53,7 @@ To run the integration tests, run: cargo run -p op-rbuilder --bin tester --features optimism -- genesis --output genesis.json # Build the op-rbuilder binary -cargo build -p op-rbuilder --bin op-rbuilder --features optimism +cargo build -p op-rbuilder --bin op-rbuilder --features optimism,flashblocks # Run the integration tests cargo run -p op-rbuilder --bin tester --features optimism -- run diff --git a/crates/op-rbuilder/src/integration/integration_test.rs b/crates/op-rbuilder/src/integration/integration_test.rs index 1599fbf87..af7574bf5 100644 --- a/crates/op-rbuilder/src/integration/integration_test.rs +++ b/crates/op-rbuilder/src/integration/integration_test.rs @@ -107,7 +107,7 @@ mod tests { } // there must be a line logging the monitoring transaction op_rbuilder - .find_log_line("Committed block built by builder") + .find_log_line("Committed block built by builder") // no builder tx for flashblocks builder .await?; // Wait for specific messages or timeout @@ -254,7 +254,7 @@ mod tests { .transactions .hashes() .any(|hash| hash == *reverting_tx.tx_hash()), - "reverted transaction unexpectedly included in block" + "reverted transaction unexpectedly included in block" // flashblock builder still includes reverted txs ); for hash in block.transactions.hashes() { let receipt = provider diff --git a/crates/op-rbuilder/src/integration/mod.rs b/crates/op-rbuilder/src/integration/mod.rs index f40ee03e8..26a86d354 100644 --- a/crates/op-rbuilder/src/integration/mod.rs +++ b/crates/op-rbuilder/src/integration/mod.rs @@ -130,8 +130,6 @@ impl ServiceInstance { file.read_to_string(&mut contents) .map_err(|_| eyre::eyre!("Failed to read log file"))?; - println!("contents: {:?}", contents); - if contents.contains(pattern) { Ok(()) } else { diff --git a/crates/op-rbuilder/src/payload_builder_vanilla.rs b/crates/op-rbuilder/src/payload_builder_vanilla.rs index c54213033..f02d35a2b 100644 --- a/crates/op-rbuilder/src/payload_builder_vanilla.rs +++ b/crates/op-rbuilder/src/payload_builder_vanilla.rs @@ -80,6 +80,7 @@ use tracing::{info, trace, warn}; #[non_exhaustive] pub struct CustomOpPayloadBuilder { builder_signer: Option, + #[allow(dead_code)] flashblocks_ws_url: String, } From f1a1635ab63c1c68979499211a26ce4480c0e4e7 Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Fri, 7 Mar 2025 13:28:44 -0500 Subject: [PATCH 4/5] move tests to a separate feature --- .github/workflows/checks.yaml | 8 +- Makefile | 1 + .../src/integration/integration_test.rs | 186 ++++++++++++------ crates/op-rbuilder/src/payload_builder.rs | 21 +- .../src/payload_builder_vanilla.rs | 8 +- 5 files changed, 160 insertions(+), 64 deletions(-) diff --git a/.github/workflows/checks.yaml b/.github/workflows/checks.yaml index 2804184b3..c2e303aef 100644 --- a/.github/workflows/checks.yaml +++ b/.github/workflows/checks.yaml @@ -171,12 +171,18 @@ jobs: echo "$(pwd)" >> $GITHUB_PATH - name: Build the rbuilder - run: cargo build -p op-rbuilder --bin op-rbuilder --features optimism,flashblocks + run: cargo build -p op-rbuilder --bin op-rbuilder --features optimism - name: Generate test genesis file run: cargo run -p op-rbuilder --bin tester --features optimism -- genesis --output genesis.json - name: Run integration tests + run: cargo test --package op-rbuilder --lib --features optimism,integration -- integration::integration_test::tests + + - name: Build flashblocks rbuilder + run: cargo build -p op-rbuilder --bin op-rbuilder --features optimism,flashblocks + + - name: Run flashblocks builder integration tests run: cargo test --package op-rbuilder --lib --features optimism,integration,flashblocks -- integration::integration_test::tests - name: Aggregate playground logs diff --git a/Makefile b/Makefile index e39178100..99e28aa0e 100644 --- a/Makefile +++ b/Makefile @@ -47,6 +47,7 @@ lint: ## Run the linters test: ## Run the tests for rbuilder and op-rbuilder cargo test --verbose --features "$(FEATURES)" cargo test -p op-rbuilder --verbose --features "$(FEATURES),optimism" + cargo test -p op-rbuilder --verbose --features "$(FEATURES),optimism,flashblocks" .PHONY: lt lt: lint test ## Run "lint" and "test" diff --git a/crates/op-rbuilder/src/integration/integration_test.rs b/crates/op-rbuilder/src/integration/integration_test.rs index af7574bf5..a977ade65 100644 --- a/crates/op-rbuilder/src/integration/integration_test.rs +++ b/crates/op-rbuilder/src/integration/integration_test.rs @@ -26,6 +26,7 @@ mod tests { "0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d"; #[tokio::test] + #[cfg(not(feature = "flashblocks"))] async fn integration_test_chain_produces_blocks() -> eyre::Result<()> { // This is a simple test using the integration framework to test that the chain // produces blocks. @@ -45,8 +46,7 @@ mod tests { .auth_rpc_port(1234) .network_port(1235) .http_port(1238) - .with_builder_private_key(BUILDER_PRIVATE_KEY) - .with_flashblocks_ws_url("localhost:1239"); + .with_builder_private_key(BUILDER_PRIVATE_KEY); // create the validation reth node let reth_data_dir = std::env::temp_dir().join(Uuid::new_v4().to_string()); @@ -63,23 +63,6 @@ mod tests { .await .unwrap(); - // Create a struct to hold received messages - let received_messages = Arc::new(Mutex::new(Vec::new())); - let messages_clone = received_messages.clone(); - - // Spawn WebSocket listener task - let ws_handle = tokio::spawn(async move { - let (ws_stream, _) = connect_async("ws://localhost:1239").await?; - let (_, mut read) = ws_stream.split(); - - while let Some(Ok(msg)) = read.next().await { - if let Ok(text) = msg.into_text() { - messages_clone.lock().unwrap().push(text); - } - } - Ok::<_, eyre::Error>(()) - }); - let engine_api = EngineApi::new("http://localhost:1234").unwrap(); let validation_api = EngineApi::new("http://localhost:1236").unwrap(); @@ -105,50 +88,17 @@ mod tests { .expect("receipt"); } } + // there must be a line logging the monitoring transaction op_rbuilder - .find_log_line("Committed block built by builder") // no builder tx for flashblocks builder + .find_log_line("Committed block built by builder") .await?; - // Wait for specific messages or timeout - let timeout_duration = Duration::from_secs(10); - tokio::time::timeout(timeout_duration, async { - let mut message_count = 0; - loop { - if message_count >= 10 { - break; - } - let messages = received_messages.lock().unwrap(); - let messages_json: Vec = messages - .iter() - .map(|msg| serde_json::from_str(msg).unwrap()) - .collect(); - for msg in messages_json.iter() { - let metadata = msg.get("metadata"); - assert!(metadata.is_some(), "metadata field missing"); - let metadata = metadata.unwrap(); - assert!( - metadata.get("block_number").is_some(), - "block_number missing" - ); - assert!( - metadata.get("new_account_balances").is_some(), - "new_account_balances missing" - ); - assert!(metadata.get("receipts").is_some(), "receipts missing"); - message_count += 1; - } - drop(messages); - tokio::time::sleep(Duration::from_millis(100)).await; - } - }) - .await?; - ws_handle.abort(); - Ok(()) } #[tokio::test] + #[cfg(not(feature = "flashblocks"))] async fn integration_test_revert_protection() -> eyre::Result<()> { // This is a simple test using the integration framework to test that the chain // produces blocks. @@ -254,7 +204,7 @@ mod tests { .transactions .hashes() .any(|hash| hash == *reverting_tx.tx_hash()), - "reverted transaction unexpectedly included in block" // flashblock builder still includes reverted txs + "reverted transaction unexpectedly included in block" ); for hash in block.transactions.hashes() { let receipt = provider @@ -272,4 +222,128 @@ mod tests { Ok(()) } + + #[tokio::test] + #[cfg(feature = "flashblocks")] + async fn integration_test_chain_produces_blocks() -> eyre::Result<()> { + // This is a simple test using the integration framework to test that the chain + // produces blocks. + let mut framework = + IntegrationFramework::new("integration_test_chain_produces_blocks").unwrap(); + + // we are going to use a genesis file pre-generated before the test + let mut genesis_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + genesis_path.push("../../genesis.json"); + assert!(genesis_path.exists()); + + // create the builder + let builder_data_dir = std::env::temp_dir().join(Uuid::new_v4().to_string()); + let op_rbuilder_config = OpRbuilderConfig::new() + .chain_config_path(genesis_path.clone()) + .data_dir(builder_data_dir) + .auth_rpc_port(1234) + .network_port(1235) + .http_port(1238) + .with_builder_private_key(BUILDER_PRIVATE_KEY) + .with_flashblocks_ws_url("localhost:1239"); + + // create the validation reth node + let reth_data_dir = std::env::temp_dir().join(Uuid::new_v4().to_string()); + let reth = OpRethConfig::new() + .chain_config_path(genesis_path) + .data_dir(reth_data_dir) + .auth_rpc_port(1236) + .network_port(1237); + + framework.start("op-reth", &reth).await.unwrap(); + + let op_rbuilder = framework + .start("op-rbuilder", &op_rbuilder_config) + .await + .unwrap(); + + // Create a struct to hold received messages + let received_messages = Arc::new(Mutex::new(Vec::new())); + let messages_clone = received_messages.clone(); + + // Spawn WebSocket listener task + let ws_handle = tokio::spawn(async move { + let (ws_stream, _) = connect_async("ws://localhost:1239").await?; + let (_, mut read) = ws_stream.split(); + + while let Some(Ok(msg)) = read.next().await { + if let Ok(text) = msg.into_text() { + messages_clone.lock().unwrap().push(text); + } + } + Ok::<_, eyre::Error>(()) + }); + + let engine_api = EngineApi::new("http://localhost:1234").unwrap(); + let validation_api = EngineApi::new("http://localhost:1236").unwrap(); + + let mut generator = BlockGenerator::new(&engine_api, Some(&validation_api), false, 1, None); + generator.init().await?; + + let provider = ProviderBuilder::::default() + .on_http("http://localhost:1238".parse()?); + + for _ in 0..10 { + let block_hash = generator.generate_block().await?; + + // query the block and the transactions inside the block + let block = provider + .get_block_by_hash(block_hash, BlockTransactionsKind::Hashes) + .await? + .expect("block"); + + for hash in block.transactions.hashes() { + let _ = provider + .get_transaction_receipt(hash) + .await? + .expect("receipt"); + } + } + // there must be a line logging the monitoring transaction + op_rbuilder + .find_log_line("Processing new chain commit") // no builder tx for flashblocks builder + .await?; + + // Wait for specific messages or timeout + let timeout_duration = Duration::from_secs(10); + tokio::time::timeout(timeout_duration, async { + let mut message_count = 0; + loop { + if message_count >= 10 { + break; + } + let messages = received_messages.lock().unwrap(); + let messages_json: Vec = messages + .iter() + .map(|msg| serde_json::from_str(msg).unwrap()) + .collect(); + for msg in messages_json.iter() { + let metadata = msg.get("metadata"); + assert!(metadata.is_some(), "metadata field missing"); + let metadata = metadata.unwrap(); + assert!( + metadata.get("block_number").is_some(), + "block_number missing" + ); + assert!( + metadata.get("new_account_balances").is_some(), + "new_account_balances missing" + ); + assert!(metadata.get("receipts").is_some(), "receipts missing"); + message_count += 1; + } + drop(messages); + tokio::time::sleep(Duration::from_millis(100)).await; + } + }) + .await?; + ws_handle.abort(); + + Ok(()) + } } diff --git a/crates/op-rbuilder/src/payload_builder.rs b/crates/op-rbuilder/src/payload_builder.rs index 2b3719ffc..067b79701 100644 --- a/crates/op-rbuilder/src/payload_builder.rs +++ b/crates/op-rbuilder/src/payload_builder.rs @@ -74,6 +74,15 @@ use tokio::sync::mpsc; use tokio_tungstenite::accept_async; use tokio_tungstenite::WebSocketStream; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize)] +struct FlashblocksMetadata { + receipts: HashMap, + new_account_balances: HashMap, + block_number: u64, +} + #[derive(Debug, Clone, Default)] #[non_exhaustive] pub struct CustomOpPayloadBuilder { @@ -572,11 +581,11 @@ where .filter_map(|(address, account)| account.info.as_ref().map(|info| (*address, info.balance))) .collect::>(); - let metadata = serde_json::json!({ - "receipts": receipts_with_hash, - "new_account_balances": new_account_balances, - "block_number": ctx.parent().number + 1, - }); + let metadata: FlashblocksMetadata = FlashblocksMetadata { + receipts: receipts_with_hash, + new_account_balances, + block_number: ctx.parent().number + 1, + }; // Prepare the flashblocks message let fb_payload = FlashblocksPayloadV1 { @@ -606,7 +615,7 @@ where transactions: new_transactions_encoded, withdrawals: ctx.withdrawals().cloned().unwrap_or_default().to_vec(), }, - metadata, + metadata: serde_json::to_value(&metadata).unwrap_or_default(), }; Ok(( diff --git a/crates/op-rbuilder/src/payload_builder_vanilla.rs b/crates/op-rbuilder/src/payload_builder_vanilla.rs index f02d35a2b..602173afc 100644 --- a/crates/op-rbuilder/src/payload_builder_vanilla.rs +++ b/crates/op-rbuilder/src/payload_builder_vanilla.rs @@ -80,17 +80,23 @@ use tracing::{info, trace, warn}; #[non_exhaustive] pub struct CustomOpPayloadBuilder { builder_signer: Option, - #[allow(dead_code)] + #[cfg(feature = "flashblocks")] flashblocks_ws_url: String, } impl CustomOpPayloadBuilder { + #[cfg(feature = "flashblocks")] pub fn new(builder_signer: Option, flashblocks_ws_url: String) -> Self { Self { builder_signer, flashblocks_ws_url, } } + + #[cfg(not(feature = "flashblocks"))] + pub fn new(builder_signer: Option, _flashblocks_ws_url: String) -> Self { + Self { builder_signer } + } } impl PayloadServiceBuilder for CustomOpPayloadBuilder From 9c17662db52e0e0ece23577f7aab3f9f39d2e474 Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Fri, 7 Mar 2025 13:34:30 -0500 Subject: [PATCH 5/5] tweaks --- crates/op-rbuilder/README.md | 2 +- crates/op-rbuilder/src/integration/integration_test.rs | 2 +- crates/op-rbuilder/src/payload_builder.rs | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/crates/op-rbuilder/README.md b/crates/op-rbuilder/README.md index ba46147ba..a7e0606b6 100644 --- a/crates/op-rbuilder/README.md +++ b/crates/op-rbuilder/README.md @@ -53,7 +53,7 @@ To run the integration tests, run: cargo run -p op-rbuilder --bin tester --features optimism -- genesis --output genesis.json # Build the op-rbuilder binary -cargo build -p op-rbuilder --bin op-rbuilder --features optimism,flashblocks +cargo build -p op-rbuilder --bin op-rbuilder --features optimism # Run the integration tests cargo run -p op-rbuilder --bin tester --features optimism -- run diff --git a/crates/op-rbuilder/src/integration/integration_test.rs b/crates/op-rbuilder/src/integration/integration_test.rs index a977ade65..05e6163b7 100644 --- a/crates/op-rbuilder/src/integration/integration_test.rs +++ b/crates/op-rbuilder/src/integration/integration_test.rs @@ -309,7 +309,7 @@ mod tests { .find_log_line("Processing new chain commit") // no builder tx for flashblocks builder .await?; - // Wait for specific messages or timeout + // Process websocket messages let timeout_duration = Duration::from_secs(10); tokio::time::timeout(timeout_duration, async { let mut message_count = 0; diff --git a/crates/op-rbuilder/src/payload_builder.rs b/crates/op-rbuilder/src/payload_builder.rs index 067b79701..e6ecf074a 100644 --- a/crates/op-rbuilder/src/payload_builder.rs +++ b/crates/op-rbuilder/src/payload_builder.rs @@ -569,7 +569,6 @@ where .collect::>(); let new_receipts = info.receipts[info.last_flashblock_index..].to_vec(); - let receipts_with_hash = new_transactions .iter() .zip(new_receipts.iter())