op-batcher: introduce PREFER_LOCAL_SAFE_L2 config var (#14587)
* op-batcher: introduce PREFER_LOCAL_SAFE_L2 config var

* lint

* Apply suggestions from code review

Co-authored-by: Sebastian Stammler <seb@oplabs.co>

* lint

---------

Co-authored-by: Sebastian Stammler <seb@oplabs.co>
geoknee and sebastianst authored Mar 3, 2025
1 parent d8b84ae commit c46bb16
Showing 6 changed files with 134 additions and 55 deletions.
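At a glance, the commit threads a single boolean from a new CLI flag through the service config into computeSyncActions, which then keys every safe-head comparison off either SafeL2 or LocalSafeL2. A minimal sketch of just that selection, reusing the eth.SyncStatus fields visible in the diffs below (the pickSafeL2 helper and the standalone program are illustrative only; the commit inlines this logic rather than factoring it out):

package main

import (
	"fmt"

	"github.com/ethereum-optimism/optimism/op-service/eth"
)

// pickSafeL2 mirrors the selection introduced in computeSyncActions:
// with preferLocalSafeL2 set, the batcher keys pruning and block
// loading off the node's LocalSafeL2 instead of SafeL2.
func pickSafeL2(status eth.SyncStatus, preferLocalSafeL2 bool) eth.L2BlockRef {
	if preferLocalSafeL2 {
		return status.LocalSafeL2
	}
	return status.SafeL2
}

func main() {
	status := eth.SyncStatus{
		SafeL2:      eth.L2BlockRef{Number: 103},
		LocalSafeL2: eth.L2BlockRef{Number: 104},
	}
	fmt.Println(pickSafeL2(status, false).Number) // 103
	fmt.Println(pickSafeL2(status, true).Number)  // 104
}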
4 changes: 4 additions & 0 deletions op-batcher/batcher/config.go
@@ -110,6 +110,9 @@ type CLIConfig struct {
// ThrottleAlwaysBlockSize is the total per-block DA limit to always impose on block building.
ThrottleAlwaysBlockSize uint64

+// PreferLocalSafeL2 triggers the batcher to load blocks from the sequencer based on the LocalSafeL2 SyncStatus field (instead of the SafeL2 field).
+PreferLocalSafeL2 bool

// TestUseMaxTxSizeForBlobs allows setting the blob size with MaxL1TxSize.
// Should only be used for testing purposes.
TestUseMaxTxSizeForBlobs bool
@@ -215,5 +218,6 @@ func NewConfig(ctx *cli.Context) *CLIConfig {
ThrottleTxSize: ctx.Uint64(flags.ThrottleTxSizeFlag.Name),
ThrottleBlockSize: ctx.Uint64(flags.ThrottleBlockSizeFlag.Name),
ThrottleAlwaysBlockSize: ctx.Uint64(flags.ThrottleAlwaysBlockSizeFlag.Name),
+PreferLocalSafeL2: ctx.Bool(flags.PreferLocalSafeL2Flag.Name),
}
}
2 changes: 1 addition & 1 deletion op-batcher/batcher/driver.go
@@ -422,7 +422,7 @@ func (l *BatchSubmitter) syncAndPrune(syncStatus *eth.SyncStatus) *inclusiveBloc
defer l.channelMgrMutex.Unlock()

// Decide appropriate actions
-syncActions, outOfSync := computeSyncActions(*syncStatus, l.prevCurrentL1, l.channelMgr.blocks, l.channelMgr.channelQueue, l.Log)
+syncActions, outOfSync := computeSyncActions(*syncStatus, l.prevCurrentL1, l.channelMgr.blocks, l.channelMgr.channelQueue, l.Log, l.Config.PreferLocalSafeL2)

if outOfSync {
// If the sequencer is out of sync
4 changes: 4 additions & 0 deletions op-batcher/batcher/service.go
@@ -48,6 +48,8 @@ type BatcherConfig struct {
// For throttling DA. See CLIConfig in config.go for details on these parameters.
ThrottleThreshold, ThrottleTxSize uint64
ThrottleBlockSize, ThrottleAlwaysBlockSize uint64

+PreferLocalSafeL2 bool
}

// BatcherService represents a full batch-submitter instance and its resources,
@@ -111,6 +113,8 @@ func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string,
bs.ThrottleBlockSize = cfg.ThrottleBlockSize
bs.ThrottleAlwaysBlockSize = cfg.ThrottleAlwaysBlockSize

+bs.PreferLocalSafeL2 = cfg.PreferLocalSafeL2

optsFromRPC, err := bs.initRPCClients(ctx, cfg)
if err != nil {
return err
30 changes: 22 additions & 8 deletions op-batcher/batcher/sync_actions.go
@@ -47,15 +47,29 @@ func (s syncActions) TerminalString() string {
// state of the batcher (blocks and channels), the new sync status, and the previous current L1 block. The actions are returned
// in a struct specifying the number of blocks to prune, the number of channels to prune, whether to wait for node sync, the block
// range to load into the local state, and whether to clear the state entirely. Returns a boolean indicating whether the sequencer is out of sync.
-func computeSyncActions[T channelStatuser](newSyncStatus eth.SyncStatus, prevCurrentL1 eth.L1BlockRef, blocks queue.Queue[*types.Block], channels []T, l log.Logger) (syncActions, bool) {
+func computeSyncActions[T channelStatuser](
+newSyncStatus eth.SyncStatus,
+prevCurrentL1 eth.L1BlockRef,
+blocks queue.Queue[*types.Block],
+channels []T,
+l log.Logger,
+preferLocalSafeL2 bool,
+) (syncActions, bool) {

m := l.With(
"syncStatus.headL1", newSyncStatus.HeadL1,
"syncStatus.currentL1", newSyncStatus.CurrentL1,
"syncStatus.localSafeL2", newSyncStatus.LocalSafeL2,
"syncStatus.safeL2", newSyncStatus.SafeL2,
"syncStatus.unsafeL2", newSyncStatus.UnsafeL2,
)

+safeL2 := newSyncStatus.SafeL2
+if preferLocalSafeL2 {
+// This is preferred when running interop, but not yet enabled by default.
+safeL2 = newSyncStatus.LocalSafeL2
+}

// PART 1: Initial checks on the sync status
if newSyncStatus.HeadL1 == (eth.L1BlockRef{}) {
m.Warn("empty sync status")
@@ -69,8 +83,8 @@ func computeSyncActions[T channelStatuser](newSyncStatus eth.SyncStatus, prevCur
}

var allUnsafeBlocks *inclusiveBlockRange
-if newSyncStatus.UnsafeL2.Number > newSyncStatus.LocalSafeL2.Number {
-allUnsafeBlocks = &inclusiveBlockRange{newSyncStatus.LocalSafeL2.Number + 1, newSyncStatus.UnsafeL2.Number}
+if newSyncStatus.UnsafeL2.Number > safeL2.Number {
+allUnsafeBlocks = &inclusiveBlockRange{safeL2.Number + 1, newSyncStatus.UnsafeL2.Number}
}

// PART 2: checks involving only the oldest block in the state
@@ -89,12 +103,12 @@ func computeSyncActions[T channelStatuser](newSyncStatus eth.SyncStatus, prevCur
// and we need to start over, loading all unsafe blocks
// from the sequencer.
startAfresh := syncActions{
-clearState: &newSyncStatus.LocalSafeL2.L1Origin,
+clearState: &safeL2.L1Origin,
blocksToLoad: allUnsafeBlocks,
}

oldestBlockInStateNum := oldestBlockInState.NumberU64()
-nextSafeBlockNum := newSyncStatus.LocalSafeL2.Number + 1
+nextSafeBlockNum := safeL2.Number + 1

if nextSafeBlockNum < oldestBlockInStateNum {
m.Warn("next safe block is below oldest block in state",
@@ -120,7 +134,7 @@ func computeSyncActions[T channelStatuser](newSyncStatus eth.SyncStatus, prevCur
return startAfresh, false
}

-if numBlocksToDequeue > 0 && blocks[numBlocksToDequeue-1].Hash() != newSyncStatus.LocalSafeL2.Hash {
+if numBlocksToDequeue > 0 && blocks[numBlocksToDequeue-1].Hash() != safeL2.Hash {
m.Warn("safe chain reorg, clearing channel manager state",
"syncActions", startAfresh,
"existingBlock", eth.ToBlockID(blocks[numBlocksToDequeue-1]))
@@ -132,7 +146,7 @@ func computeSyncActions[T channelStatuser](newSyncStatus eth.SyncStatus, prevCur
if ch.isFullySubmitted() &&
!ch.isTimedOut() &&
newSyncStatus.CurrentL1.Number > ch.MaxInclusionBlock() &&
-newSyncStatus.LocalSafeL2.Number < ch.LatestL2().Number {
+safeL2.Number < ch.LatestL2().Number {
// Safe head did not make the expected progress
// for a fully submitted channel. This indicates
// that the derivation pipeline may have stalled
@@ -147,7 +161,7 @@ func computeSyncActions[T channelStatuser](newSyncStatus eth.SyncStatus, prevCur
// PART 5: happy path
numChannelsToPrune := 0
for _, ch := range channels {
-if ch.LatestL2().Number > newSyncStatus.LocalSafeL2.Number {
+if ch.LatestL2().Number > safeL2.Number {
// If the channel has blocks which are not yet safe
// we do not want to prune it.
break
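The effect of the new parameter shows up in which unsafe blocks the batcher reloads: the range starts one block past whichever safe head was chosen and runs to the unsafe tip. A sketch of that arithmetic, using the fixture numbers from the new test cases in the next file (inclusiveBlockRange is redeclared here for illustration and its field names are assumed; the real type is unexported):

package main

import "fmt"

// inclusiveBlockRange stands in for the unexported struct in
// sync_actions.go; field names are assumed for this sketch.
type inclusiveBlockRange struct{ start, end uint64 }

// unsafeBlocksToLoad reproduces the range computation above: everything
// strictly after the chosen safe head, up to and including the unsafe tip.
func unsafeBlocksToLoad(safeL2Num, unsafeL2Num uint64) *inclusiveBlockRange {
	if unsafeL2Num > safeL2Num {
		return &inclusiveBlockRange{safeL2Num + 1, unsafeL2Num}
	}
	return nil
}

func main() {
	// SafeL2=103, LocalSafeL2=104, UnsafeL2=109, as in the new tests.
	fmt.Println(*unsafeBlocksToLoad(103, 109)) // {104 109}, preferLocalSafeL2=false
	fmt.Println(*unsafeBlocksToLoad(104, 109)) // {105 109}, preferLocalSafeL2=true
}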
142 changes: 96 additions & 46 deletions op-batcher/batcher/sync_actions_test.go
@@ -56,6 +56,7 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
}

happyCaseLogs := []string{"computed sync actions"}
+noBlocksLogs := []string{"no blocks in state"}

type TestCase struct {
name string
@@ -68,6 +69,7 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
expected syncActions
expectedSeqOutOfSync bool
expectedLogs []string
+preferLocalSafeL2 bool
}

testCases := []TestCase{
@@ -95,10 +97,10 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
// although the sequencer has derived up the same
// L1 block height, it derived fewer safe L2 blocks.
newSyncStatus: eth.SyncStatus{
-HeadL1: eth.BlockRef{Number: 6},
-CurrentL1: eth.BlockRef{Number: 1},
-LocalSafeL2: eth.L2BlockRef{Number: 100, L1Origin: eth.BlockID{Number: 1}},
-UnsafeL2: eth.L2BlockRef{Number: 109},
+HeadL1: eth.BlockRef{Number: 6},
+CurrentL1: eth.BlockRef{Number: 1},
+SafeL2: eth.L2BlockRef{Number: 100, L1Origin: eth.BlockID{Number: 1}},
+UnsafeL2: eth.L2BlockRef{Number: 109},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{block102, block103}, // note absence of block101
@@ -113,10 +115,10 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
// This can happen if another batcher instance got some blocks
// included in the safe chain:
newSyncStatus: eth.SyncStatus{
-HeadL1: eth.BlockRef{Number: 6},
-CurrentL1: eth.BlockRef{Number: 2},
-LocalSafeL2: eth.L2BlockRef{Number: 104, L1Origin: eth.BlockID{Number: 1}},
-UnsafeL2: eth.L2BlockRef{Number: 109},
+HeadL1: eth.BlockRef{Number: 6},
+CurrentL1: eth.BlockRef{Number: 2},
+SafeL2: eth.L2BlockRef{Number: 104, L1Origin: eth.BlockID{Number: 1}},
+UnsafeL2: eth.L2BlockRef{Number: 109},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{block101, block102, block103},
@@ -131,10 +133,10 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
// This can happen if there is an L1 reorg, the safe chain is at an acceptable
// height but it does not descend from the blocks in state:
newSyncStatus: eth.SyncStatus{
-HeadL1: eth.BlockRef{Number: 5},
-CurrentL1: eth.BlockRef{Number: 2},
-LocalSafeL2: eth.L2BlockRef{Number: 103, Hash: block101.Hash(), L1Origin: eth.BlockID{Number: 1}}, // note hash mismatch
-UnsafeL2: eth.L2BlockRef{Number: 109},
+HeadL1: eth.BlockRef{Number: 5},
+CurrentL1: eth.BlockRef{Number: 2},
+SafeL2: eth.L2BlockRef{Number: 103, Hash: block101.Hash(), L1Origin: eth.BlockID{Number: 1}}, // note hash mismatch
+UnsafeL2: eth.L2BlockRef{Number: 109},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{block101, block102, block103},
@@ -149,10 +151,10 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
// This could happen if the batcher unexpectedly violates the
// Holocene derivation rules:
newSyncStatus: eth.SyncStatus{
-HeadL1: eth.BlockRef{Number: 3},
-CurrentL1: eth.BlockRef{Number: 2},
-LocalSafeL2: eth.L2BlockRef{Number: 101, Hash: block101.Hash(), L1Origin: eth.BlockID{Number: 1}},
-UnsafeL2: eth.L2BlockRef{Number: 109},
+HeadL1: eth.BlockRef{Number: 3},
+CurrentL1: eth.BlockRef{Number: 2},
+SafeL2: eth.L2BlockRef{Number: 101, Hash: block101.Hash(), L1Origin: eth.BlockID{Number: 1}},
+UnsafeL2: eth.L2BlockRef{Number: 109},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{block101, block102, block103},
@@ -166,10 +168,10 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
{name: "failed to make expected progress (unsafe=safe)",
// Edge case where unsafe = safe
newSyncStatus: eth.SyncStatus{
-HeadL1: eth.BlockRef{Number: 3},
-CurrentL1: eth.BlockRef{Number: 2},
-LocalSafeL2: eth.L2BlockRef{Number: 101, Hash: block101.Hash(), L1Origin: eth.BlockID{Number: 1}},
-UnsafeL2: eth.L2BlockRef{Number: 101},
+HeadL1: eth.BlockRef{Number: 3},
+CurrentL1: eth.BlockRef{Number: 2},
+SafeL2: eth.L2BlockRef{Number: 101, Hash: block101.Hash(), L1Origin: eth.BlockID{Number: 1}},
+UnsafeL2: eth.L2BlockRef{Number: 101},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{block102, block103},
@@ -185,10 +187,10 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
// and we didn't submit or have any txs confirmed since
// the last sync.
newSyncStatus: eth.SyncStatus{
-HeadL1: eth.BlockRef{Number: 4},
-CurrentL1: eth.BlockRef{Number: 1},
-LocalSafeL2: eth.L2BlockRef{Number: 100},
-UnsafeL2: eth.L2BlockRef{Number: 109},
+HeadL1: eth.BlockRef{Number: 4},
+CurrentL1: eth.BlockRef{Number: 1},
+SafeL2: eth.L2BlockRef{Number: 100},
+UnsafeL2: eth.L2BlockRef{Number: 109},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{block101, block102, block103},
@@ -201,10 +203,10 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
{name: "no blocks",
// This happens when the batcher is starting up for the first time
newSyncStatus: eth.SyncStatus{
-HeadL1: eth.BlockRef{Number: 5},
-CurrentL1: eth.BlockRef{Number: 2},
-LocalSafeL2: eth.L2BlockRef{Number: 103, Hash: block103.Hash()},
-UnsafeL2: eth.L2BlockRef{Number: 109},
+HeadL1: eth.BlockRef{Number: 5},
+CurrentL1: eth.BlockRef{Number: 2},
+SafeL2: eth.L2BlockRef{Number: 103, Hash: block103.Hash()},
+UnsafeL2: eth.L2BlockRef{Number: 109},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{},
@@ -217,10 +219,10 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
{name: "happy path",
// This happens when the safe chain is being progressed as expected:
newSyncStatus: eth.SyncStatus{
-HeadL1: eth.BlockRef{Number: 5},
-CurrentL1: eth.BlockRef{Number: 2},
-LocalSafeL2: eth.L2BlockRef{Number: 103, Hash: block103.Hash()},
-UnsafeL2: eth.L2BlockRef{Number: 109},
+HeadL1: eth.BlockRef{Number: 5},
+CurrentL1: eth.BlockRef{Number: 2},
+SafeL2: eth.L2BlockRef{Number: 103, Hash: block103.Hash()},
+UnsafeL2: eth.L2BlockRef{Number: 109},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{block101, block102, block103},
@@ -234,10 +236,10 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
},
{name: "happy path + multiple channels",
newSyncStatus: eth.SyncStatus{
-HeadL1: eth.BlockRef{Number: 5},
-CurrentL1: eth.BlockRef{Number: 2},
-LocalSafeL2: eth.L2BlockRef{Number: 103, Hash: block103.Hash()},
-UnsafeL2: eth.L2BlockRef{Number: 109},
+HeadL1: eth.BlockRef{Number: 5},
+CurrentL1: eth.BlockRef{Number: 2},
+SafeL2: eth.L2BlockRef{Number: 103, Hash: block103.Hash()},
+UnsafeL2: eth.L2BlockRef{Number: 109},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{block101, block102, block103, block104},
@@ -251,23 +253,23 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
},
{name: "no progress + unsafe=safe",
newSyncStatus: eth.SyncStatus{
-HeadL1: eth.BlockRef{Number: 5},
-CurrentL1: eth.BlockRef{Number: 2},
-LocalSafeL2: eth.L2BlockRef{Number: 100},
-UnsafeL2: eth.L2BlockRef{Number: 100},
+HeadL1: eth.BlockRef{Number: 5},
+CurrentL1: eth.BlockRef{Number: 2},
+SafeL2: eth.L2BlockRef{Number: 100},
+UnsafeL2: eth.L2BlockRef{Number: 100},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{},
channels: []channelStatuser{},
expected: syncActions{},
-expectedLogs: []string{"no blocks in state"},
+expectedLogs: noBlocksLogs,
},
{name: "no progress + unsafe=safe + blocks in state",
newSyncStatus: eth.SyncStatus{
-HeadL1: eth.BlockRef{Number: 5},
-CurrentL1: eth.BlockRef{Number: 2},
-LocalSafeL2: eth.L2BlockRef{Number: 101, Hash: block101.Hash()},
-UnsafeL2: eth.L2BlockRef{Number: 101},
+HeadL1: eth.BlockRef{Number: 5},
+CurrentL1: eth.BlockRef{Number: 2},
+SafeL2: eth.L2BlockRef{Number: 101, Hash: block101.Hash()},
+UnsafeL2: eth.L2BlockRef{Number: 101},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{block101},
@@ -277,6 +279,54 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
},
expectedLogs: happyCaseLogs,
},
{name: "localSafeL2 > safeL2, preferLocalSafeL2=false",
newSyncStatus: eth.SyncStatus{
HeadL1: eth.BlockRef{Number: 5},
CurrentL1: eth.BlockRef{Number: 2},
SafeL2: eth.L2BlockRef{Number: 103, Hash: block103.Hash()},
LocalSafeL2: eth.L2BlockRef{Number: 104, Hash: block104.Hash()},
UnsafeL2: eth.L2BlockRef{Number: 109},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{},
channels: []channelStatuser{},
expected: syncActions{
blocksToLoad: &inclusiveBlockRange{104, 109},
},
expectedLogs: noBlocksLogs,
},
{name: "localSafeL2 > safeL2, preferLocalSafeL2=true",
preferLocalSafeL2: true,
newSyncStatus: eth.SyncStatus{
HeadL1: eth.BlockRef{Number: 5},
CurrentL1: eth.BlockRef{Number: 2},
SafeL2: eth.L2BlockRef{Number: 103, Hash: block103.Hash()},
LocalSafeL2: eth.L2BlockRef{Number: 104, Hash: block104.Hash()},
UnsafeL2: eth.L2BlockRef{Number: 109},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{},
channels: []channelStatuser{},
expected: syncActions{
blocksToLoad: &inclusiveBlockRange{105, 109},
},
expectedLogs: noBlocksLogs,
},
{name: "LocalSafeL2=0,SafeL2>0", // This shouldn't ever happen, but has occurred due to bugs
newSyncStatus: eth.SyncStatus{
HeadL1: eth.BlockRef{Number: 5},
CurrentL1: eth.BlockRef{Number: 2},
SafeL2: eth.L2BlockRef{Number: 104, Hash: block104.Hash()},
UnsafeL2: eth.L2BlockRef{Number: 109},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{},
channels: []channelStatuser{},
expected: syncActions{
blocksToLoad: &inclusiveBlockRange{105, 109},
},
expectedLogs: noBlocksLogs,
},
}

for _, tc := range testCases {
@@ -285,7 +335,7 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
l, h := testlog.CaptureLogger(t, log.LevelDebug)

result, outOfSync := computeSyncActions(
-tc.newSyncStatus, tc.prevCurrentL1, tc.blocks, tc.channels, l,
+tc.newSyncStatus, tc.prevCurrentL1, tc.blocks, tc.channels, l, tc.preferLocalSafeL2,
)

require.Equal(t, tc.expected, result, "unexpected actions")
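Assuming a local checkout, these cases should be runnable with go test ./op-batcher/batcher -run TestBatchSubmitter_computeSyncActions from the repository root.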
7 changes: 7 additions & 0 deletions op-batcher/flags/flags.go
@@ -180,6 +180,12 @@ var (
Value: 130_000, // should be larger than the builder's max-l2-tx-size to prevent endlessly throttling some txs
EnvVars: prefixEnvVars("THROTTLE_ALWAYS_BLOCK_SIZE"),
}
+PreferLocalSafeL2Flag = &cli.BoolFlag{
+Name: "prefer-local-safe-l2",
+Usage: "Load unsafe blocks higher than the sequencer's LocalSafeL2 instead of SafeL2",
+Value: false,
+EnvVars: prefixEnvVars("PREFER_LOCAL_SAFE_L2"),
+}
// Legacy Flags
SequencerHDPathFlag = txmgr.SequencerHDPathFlag
)
@@ -212,6 +218,7 @@ var optionalFlags = []cli.Flag{
ThrottleTxSizeFlag,
ThrottleBlockSizeFlag,
ThrottleAlwaysBlockSizeFlag,
+PreferLocalSafeL2Flag,
}

func init() {
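Operationally, the feature is opt-in (Value: false). It should be enabled with --prefer-local-safe-l2=true on the command line or, assuming the batcher's usual environment prefixing via prefixEnvVars, with OP_BATCHER_PREFER_LOCAL_SAFE_L2=true.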
