Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

op-batcher: introduce PREFER_LOCAL_SAFE_L2 config var #14587

Merged
merged 4 commits into from
Mar 3, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions op-batcher/batcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ type CLIConfig struct {
// ThrottleAlwaysBlockSize is the total per-block DA limit to always imposing on block building.
ThrottleAlwaysBlockSize uint64

// PreferLocalSafeL2 triggers the batcher to load blocks from the sequencer based on the LocalSafeL2 SyncStatus field (instead of the SafeL2 field).
PreferLocalSafeL2 bool

// TestUseMaxTxSizeForBlobs allows to set the blob size with MaxL1TxSize.
// Should only be used for testing purposes.
TestUseMaxTxSizeForBlobs bool
Expand Down Expand Up @@ -215,5 +218,6 @@ func NewConfig(ctx *cli.Context) *CLIConfig {
ThrottleTxSize: ctx.Uint64(flags.ThrottleTxSizeFlag.Name),
ThrottleBlockSize: ctx.Uint64(flags.ThrottleBlockSizeFlag.Name),
ThrottleAlwaysBlockSize: ctx.Uint64(flags.ThrottleAlwaysBlockSizeFlag.Name),
PreferLocalSafeL2: ctx.Bool(flags.PreferLocalSafeL2Flag.Name),
}
}
2 changes: 1 addition & 1 deletion op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ func (l *BatchSubmitter) syncAndPrune(syncStatus *eth.SyncStatus) *inclusiveBloc
defer l.channelMgrMutex.Unlock()

// Decide appropriate actions
syncActions, outOfSync := computeSyncActions(*syncStatus, l.prevCurrentL1, l.channelMgr.blocks, l.channelMgr.channelQueue, l.Log)
syncActions, outOfSync := computeSyncActions(*syncStatus, l.prevCurrentL1, l.channelMgr.blocks, l.channelMgr.channelQueue, l.Log, l.Config.PreferLocalSafeL2)

