From c46bb168e221c2b961a63b0107cbf6a95ffc5348 Mon Sep 17 00:00:00 2001
From: George Knee
Date: Mon, 3 Mar 2025 13:45:54 +0000
Subject: [PATCH] op-batcher: introduce `PREFER_LOCAL_SAFE_L2` config var (#14587)

* op-batcher: introduce PREFER_LOCAL_SAFE_L2 config var

* lint

* Apply suggestions from code review

Co-authored-by: Sebastian Stammler

* lint

---------

Co-authored-by: Sebastian Stammler
---
 op-batcher/batcher/config.go            |   4 +
 op-batcher/batcher/driver.go            |   2 +-
 op-batcher/batcher/service.go           |   4 +
 op-batcher/batcher/sync_actions.go      |  30 +++--
 op-batcher/batcher/sync_actions_test.go | 142 ++++++++++++++++--------
 op-batcher/flags/flags.go               |   7 ++
 6 files changed, 134 insertions(+), 55 deletions(-)

diff --git a/op-batcher/batcher/config.go b/op-batcher/batcher/config.go
index c712b74bc8e5..85919a56114e 100644
--- a/op-batcher/batcher/config.go
+++ b/op-batcher/batcher/config.go
@@ -110,6 +110,9 @@ type CLIConfig struct {
 	// ThrottleAlwaysBlockSize is the total per-block DA limit to always impose on block building.
 	ThrottleAlwaysBlockSize uint64
 
+	// PreferLocalSafeL2 triggers the batcher to load blocks from the sequencer based on the LocalSafeL2 SyncStatus field (instead of the SafeL2 field).
+	PreferLocalSafeL2 bool
+
 	// TestUseMaxTxSizeForBlobs allows setting the blob size with MaxL1TxSize.
 	// Should only be used for testing purposes.
 	TestUseMaxTxSizeForBlobs bool
@@ -215,5 +218,6 @@ func NewConfig(ctx *cli.Context) *CLIConfig {
 		ThrottleTxSize:          ctx.Uint64(flags.ThrottleTxSizeFlag.Name),
 		ThrottleBlockSize:       ctx.Uint64(flags.ThrottleBlockSizeFlag.Name),
 		ThrottleAlwaysBlockSize: ctx.Uint64(flags.ThrottleAlwaysBlockSizeFlag.Name),
+		PreferLocalSafeL2:       ctx.Bool(flags.PreferLocalSafeL2Flag.Name),
 	}
 }
diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go
index 3bbeb5f17063..4ff50753a48b 100644
--- a/op-batcher/batcher/driver.go
+++ b/op-batcher/batcher/driver.go
@@ -422,7 +422,7 @@ func (l *BatchSubmitter) syncAndPrune(syncStatus *eth.SyncStatus) *inclusiveBloc
 	defer l.channelMgrMutex.Unlock()
 
 	// Decide appropriate actions
-	syncActions, outOfSync := computeSyncActions(*syncStatus, l.prevCurrentL1, l.channelMgr.blocks, l.channelMgr.channelQueue, l.Log)
+	syncActions, outOfSync := computeSyncActions(*syncStatus, l.prevCurrentL1, l.channelMgr.blocks, l.channelMgr.channelQueue, l.Log, l.Config.PreferLocalSafeL2)
 
 	if outOfSync {
 		// If the sequencer is out of sync
diff --git a/op-batcher/batcher/service.go b/op-batcher/batcher/service.go
index dbf826b034c6..4c2d9c8ed48d 100644
--- a/op-batcher/batcher/service.go
+++ b/op-batcher/batcher/service.go
@@ -48,6 +48,8 @@ type BatcherConfig struct {
 	// For throttling DA. See CLIConfig in config.go for details on these parameters.
 	ThrottleThreshold, ThrottleTxSize          uint64
 	ThrottleBlockSize, ThrottleAlwaysBlockSize uint64
+
+	PreferLocalSafeL2 bool
 }
 
 // BatcherService represents a full batch-submitter instance and its resources,
@@ -111,6 +113,8 @@ func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string,
 	bs.ThrottleBlockSize = cfg.ThrottleBlockSize
 	bs.ThrottleAlwaysBlockSize = cfg.ThrottleAlwaysBlockSize
 
+	bs.PreferLocalSafeL2 = cfg.PreferLocalSafeL2
+
 	optsFromRPC, err := bs.initRPCClients(ctx, cfg)
 	if err != nil {
 		return err
diff --git a/op-batcher/batcher/sync_actions.go b/op-batcher/batcher/sync_actions.go
index 232ab5a86ef3..a6645e6ae74b 100644
--- a/op-batcher/batcher/sync_actions.go
+++ b/op-batcher/batcher/sync_actions.go
@@ -47,15 +47,29 @@ func (s syncActions) TerminalString() string {
 // state of the batcher (blocks and channels), the new sync status, and the previous current L1 block. The actions are returned
 // in a struct specifying the number of blocks to prune, the number of channels to prune, whether to wait for node sync, the block
 // range to load into the local state, and whether to clear the state entirely. Returns a boolean indicating if the sequencer is out of sync.
-func computeSyncActions[T channelStatuser](newSyncStatus eth.SyncStatus, prevCurrentL1 eth.L1BlockRef, blocks queue.Queue[*types.Block], channels []T, l log.Logger) (syncActions, bool) {
+func computeSyncActions[T channelStatuser](
+	newSyncStatus eth.SyncStatus,
+	prevCurrentL1 eth.L1BlockRef,
+	blocks queue.Queue[*types.Block],
+	channels []T,
+	l log.Logger,
+	preferLocalSafeL2 bool,
+) (syncActions, bool) {
 	m := l.With(
 		"syncStatus.headL1", newSyncStatus.HeadL1,
 		"syncStatus.currentL1", newSyncStatus.CurrentL1,
 		"syncStatus.localSafeL2", newSyncStatus.LocalSafeL2,
+		"syncStatus.safeL2", newSyncStatus.SafeL2,
 		"syncStatus.unsafeL2", newSyncStatus.UnsafeL2,
 	)
 
+	safeL2 := newSyncStatus.SafeL2
+	if preferLocalSafeL2 {
+		// This is preferred when running interop, but not yet enabled by default.
+		safeL2 = newSyncStatus.LocalSafeL2
+	}
+
 	// PART 1: Initial checks on the sync status
 	if newSyncStatus.HeadL1 == (eth.L1BlockRef{}) {
 		m.Warn("empty sync status")
@@ -69,8 +83,8 @@ func computeSyncActions[T channelStatuser](newSyncStatus eth.SyncStatus, prevCur
 	}
 
 	var allUnsafeBlocks *inclusiveBlockRange
-	if newSyncStatus.UnsafeL2.Number > newSyncStatus.LocalSafeL2.Number {
-		allUnsafeBlocks = &inclusiveBlockRange{newSyncStatus.LocalSafeL2.Number + 1, newSyncStatus.UnsafeL2.Number}
+	if newSyncStatus.UnsafeL2.Number > safeL2.Number {
+		allUnsafeBlocks = &inclusiveBlockRange{safeL2.Number + 1, newSyncStatus.UnsafeL2.Number}
 	}
 
 	// PART 2: checks involving only the oldest block in the state
@@ -89,12 +103,12 @@ func computeSyncActions[T channelStatuser](newSyncStatus eth.SyncStatus, prevCur
 	// and we need to start over, loading all unsafe blocks
 	// from the sequencer.
 	startAfresh := syncActions{
-		clearState:   &newSyncStatus.LocalSafeL2.L1Origin,
+		clearState:   &safeL2.L1Origin,
 		blocksToLoad: allUnsafeBlocks,
 	}
 
 	oldestBlockInStateNum := oldestBlockInState.NumberU64()
-	nextSafeBlockNum := newSyncStatus.LocalSafeL2.Number + 1
+	nextSafeBlockNum := safeL2.Number + 1
 
 	if nextSafeBlockNum < oldestBlockInStateNum {
 		m.Warn("next safe block is below oldest block in state",
@@ -120,7 +134,7 @@ func computeSyncActions[T channelStatuser](newSyncStatus eth.SyncStatus, prevCur
 		return startAfresh, false
 	}
 
-	if numBlocksToDequeue > 0 && blocks[numBlocksToDequeue-1].Hash() != newSyncStatus.LocalSafeL2.Hash {
+	if numBlocksToDequeue > 0 && blocks[numBlocksToDequeue-1].Hash() != safeL2.Hash {
 		m.Warn("safe chain reorg, clearing channel manager state",
 			"syncActions", startAfresh,
 			"existingBlock", eth.ToBlockID(blocks[numBlocksToDequeue-1]))
@@ -132,7 +146,7 @@ func computeSyncActions[T channelStatuser](newSyncStatus eth.SyncStatus, prevCur
 		if ch.isFullySubmitted() &&
 			!ch.isTimedOut() &&
 			newSyncStatus.CurrentL1.Number > ch.MaxInclusionBlock() &&
-			newSyncStatus.LocalSafeL2.Number < ch.LatestL2().Number {
+			safeL2.Number < ch.LatestL2().Number {
 			// Safe head did not make the expected progress
 			// for a fully submitted channel. This indicates
 			// that the derivation pipeline may have stalled
@@ -147,7 +161,7 @@ func computeSyncActions[T channelStatuser](newSyncStatus eth.SyncStatus, prevCur
 	// PART 5: happy path
 	numChannelsToPrune := 0
 	for _, ch := range channels {
-		if ch.LatestL2().Number > newSyncStatus.LocalSafeL2.Number {
+		if ch.LatestL2().Number > safeL2.Number {
 			// If the channel has blocks which are not yet safe
 			// we do not want to prune it.
 			break
diff --git a/op-batcher/batcher/sync_actions_test.go b/op-batcher/batcher/sync_actions_test.go
index 365b4022a663..ff5ca7f2d2d3 100644
--- a/op-batcher/batcher/sync_actions_test.go
+++ b/op-batcher/batcher/sync_actions_test.go
@@ -56,6 +56,7 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
 	}
 
 	happyCaseLogs := []string{"computed sync actions"}
+	noBlocksLogs := []string{"no blocks in state"}
 
 	type TestCase struct {
 		name string
@@ -68,6 +69,7 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
 		expected             syncActions
 		expectedSeqOutOfSync bool
 		expectedLogs         []string
+		preferLocalSafeL2    bool
 	}
 
 	testCases := []TestCase{
@@ -95,10 +97,10 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
 			// although the sequencer has derived up to the same
 			// L1 block height, it derived fewer safe L2 blocks.
 			newSyncStatus: eth.SyncStatus{
-				HeadL1:      eth.BlockRef{Number: 6},
-				CurrentL1:   eth.BlockRef{Number: 1},
-				LocalSafeL2: eth.L2BlockRef{Number: 100, L1Origin: eth.BlockID{Number: 1}},
-				UnsafeL2:    eth.L2BlockRef{Number: 109},
+				HeadL1:    eth.BlockRef{Number: 6},
+				CurrentL1: eth.BlockRef{Number: 1},
+				SafeL2:    eth.L2BlockRef{Number: 100, L1Origin: eth.BlockID{Number: 1}},
+				UnsafeL2:  eth.L2BlockRef{Number: 109},
 			},
 			prevCurrentL1: eth.BlockRef{Number: 1},
 			blocks:        queue.Queue[*types.Block]{block102, block103}, // note absence of block101
@@ -113,10 +115,10 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
 			// This can happen if another batcher instance got some blocks
 			// included in the safe chain:
 			newSyncStatus: eth.SyncStatus{
-				HeadL1:      eth.BlockRef{Number: 6},
-				CurrentL1:   eth.BlockRef{Number: 2},
-				LocalSafeL2: eth.L2BlockRef{Number: 104, L1Origin: eth.BlockID{Number: 1}},
-				UnsafeL2:    eth.L2BlockRef{Number: 109},
+				HeadL1:    eth.BlockRef{Number: 6},
+				CurrentL1: eth.BlockRef{Number: 2},
+				SafeL2:    eth.L2BlockRef{Number: 104, L1Origin: eth.BlockID{Number: 1}},
+				UnsafeL2:  eth.L2BlockRef{Number: 109},
 			},
 			prevCurrentL1: eth.BlockRef{Number: 1},
 			blocks:        queue.Queue[*types.Block]{block101, block102, block103},
@@ -131,10 +133,10 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
 			// This can happen if there is an L1 reorg, the safe chain is at an acceptable
 			// height but it does not descend from the blocks in state:
 			newSyncStatus: eth.SyncStatus{
-				HeadL1:      eth.BlockRef{Number: 5},
-				CurrentL1:   eth.BlockRef{Number: 2},
-				LocalSafeL2: eth.L2BlockRef{Number: 103, Hash: block101.Hash(), L1Origin: eth.BlockID{Number: 1}}, // note hash mismatch
-				UnsafeL2:    eth.L2BlockRef{Number: 109},
+				HeadL1:    eth.BlockRef{Number: 5},
+				CurrentL1: eth.BlockRef{Number: 2},
+				SafeL2:    eth.L2BlockRef{Number: 103, Hash: block101.Hash(), L1Origin: eth.BlockID{Number: 1}}, // note hash mismatch
+				UnsafeL2:  eth.L2BlockRef{Number: 109},
 			},
 			prevCurrentL1: eth.BlockRef{Number: 1},
 			blocks:        queue.Queue[*types.Block]{block101, block102, block103},
@@ -149,10 +151,10 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
 			// This could happen if the batcher unexpectedly violates the
 			// Holocene derivation rules:
 			newSyncStatus: eth.SyncStatus{
-				HeadL1:      eth.BlockRef{Number: 3},
-				CurrentL1:   eth.BlockRef{Number: 2},
-				LocalSafeL2: eth.L2BlockRef{Number: 101, Hash: block101.Hash(), L1Origin: eth.BlockID{Number: 1}},
-				UnsafeL2:    eth.L2BlockRef{Number: 109},
+				HeadL1:    eth.BlockRef{Number: 3},
+				CurrentL1: eth.BlockRef{Number: 2},
+				SafeL2:    eth.L2BlockRef{Number: 101, Hash: block101.Hash(), L1Origin: eth.BlockID{Number: 1}},
+				UnsafeL2:  eth.L2BlockRef{Number: 109},
 			},
 			prevCurrentL1: eth.BlockRef{Number: 1},
 			blocks:        queue.Queue[*types.Block]{block101, block102, block103},
@@ -166,10 +168,10 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
 		{name: "failed to make expected progress (unsafe=safe)",
 			// Edge case where unsafe = safe
 			newSyncStatus: eth.SyncStatus{
-				HeadL1:      eth.BlockRef{Number: 3},
-				CurrentL1:   eth.BlockRef{Number: 2},
-				LocalSafeL2: eth.L2BlockRef{Number: 101, Hash: block101.Hash(), L1Origin: eth.BlockID{Number: 1}},
-				UnsafeL2:    eth.L2BlockRef{Number: 101},
+				HeadL1:    eth.BlockRef{Number: 3},
+				CurrentL1: eth.BlockRef{Number: 2},
+				SafeL2:    eth.L2BlockRef{Number: 101, Hash: block101.Hash(), L1Origin: eth.BlockID{Number: 1}},
+				UnsafeL2:  eth.L2BlockRef{Number: 101},
 			},
 			prevCurrentL1: eth.BlockRef{Number: 1},
 			blocks:        queue.Queue[*types.Block]{block102, block103},
@@ -185,10 +187,10 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
 			// and we didn't submit or have any txs confirmed since
 			// the last sync.
 			newSyncStatus: eth.SyncStatus{
-				HeadL1:      eth.BlockRef{Number: 4},
-				CurrentL1:   eth.BlockRef{Number: 1},
-				LocalSafeL2: eth.L2BlockRef{Number: 100},
-				UnsafeL2:    eth.L2BlockRef{Number: 109},
+				HeadL1:    eth.BlockRef{Number: 4},
+				CurrentL1: eth.BlockRef{Number: 1},
+				SafeL2:    eth.L2BlockRef{Number: 100},
+				UnsafeL2:  eth.L2BlockRef{Number: 109},
 			},
 			prevCurrentL1: eth.BlockRef{Number: 1},
 			blocks:        queue.Queue[*types.Block]{block101, block102, block103},
@@ -201,10 +203,10 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
 		{name: "no blocks",
 			// This happens when the batcher is starting up for the first time
 			newSyncStatus: eth.SyncStatus{
-				HeadL1:      eth.BlockRef{Number: 5},
-				CurrentL1:   eth.BlockRef{Number: 2},
-				LocalSafeL2: eth.L2BlockRef{Number: 103, Hash: block103.Hash()},
-				UnsafeL2:    eth.L2BlockRef{Number: 109},
+				HeadL1:    eth.BlockRef{Number: 5},
+				CurrentL1: eth.BlockRef{Number: 2},
+				SafeL2:    eth.L2BlockRef{Number: 103, Hash: block103.Hash()},
+				UnsafeL2:  eth.L2BlockRef{Number: 109},
 			},
 			prevCurrentL1: eth.BlockRef{Number: 1},
 			blocks:        queue.Queue[*types.Block]{},
@@ -217,10 +219,10 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
 		{name: "happy path",
 			// This happens when the safe chain is being progressed as expected:
 			newSyncStatus: eth.SyncStatus{
-				HeadL1:      eth.BlockRef{Number: 5},
-				CurrentL1:   eth.BlockRef{Number: 2},
-				LocalSafeL2: eth.L2BlockRef{Number: 103, Hash: block103.Hash()},
-				UnsafeL2:    eth.L2BlockRef{Number: 109},
+				HeadL1:    eth.BlockRef{Number: 5},
+				CurrentL1: eth.BlockRef{Number: 2},
+				SafeL2:    eth.L2BlockRef{Number: 103, Hash: block103.Hash()},
+				UnsafeL2:  eth.L2BlockRef{Number: 109},
 			},
 			prevCurrentL1: eth.BlockRef{Number: 1},
 			blocks:        queue.Queue[*types.Block]{block101, block102, block103},
@@ -234,10 +236,10 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
 		},
 		{name: "happy path + multiple channels",
 			newSyncStatus: eth.SyncStatus{
-				HeadL1:      eth.BlockRef{Number: 5},
-				CurrentL1:   eth.BlockRef{Number: 2},
-				LocalSafeL2: eth.L2BlockRef{Number: 103, Hash: block103.Hash()},
-				UnsafeL2:    eth.L2BlockRef{Number: 109},
+				HeadL1:    eth.BlockRef{Number: 5},
+				CurrentL1: eth.BlockRef{Number: 2},
+				SafeL2:    eth.L2BlockRef{Number: 103, Hash: block103.Hash()},
+				UnsafeL2:  eth.L2BlockRef{Number: 109},
 			},
 			prevCurrentL1: eth.BlockRef{Number: 1},
 			blocks:        queue.Queue[*types.Block]{block101, block102, block103, block104},
@@ -251,23 +253,23 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
 		},
 		{name: "no progress + unsafe=safe",
 			newSyncStatus: eth.SyncStatus{
-				HeadL1:      eth.BlockRef{Number: 5},
-				CurrentL1:   eth.BlockRef{Number: 2},
-				LocalSafeL2: eth.L2BlockRef{Number: 100},
-				UnsafeL2:    eth.L2BlockRef{Number: 100},
+				HeadL1:    eth.BlockRef{Number: 5},
+				CurrentL1: eth.BlockRef{Number: 2},
+				SafeL2:    eth.L2BlockRef{Number: 100},
+				UnsafeL2:  eth.L2BlockRef{Number: 100},
 			},
 			prevCurrentL1: eth.BlockRef{Number: 1},
 			blocks:        queue.Queue[*types.Block]{},
 			channels:      []channelStatuser{},
 			expected:      syncActions{},
-			expectedLogs:  []string{"no blocks in state"},
+			expectedLogs:  noBlocksLogs,
 		},
 		{name: "no progress + unsafe=safe + blocks in state",
 			newSyncStatus: eth.SyncStatus{
-				HeadL1:      eth.BlockRef{Number: 5},
-				CurrentL1:   eth.BlockRef{Number: 2},
-				LocalSafeL2: eth.L2BlockRef{Number: 101, Hash: block101.Hash()},
-				UnsafeL2:    eth.L2BlockRef{Number: 101},
+				HeadL1:    eth.BlockRef{Number: 5},
+				CurrentL1: eth.BlockRef{Number: 2},
+				SafeL2:    eth.L2BlockRef{Number: 101, Hash: block101.Hash()},
+				UnsafeL2:  eth.L2BlockRef{Number: 101},
 			},
 			prevCurrentL1: eth.BlockRef{Number: 1},
 			blocks:        queue.Queue[*types.Block]{block101},
@@ -277,6 +279,54 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
 			},
 			expectedLogs: happyCaseLogs,
 		},
+		{name: "localSafeL2 > safeL2, preferLocalSafeL2=false",
+			newSyncStatus: eth.SyncStatus{
+				HeadL1:      eth.BlockRef{Number: 5},
+				CurrentL1:   eth.BlockRef{Number: 2},
+				SafeL2:      eth.L2BlockRef{Number: 103, Hash: block103.Hash()},
+				LocalSafeL2: eth.L2BlockRef{Number: 104, Hash: block104.Hash()},
+				UnsafeL2:    eth.L2BlockRef{Number: 109},
+			},
+			prevCurrentL1: eth.BlockRef{Number: 1},
+			blocks:        queue.Queue[*types.Block]{},
+			channels:      []channelStatuser{},
+			expected: syncActions{
+				blocksToLoad: &inclusiveBlockRange{104, 109},
+			},
+			expectedLogs: noBlocksLogs,
+		},
+		{name: "localSafeL2 > safeL2, preferLocalSafeL2=true",
+			preferLocalSafeL2: true,
+			newSyncStatus: eth.SyncStatus{
+				HeadL1:      eth.BlockRef{Number: 5},
+				CurrentL1:   eth.BlockRef{Number: 2},
+				SafeL2:      eth.L2BlockRef{Number: 103, Hash: block103.Hash()},
+				LocalSafeL2: eth.L2BlockRef{Number: 104, Hash: block104.Hash()},
+				UnsafeL2:    eth.L2BlockRef{Number: 109},
+			},
+			prevCurrentL1: eth.BlockRef{Number: 1},
+			blocks:        queue.Queue[*types.Block]{},
+			channels:      []channelStatuser{},
+			expected: syncActions{
+				blocksToLoad: &inclusiveBlockRange{105, 109},
+			},
+			expectedLogs: noBlocksLogs,
+		},
+		{name: "LocalSafeL2=0,SafeL2>0", // This shouldn't ever happen, but has occurred due to bugs
+			newSyncStatus: eth.SyncStatus{
+				HeadL1:    eth.BlockRef{Number: 5},
+				CurrentL1: eth.BlockRef{Number: 2},
+				SafeL2:    eth.L2BlockRef{Number: 104, Hash: block104.Hash()},
+				UnsafeL2:  eth.L2BlockRef{Number: 109},
+			},
+			prevCurrentL1: eth.BlockRef{Number: 1},
+			blocks:        queue.Queue[*types.Block]{},
+			channels:      []channelStatuser{},
+			expected: syncActions{
+				blocksToLoad: &inclusiveBlockRange{105, 109},
+			},
+			expectedLogs: noBlocksLogs,
+		},
 	}
 
 	for _, tc := range testCases {
@@ -285,7 +335,7 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
 			l, h := testlog.CaptureLogger(t, log.LevelDebug)
 
 			result, outOfSync := computeSyncActions(
-				tc.newSyncStatus, tc.prevCurrentL1, tc.blocks, tc.channels, l,
+				tc.newSyncStatus, tc.prevCurrentL1, tc.blocks, tc.channels, l, tc.preferLocalSafeL2,
 			)
 
 			require.Equal(t, tc.expected, result, "unexpected actions")
diff --git a/op-batcher/flags/flags.go b/op-batcher/flags/flags.go
index 843122e13c1f..d62e12ca680a 100644
--- a/op-batcher/flags/flags.go
+++ b/op-batcher/flags/flags.go
@@ -180,6 +180,12 @@ var (
 		Value:   130_000, // should be larger than the builder's max-l2-tx-size to prevent endlessly throttling some txs
 		EnvVars: prefixEnvVars("THROTTLE_ALWAYS_BLOCK_SIZE"),
 	}
+	PreferLocalSafeL2Flag = &cli.BoolFlag{
+		Name:    "prefer-local-safe-l2",
+		Usage:   "Load unsafe blocks higher than the sequencer's LocalSafeL2 instead of SafeL2",
+		Value:   false,
+		EnvVars: prefixEnvVars("PREFER_LOCAL_SAFE_L2"),
+	}
 	// Legacy Flags
 	SequencerHDPathFlag = txmgr.SequencerHDPathFlag
 )
@@ -212,6 +218,7 @@ var optionalFlags = []cli.Flag{
 	ThrottleTxSizeFlag,
 	ThrottleBlockSizeFlag,
 	ThrottleAlwaysBlockSizeFlag,
+	PreferLocalSafeL2Flag,
 }
 
 func init() {
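
Note for reviewers: the behavioral core of this patch reduces to which safe head anchors the range of unsafe blocks the batcher loads. The following is a minimal, self-contained Go sketch of that selection logic, not the op-batcher code itself; `unsafeBlockRange` is a hypothetical helper written only to mirror the `safeL2` assignment in `computeSyncActions` and the expectations in the `localSafeL2 > safeL2` test cases above.

```go
package main

import "fmt"

// inclusiveBlockRange mirrors the type used by computeSyncActions.
type inclusiveBlockRange struct{ start, end uint64 }

// unsafeBlockRange (hypothetical) picks the safe-head anchor the way this
// patch does: SafeL2 by default, LocalSafeL2 when preferLocalSafeL2 is set,
// then returns the inclusive range of unsafe blocks strictly above it.
func unsafeBlockRange(safeL2, localSafeL2, unsafeL2 uint64, preferLocalSafeL2 bool) *inclusiveBlockRange {
	safe := safeL2
	if preferLocalSafeL2 {
		safe = localSafeL2
	}
	if unsafeL2 <= safe {
		return nil // nothing new to load
	}
	return &inclusiveBlockRange{safe + 1, unsafeL2}
}

func main() {
	// Values from the "localSafeL2 > safeL2" test cases above:
	// SafeL2=103, LocalSafeL2=104, UnsafeL2=109.
	fmt.Println(*unsafeBlockRange(103, 104, 109, false)) // {104 109}
	fmt.Println(*unsafeBlockRange(103, 104, 109, true))  // {105 109}
}
```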
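Operationally, the new behavior is opt-in: the flag defaults to false and, per the definition above, should be enabled with `--prefer-local-safe-l2` or via the `PREFER_LOCAL_SAFE_L2` environment variable once `prefixEnvVars` has applied the batcher's prefix (presumably `OP_BATCHER_PREFER_LOCAL_SAFE_L2`, assuming the usual op-batcher convention). As the inline comment in `computeSyncActions` notes, anchoring on LocalSafeL2 is the preferred setting when running interop, but it is not yet enabled by default.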