if outOfSync {
// If the sequencer is out of sync
Expand Down
4 changes: 4 additions & 0 deletions op-batcher/batcher/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type BatcherConfig struct {
// For throttling DA. See CLIConfig in config.go for details on these parameters.
ThrottleThreshold, ThrottleTxSize uint64
ThrottleBlockSize, ThrottleAlwaysBlockSize uint64

PreferLocalSafeL2 bool
}

// BatcherService represents a full batch-submitter instance and its resources,
Expand Down Expand Up @@ -111,6 +113,8 @@ func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string,
bs.ThrottleBlockSize = cfg.ThrottleBlockSize
bs.ThrottleAlwaysBlockSize = cfg.ThrottleAlwaysBlockSize

bs.PreferLocalSafeL2 = cfg.PreferLocalSafeL2

optsFromRPC, err := bs.initRPCClients(ctx, cfg)
if err != nil {
return err
Expand Down
30 changes: 22 additions & 8 deletions op-batcher/batcher/sync_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,29 @@ func (s syncActions) TerminalString() string {
// state of the batcher (blocks and channels), the new sync status, and the previous current L1 block. The actions are returned
// in a struct specifying the number of blocks to prune, the number of channels to prune, whether to wait for node sync, the block
// range to load into the local state, and whether to clear the state entirely. Returns an boolean indicating if the sequencer is out of sync.
func computeSyncActions[T channelStatuser](newSyncStatus eth.SyncStatus, prevCurrentL1 eth.L1BlockRef, blocks queue.Queue[*types.Block], channels []T, l log.Logger) (syncActions, bool) {
func computeSyncActions[T channelStatuser](
newSyncStatus eth.SyncStatus,
prevCurrentL1 eth.L1BlockRef,
blocks queue.Queue[*types.Block],
channels []T,
l log.Logger,
preferLocalSafeL2 bool,
) (syncActions, bool) {

m := l.With(
"syncStatus.headL1", newSyncStatus.HeadL1,
"syncStatus.currentL1", newSyncStatus.CurrentL1,
"syncStatus.localSafeL2", newSyncStatus.LocalSafeL2,
"syncStatus.safeL2", newSyncStatus.SafeL2,
"syncStatus.unsafeL2", newSyncStatus.UnsafeL2,
)

safeL2 := newSyncStatus.SafeL2
if preferLocalSafeL2 {
// This is preffered when running interop, but not yet enabled by default.
safeL2 = newSyncStatus.LocalSafeL2
}

// PART 1: Initial checks on the sync status
if newSyncStatus.HeadL1 == (eth.L1BlockRef{}) {
m.Warn("empty sync status")
Expand All @@ -69,8 +83,8 @@ func computeSyncActions[T channelStatuser](newSyncStatus eth.SyncStatus, prevCur
}

var allUnsafeBlocks *inclusiveBlockRange
if newSyncStatus.UnsafeL2.Number > newSyncStatus.LocalSafeL2.Number {
allUnsafeBlocks = &inclusiveBlockRange{newSyncStatus.LocalSafeL2.Number + 1, newSyncStatus.UnsafeL2.Number}
if newSyncStatus.UnsafeL2.Number > safeL2.Number {
allUnsafeBlocks = &inclusiveBlockRange{safeL2.Number + 1, newSyncStatus.UnsafeL2.Number}
}

// PART 2: checks involving only the oldest block in the state
Expand All @@ -89,12 +103,12 @@ func computeSyncActions[T channelStatuser](newSyncStatus eth.SyncStatus, prevCur
// and we need to start over, loading all unsafe blocks
// from the sequencer.
startAfresh := syncActions{
clearState: &newSyncStatus.LocalSafeL2.L1Origin,
clearState: &safeL2.L1Origin,
blocksToLoad: allUnsafeBlocks,
}

oldestBlockInStateNum := oldestBlockInState.NumberU64()
nextSafeBlockNum := newSyncStatus.LocalSafeL2.Number + 1
nextSafeBlockNum := safeL2.Number + 1

if nextSafeBlockNum < oldestBlockInStateNum {
m.Warn("next safe block is below oldest block in state",
Expand All @@ -120,7 +134,7 @@ func computeSyncActions[T channelStatuser](newSyncStatus eth.SyncStatus, prevCur
return startAfresh, false
}

if numBlocksToDequeue > 0 && blocks[numBlocksToDequeue-1].Hash() != newSyncStatus.LocalSafeL2.Hash {
if numBlocksToDequeue > 0 && blocks[numBlocksToDequeue-1].Hash() != safeL2.Hash {
m.Warn("safe chain reorg, clearing channel manager state",
"syncActions", startAfresh,
"existingBlock", eth.ToBlockID(blocks[numBlocksToDequeue-1]))
Expand All @@ -132,7 +146,7 @@ func computeSyncActions[T channelStatuser](newSyncStatus eth.SyncStatus, prevCur
if ch.isFullySubmitted() &&
!ch.isTimedOut() &&
newSyncStatus.CurrentL1.Number > ch.MaxInclusionBlock() &&
newSyncStatus.LocalSafeL2.Number < ch.LatestL2().Number {
safeL2.Number < ch.LatestL2().Number {
// Safe head did not make the expected progress
// for a fully submitted channel. This indicates
// that the derivation pipeline may have stalled
Expand All @@ -147,7 +161,7 @@ func computeSyncActions[T channelStatuser](newSyncStatus eth.SyncStatus, prevCur
// PART 5: happy path
numChannelsToPrune := 0
for _, ch := range channels {
if ch.LatestL2().Number > newSyncStatus.LocalSafeL2.Number {
if ch.LatestL2().Number > safeL2.Number {
// If the channel has blocks which are not yet safe
// we do not want to prune it.
break
Expand Down
143 changes: 97 additions & 46 deletions op-batcher/batcher/sync_actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
}

happyCaseLogs := []string{"computed sync actions"}
noBlocksLogs := []string{"no blocks in state"}

type TestCase struct {
name string
Expand All @@ -68,6 +69,7 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
expected syncActions
expectedSeqOutOfSync bool
expectedLogs []string
preferLocalSafeL2 bool
}

testCases := []TestCase{
Expand Down Expand Up @@ -95,10 +97,10 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
// although the sequencer has derived up the same
// L1 block height, it derived fewer safe L2 blocks.
newSyncStatus: eth.SyncStatus{
HeadL1: eth.BlockRef{Number: 6},
CurrentL1: eth.BlockRef{Number: 1},
LocalSafeL2: eth.L2BlockRef{Number: 100, L1Origin: eth.BlockID{Number: 1}},
UnsafeL2: eth.L2BlockRef{Number: 109},
HeadL1: eth.BlockRef{Number: 6},
CurrentL1: eth.BlockRef{Number: 1},
SafeL2: eth.L2BlockRef{Number: 100, L1Origin: eth.BlockID{Number: 1}},
UnsafeL2: eth.L2BlockRef{Number: 109},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{block102, block103}, // note absence of block101
Expand All @@ -113,10 +115,10 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
// This can happen if another batcher instance got some blocks
// included in the safe chain:
newSyncStatus: eth.SyncStatus{
HeadL1: eth.BlockRef{Number: 6},
CurrentL1: eth.BlockRef{Number: 2},
LocalSafeL2: eth.L2BlockRef{Number: 104, L1Origin: eth.BlockID{Number: 1}},
UnsafeL2: eth.L2BlockRef{Number: 109},
HeadL1: eth.BlockRef{Number: 6},
CurrentL1: eth.BlockRef{Number: 2},
SafeL2: eth.L2BlockRef{Number: 104, L1Origin: eth.BlockID{Number: 1}},
UnsafeL2: eth.L2BlockRef{Number: 109},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{block101, block102, block103},
Expand All @@ -131,10 +133,10 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
// This can happen if there is an L1 reorg, the safe chain is at an acceptable
// height but it does not descend from the blocks in state:
newSyncStatus: eth.SyncStatus{
HeadL1: eth.BlockRef{Number: 5},
CurrentL1: eth.BlockRef{Number: 2},
LocalSafeL2: eth.L2BlockRef{Number: 103, Hash: block101.Hash(), L1Origin: eth.BlockID{Number: 1}}, // note hash mismatch
UnsafeL2: eth.L2BlockRef{Number: 109},
HeadL1: eth.BlockRef{Number: 5},
CurrentL1: eth.BlockRef{Number: 2},
SafeL2: eth.L2BlockRef{Number: 103, Hash: block101.Hash(), L1Origin: eth.BlockID{Number: 1}}, // note hash mismatch
UnsafeL2: eth.L2BlockRef{Number: 109},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{block101, block102, block103},
Expand All @@ -149,10 +151,10 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
// This could happen if the batcher unexpectedly violates the
// Holocene derivation rules:
newSyncStatus: eth.SyncStatus{
HeadL1: eth.BlockRef{Number: 3},
CurrentL1: eth.BlockRef{Number: 2},
LocalSafeL2: eth.L2BlockRef{Number: 101, Hash: block101.Hash(), L1Origin: eth.BlockID{Number: 1}},
UnsafeL2: eth.L2BlockRef{Number: 109},
HeadL1: eth.BlockRef{Number: 3},
CurrentL1: eth.BlockRef{Number: 2},
SafeL2: eth.L2BlockRef{Number: 101, Hash: block101.Hash(), L1Origin: eth.BlockID{Number: 1}},
UnsafeL2: eth.L2BlockRef{Number: 109},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{block101, block102, block103},
Expand All @@ -166,10 +168,10 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
{name: "failed to make expected progress (unsafe=safe)",
// Edge case where unsafe = safe
newSyncStatus: eth.SyncStatus{
HeadL1: eth.BlockRef{Number: 3},
CurrentL1: eth.BlockRef{Number: 2},
LocalSafeL2: eth.L2BlockRef{Number: 101, Hash: block101.Hash(), L1Origin: eth.BlockID{Number: 1}},
UnsafeL2: eth.L2BlockRef{Number: 101},
HeadL1: eth.BlockRef{Number: 3},
CurrentL1: eth.BlockRef{Number: 2},
SafeL2: eth.L2BlockRef{Number: 101, Hash: block101.Hash(), L1Origin: eth.BlockID{Number: 1}},
UnsafeL2: eth.L2BlockRef{Number: 101},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{block102, block103},
Expand All @@ -185,10 +187,10 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
// and we didn't submit or have any txs confirmed since
// the last sync.
newSyncStatus: eth.SyncStatus{
HeadL1: eth.BlockRef{Number: 4},
CurrentL1: eth.BlockRef{Number: 1},
LocalSafeL2: eth.L2BlockRef{Number: 100},
UnsafeL2: eth.L2BlockRef{Number: 109},
HeadL1: eth.BlockRef{Number: 4},
CurrentL1: eth.BlockRef{Number: 1},
SafeL2: eth.L2BlockRef{Number: 100},
UnsafeL2: eth.L2BlockRef{Number: 109},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{block101, block102, block103},
Expand All @@ -201,10 +203,10 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
{name: "no blocks",
// This happens when the batcher is starting up for the first time
newSyncStatus: eth.SyncStatus{
HeadL1: eth.BlockRef{Number: 5},
CurrentL1: eth.BlockRef{Number: 2},
LocalSafeL2: eth.L2BlockRef{Number: 103, Hash: block103.Hash()},
UnsafeL2: eth.L2BlockRef{Number: 109},
HeadL1: eth.BlockRef{Number: 5},
CurrentL1: eth.BlockRef{Number: 2},
SafeL2: eth.L2BlockRef{Number: 103, Hash: block103.Hash()},
UnsafeL2: eth.L2BlockRef{Number: 109},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{},
Expand All @@ -217,10 +219,10 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
{name: "happy path",
// This happens when the safe chain is being progressed as expected:
newSyncStatus: eth.SyncStatus{
HeadL1: eth.BlockRef{Number: 5},
CurrentL1: eth.BlockRef{Number: 2},
LocalSafeL2: eth.L2BlockRef{Number: 103, Hash: block103.Hash()},
UnsafeL2: eth.L2BlockRef{Number: 109},
HeadL1: eth.BlockRef{Number: 5},
CurrentL1: eth.BlockRef{Number: 2},
SafeL2: eth.L2BlockRef{Number: 103, Hash: block103.Hash()},
UnsafeL2: eth.L2BlockRef{Number: 109},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{block101, block102, block103},
Expand All @@ -234,10 +236,10 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
},
{name: "happy path + multiple channels",
newSyncStatus: eth.SyncStatus{
HeadL1: eth.BlockRef{Number: 5},
CurrentL1: eth.BlockRef{Number: 2},
LocalSafeL2: eth.L2BlockRef{Number: 103, Hash: block103.Hash()},
UnsafeL2: eth.L2BlockRef{Number: 109},
HeadL1: eth.BlockRef{Number: 5},
CurrentL1: eth.BlockRef{Number: 2},
SafeL2: eth.L2BlockRef{Number: 103, Hash: block103.Hash()},
UnsafeL2: eth.L2BlockRef{Number: 109},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{block101, block102, block103, block104},
Expand All @@ -251,23 +253,23 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
},
{name: "no progress + unsafe=safe",
newSyncStatus: eth.SyncStatus{
HeadL1: eth.BlockRef{Number: 5},
CurrentL1: eth.BlockRef{Number: 2},
LocalSafeL2: eth.L2BlockRef{Number: 100},
UnsafeL2: eth.L2BlockRef{Number: 100},
HeadL1: eth.BlockRef{Number: 5},
CurrentL1: eth.BlockRef{Number: 2},
SafeL2: eth.L2BlockRef{Number: 100},
UnsafeL2: eth.L2BlockRef{Number: 100},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{},
channels: []channelStatuser{},
expected: syncActions{},
expectedLogs: []string{"no blocks in state"},
expectedLogs: noBlocksLogs,
},
{name: "no progress + unsafe=safe + blocks in state",
newSyncStatus: eth.SyncStatus{
HeadL1: eth.BlockRef{Number: 5},
CurrentL1: eth.BlockRef{Number: 2},
LocalSafeL2: eth.L2BlockRef{Number: 101, Hash: block101.Hash()},
UnsafeL2: eth.L2BlockRef{Number: 101},
HeadL1: eth.BlockRef{Number: 5},
CurrentL1: eth.BlockRef{Number: 2},
SafeL2: eth.L2BlockRef{Number: 101, Hash: block101.Hash()},
UnsafeL2: eth.L2BlockRef{Number: 101},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{block101},
Expand All @@ -277,6 +279,55 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
},
expectedLogs: happyCaseLogs,
},
{name: "localSafeL2 > safeL2, preferLocalSafeL2=false",
newSyncStatus: eth.SyncStatus{
HeadL1: eth.BlockRef{Number: 5},
CurrentL1: eth.BlockRef{Number: 2},
SafeL2: eth.L2BlockRef{Number: 103, Hash: block103.Hash()},
LocalSafeL2: eth.L2BlockRef{Number: 104, Hash: block104.Hash()},
UnsafeL2: eth.L2BlockRef{Number: 109},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{},
channels: []channelStatuser{},
expected: syncActions{
blocksToLoad: &inclusiveBlockRange{104, 109},
},
expectedLogs: noBlocksLogs,
},
{name: "localSafeL2 > safeL2, preferLocalSafeL2=true",
preferLocalSafeL2: true,
newSyncStatus: eth.SyncStatus{
HeadL1: eth.BlockRef{Number: 5},
CurrentL1: eth.BlockRef{Number: 2},
SafeL2: eth.L2BlockRef{Number: 103, Hash: block103.Hash()},
LocalSafeL2: eth.L2BlockRef{Number: 104, Hash: block104.Hash()},
UnsafeL2: eth.L2BlockRef{Number: 109},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{},
channels: []channelStatuser{},
expected: syncActions{
blocksToLoad: &inclusiveBlockRange{105, 109},
},
expectedLogs: noBlocksLogs,
},
{name: "LocalSafeL2=0,SafeL2>0", // This shouldn't ever happen, but has occurred due to bugs
newSyncStatus: eth.SyncStatus{
HeadL1: eth.BlockRef{Number: 5},
CurrentL1: eth.BlockRef{Number: 2},
SafeL2: eth.L2BlockRef{Number: 104, Hash: block103.Hash()},
LocalSafeL2: eth.L2BlockRef{Number: 0, Hash: block104.Hash()},
UnsafeL2: eth.L2BlockRef{Number: 109},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{},
channels: []channelStatuser{},
expected: syncActions{
blocksToLoad: &inclusiveBlockRange{105, 109},
},
expectedLogs: noBlocksLogs,
},
}

for _, tc := range testCases {
Expand All @@ -285,7 +336,7 @@ func TestBatchSubmitter_computeSyncActions(t *testing.T) {
l, h := testlog.CaptureLogger(t, log.LevelDebug)

result, outOfSync := computeSyncActions(
tc.newSyncStatus, tc.prevCurrentL1, tc.blocks, tc.channels, l,
tc.newSyncStatus, tc.prevCurrentL1, tc.blocks, tc.channels, l, tc.preferLocalSafeL2,
)

require.Equal(t, tc.expected, result, "unexpected actions")
Expand Down
7 changes: 7 additions & 0 deletions op-batcher/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,12 @@ var (
Value: 130_000, // should be larger than the builder's max-l2-tx-size to prevent endlessly throttling some txs
EnvVars: prefixEnvVars("THROTTLE_ALWAYS_BLOCK_SIZE"),
}
PreferLocalSafeL2Flag = &cli.BoolFlag{
Name: "prefer-local-safe-l2",
Usage: "Load unsafe blocks higher than the sequencer's LocalSafeL2 instead of SafeL2",
Value: false,
EnvVars: prefixEnvVars("PREFER_LOCAL_SAFE_L2"),
}
// Legacy Flags
SequencerHDPathFlag = txmgr.SequencerHDPathFlag
)
Expand Down Expand Up @@ -212,6 +218,7 @@ var optionalFlags = []cli.Flag{
ThrottleTxSizeFlag,
ThrottleBlockSizeFlag,
ThrottleAlwaysBlockSizeFlag,
PreferLocalSafeL2Flag,
}

func init() {
Expand Down