From d6c69f1df9d8aabd561cbec7423bdebf9fb1a258 Mon Sep 17 00:00:00 2001
From: Karl Bartel
Date: Tue, 13 Aug 2024 10:55:25 +0200
Subject: [PATCH 1/4] Write to DA in goroutine

Slow writes to the DA (e.g. writing to an EigenDA proxy) will block the
main loop. To avoid this, the DA writes are moved to a goroutine managed
by an errgroup. If one write fails, all writes within the same
publishStateToL1 call are aborted, which is similar to the previous
error handling.

sendTransaction calls (which write to the DA, not the L1, despite the
naming) communicate through Queue.Send and BatchSubmitter.recordFailedTx,
so moving them to a goroutine works fine.
---
 op-batcher/batcher/driver.go | 20 +++++++++++++++-----
 1 file changed, 15 insertions(+), 5 deletions(-)

diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go
index b01bb51484c7..37ff1b095a8f 100644
--- a/op-batcher/batcher/driver.go
+++ b/op-batcher/batcher/driver.go
@@ -21,6 +21,7 @@ import (
 	"github.com/ethereum/go-ethereum/core"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/log"
+	"golang.org/x/sync/errgroup"
 )
 
 var ErrBatcherNotRunning = errors.New("batcher is not running")
@@ -373,13 +374,22 @@ func (l *BatchSubmitter) waitNodeSync() error {
 // publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is
 // no more data to queue for publishing or if there was an error queueing the data.
 func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) {
+	errGroup, ctx := errgroup.WithContext(l.killCtx)
 	for {
 		// if the txmgr is closed, we stop the transaction sending
 		if l.Txmgr.IsClosed() {
 			l.Log.Info("Txmgr is closed, aborting state publishing")
 			return
 		}
-		err := l.publishTxToL1(l.killCtx, queue, receiptsCh)
+		// if one of the l.sendTransaction calls failed, we stop writing to DA
+		select {
+		case <-ctx.Done():
+			err := errGroup.Wait()
+			l.Log.Warn("BatchSubmitter.sendTransaction failed", "err", err)
+			return
+		default:
+		}
+		err := l.publishTxToL1(ctx, queue, receiptsCh, errGroup)
 		if err != nil {
 			if err != io.EOF {
 				l.Log.Error("Error publishing tx to l1", "err", err)
@@ -429,7 +439,7 @@ func (l *BatchSubmitter) clearState(ctx context.Context) {
 }
 
 // publishTxToL1 submits a single state tx to the L1
-func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) error {
+func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID], errGroup *errgroup.Group) error {
 	// send all available transactions
 	l1tip, err := l.l1Tip(ctx)
 	if err != nil {
@@ -449,9 +459,9 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t
 		return err
 	}
 
-	if err = l.sendTransaction(ctx, txdata, queue, receiptsCh); err != nil {
-		return fmt.Errorf("BatchSubmitter.sendTransaction failed: %w", err)
-	}
+	errGroup.Go(func() error {
+		return l.sendTransaction(ctx, txdata, queue, receiptsCh)
+	})
 	return nil
 }
 

From eba6f8675c30718195a2f5c29a326d325074c1cc Mon Sep 17 00:00:00 2001
From: Karl Bartel
Date: Tue, 13 Aug 2024 11:22:26 +0200
Subject: [PATCH 2/4] Wait for DA on reorgs/shutdown

publishAndWait is meant to wait until all remaining data is written.
Only waiting for the txs in the queue won't work, since those are only
created after the data has been written to the DA.
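The ordering issue, condensed into a minimal standalone sketch (all names
are invented stand-ins; the sleeps simulate the DA write and the L1
confirmation). Waiting only on the tx side can return before any tx even
exists, because each tx is created by the DA goroutine itself:

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    func main() {
        var daWrites sync.WaitGroup // in-flight DA writes
        var txs sync.WaitGroup      // stand-in for the txmgr queue

        for i := 0; i < 3; i++ {
            daWrites.Add(1)
            go func() {
                defer daWrites.Done()
                time.Sleep(50 * time.Millisecond) // slow DA write
                txs.Add(1)                        // the tx only exists after the DA write
                go func() {
                    defer txs.Done()
                    time.Sleep(10 * time.Millisecond) // L1 confirmation
                }()
            }()
        }

        daWrites.Wait() // wait for pure DA writes first ...
        txs.Wait()      // ... then for the resulting L1 txs
        fmt.Println("all remaining data written")
    }

Because each daWrites.Done() only runs after the corresponding txs.Add(1),
waiting on daWrites first guarantees every tx is registered before the
second wait starts.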
---
 op-batcher/batcher/driver.go | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go
index 37ff1b095a8f..1e9e9ab28003 100644
--- a/op-batcher/batcher/driver.go
+++ b/op-batcher/batcher/driver.go
@@ -269,6 +269,7 @@ func (l *BatchSubmitter) loop() {
 
 	receiptsCh := make(chan txmgr.TxReceipt[txID])
 	queue := txmgr.NewQueue[txID](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions)
+	var daWaitGroup sync.WaitGroup
 
 	// start the receipt/result processing loop
 	receiptLoopDone := make(chan struct{})
@@ -290,8 +291,11 @@ func (l *BatchSubmitter) loop() {
 	defer ticker.Stop()
 
 	publishAndWait := func() {
-		l.publishStateToL1(queue, receiptsCh)
+		l.publishStateToL1(queue, receiptsCh, &daWaitGroup)
 		if !l.Txmgr.IsClosed() {
+			l.Log.Info("Wait for pure DA writes, not L1 txs")
+			daWaitGroup.Wait()
+			l.Log.Info("Wait for L1 writes (blobs or DA commitments)")
 			queue.Wait()
 		} else {
 			l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
@@ -316,7 +320,7 @@ func (l *BatchSubmitter) loop() {
 				l.clearState(l.shutdownCtx)
 				continue
 			}
-			l.publishStateToL1(queue, receiptsCh)
+			l.publishStateToL1(queue, receiptsCh, &daWaitGroup)
 		case <-l.shutdownCtx.Done():
 			if l.Txmgr.IsClosed() {
 				l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
@@ -373,7 +377,7 @@ func (l *BatchSubmitter) waitNodeSync() error {
 
 // publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is
 // no more data to queue for publishing or if there was an error queueing the data.
-func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) {
+func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID], daWaitGroup *sync.WaitGroup) {
 	errGroup, ctx := errgroup.WithContext(l.killCtx)
 	for {
 		// if the txmgr is closed, we stop the transaction sending
@@ -389,7 +393,7 @@ func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txID], receiptsCh c
 			return
 		default:
 		}
-		err := l.publishTxToL1(ctx, queue, receiptsCh, errGroup)
+		err := l.publishTxToL1(ctx, queue, receiptsCh, errGroup, daWaitGroup)
 		if err != nil {
 			if err != io.EOF {
 				l.Log.Error("Error publishing tx to l1", "err", err)
@@ -439,7 +443,7 @@ func (l *BatchSubmitter) clearState(ctx context.Context) {
 }
 
 // publishTxToL1 submits a single state tx to the L1
-func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID], errGroup *errgroup.Group) error {
+func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID], errGroup *errgroup.Group, daWaitGroup *sync.WaitGroup) error {
 	// send all available transactions
 	l1tip, err := l.l1Tip(ctx)
 	if err != nil {
@@ -459,7 +463,9 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t
 		return err
 	}
 
+	daWaitGroup.Add(1)
 	errGroup.Go(func() error {
+		defer daWaitGroup.Done()
 		return l.sendTransaction(ctx, txdata, queue, receiptsCh)
 	})
 	return nil

From 7be9273888c772a640d7cb78b816f19318d24d04 Mon Sep 17 00:00:00 2001
From: Karl Bartel
Date: Wed, 21 Aug 2024 13:04:48 +0200
Subject: [PATCH 3/4] Merge daWaitGroup and errGroup

They were overlapping, and it was not clear why an error should only
cancel goroutines from a single publishStateToL1 call. Merging them also
allows limiting the total number of goroutines spawned that way
(although this introduces the same blocking possibility again).
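A rough standalone sketch of one shared, size-limited group (the limit of
10 mirrors the TODO in the diff; the simulated write and the failure on
write 7 are invented):

    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"

        "golang.org/x/sync/errgroup"
    )

    func main() {
        group, groupCtx := errgroup.WithContext(context.Background())
        group.SetLimit(10) // at most 10 DA writes in flight

        for i := 0; i < 20; i++ {
            i := i
            group.Go(func() error { // blocks once the limit is reached
                select {
                case <-groupCtx.Done(): // another write failed, abort early
                    return groupCtx.Err()
                case <-time.After(10 * time.Millisecond): // simulated DA write
                }
                if i == 7 {
                    return errors.New("DA write failed") // invented failure
                }
                return nil
            })
        }
        // Wait returns the first error; the derived context stays canceled
        // afterwards, which is why the group has to be reset before reuse.
        fmt.Println("result:", group.Wait())
    }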
---
 op-batcher/batcher/driver.go | 43 +++++++++++++++++++++++++------------------
 1 file changed, 25 insertions(+), 18 deletions(-)

diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go
index 1e9e9ab28003..8410113fd418 100644
--- a/op-batcher/batcher/driver.go
+++ b/op-batcher/batcher/driver.go
@@ -269,7 +269,15 @@ func (l *BatchSubmitter) loop() {
 
 	receiptsCh := make(chan txmgr.TxReceipt[txID])
 	queue := txmgr.NewQueue[txID](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions)
-	var daWaitGroup sync.WaitGroup
+	var (
+		daErrGroup    *errgroup.Group
+		daErrGroupCtx context.Context
+	)
+	resetDaErrGroup := func() {
+		daErrGroup, daErrGroupCtx = errgroup.WithContext(l.killCtx)
+		daErrGroup.SetLimit(10) // TODO: make this configurable
+	}
+	resetDaErrGroup()
 
 	// start the receipt/result processing loop
 	receiptLoopDone := make(chan struct{})
@@ -291,12 +299,13 @@ func (l *BatchSubmitter) loop() {
 	defer ticker.Stop()
 
 	publishAndWait := func() {
-		l.publishStateToL1(queue, receiptsCh, &daWaitGroup)
+		l.publishStateToL1(daErrGroupCtx, queue, receiptsCh, daErrGroup)
 		if !l.Txmgr.IsClosed() {
 			l.Log.Info("Wait for pure DA writes, not L1 txs")
-			daWaitGroup.Wait()
+			_ = daErrGroup.Wait()
 			l.Log.Info("Wait for L1 writes (blobs or DA commitments)")
 			queue.Wait()
+			resetDaErrGroup()
 		} else {
 			l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
 		}
@@ -320,7 +329,7 @@ func (l *BatchSubmitter) loop() {
 				l.clearState(l.shutdownCtx)
 				continue
 			}
-			l.publishStateToL1(queue, receiptsCh, &daWaitGroup)
+			l.publishStateToL1(daErrGroupCtx, queue, receiptsCh, daErrGroup)
 		case <-l.shutdownCtx.Done():
 			if l.Txmgr.IsClosed() {
 				l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
@@ -340,6 +349,15 @@ func (l *BatchSubmitter) loop() {
 			l.Log.Info("Finished publishing all remaining channel data")
 			return
 		}
+		select {
+		case <-daErrGroupCtx.Done():
+			err := daErrGroup.Wait()
+			if err != nil {
+				l.Log.Warn("BatchSubmitter.sendTransaction failed", "err", err)
+			}
+			resetDaErrGroup()
+		default:
+		}
 	}
 }
 
@@ -377,23 +395,14 @@ func (l *BatchSubmitter) waitNodeSync() error {
 
 // publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is
 // no more data to queue for publishing or if there was an error queueing the data.
-func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID], daWaitGroup *sync.WaitGroup) {
-	errGroup, ctx := errgroup.WithContext(l.killCtx)
+func (l *BatchSubmitter) publishStateToL1(ctx context.Context, queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID], daErrGroup *errgroup.Group) {
 	for {
 		// if the txmgr is closed, we stop the transaction sending
 		if l.Txmgr.IsClosed() {
 			l.Log.Info("Txmgr is closed, aborting state publishing")
 			return
 		}
-		// if one of the l.sendTransaction calls failed, we stop writing to DA
-		select {
-		case <-ctx.Done():
-			err := errGroup.Wait()
-			l.Log.Warn("BatchSubmitter.sendTransaction failed", "err", err)
-			return
-		default:
-		}
-		err := l.publishTxToL1(ctx, queue, receiptsCh, errGroup, daWaitGroup)
+		err := l.publishTxToL1(ctx, queue, receiptsCh, daErrGroup)
 		if err != nil {
 			if err != io.EOF {
 				l.Log.Error("Error publishing tx to l1", "err", err)
@@ -443,7 +452,7 @@ func (l *BatchSubmitter) clearState(ctx context.Context) {
 }
 
 // publishTxToL1 submits a single state tx to the L1
-func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID], errGroup *errgroup.Group, daWaitGroup *sync.WaitGroup) error {
+func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID], errGroup *errgroup.Group) error {
 	// send all available transactions
 	l1tip, err := l.l1Tip(ctx)
 	if err != nil {
@@ -463,9 +472,7 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t
 		return err
 	}
 
-	daWaitGroup.Add(1)
 	errGroup.Go(func() error {
-		defer daWaitGroup.Done()
 		return l.sendTransaction(ctx, txdata, queue, receiptsCh)
 	})
 	return nil

From 2a52308ac9d6a3a002ab8f2e39132ff6b46e37a9 Mon Sep 17 00:00:00 2001
From: Karl Bartel
Date: Wed, 21 Aug 2024 13:25:47 +0200
Subject: [PATCH 4/4] Use WaitGroup instead of errgroup.Group

The only error handling we do is logging, so we can log directly in the
goroutine and avoid resetting the errgroup, which feels like it goes
against the errgroup concept.

If we only want to limit the number of goroutines, we can add our own
semaphore. That would also allow us to skip adding new goroutines
without blocking in errgroup.Group.Go.
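Such a hand-rolled semaphore could look roughly like this (sketch only;
tryStartWrite, the capacity of 10, and the simulated write are all
hypothetical):

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    func main() {
        var wg sync.WaitGroup
        sem := make(chan struct{}, 10) // capacity = max concurrent DA writes

        tryStartWrite := func(id int) bool {
            select {
            case sem <- struct{}{}: // a slot is free
            default:
                return false // limit reached; caller may retry instead of blocking
            }
            wg.Add(1)
            go func() {
                defer wg.Done()
                defer func() { <-sem }()          // release the slot
                time.Sleep(10 * time.Millisecond) // simulated DA write
            }()
            return true
        }

        for i := 0; i < 20; i++ {
            if !tryStartWrite(i) {
                fmt.Println("write", i, "deferred: too many in flight")
            }
        }
        wg.Wait()
    }

Unlike errgroup.Group.Go, the non-blocking acquire lets the main loop
decide what to do when the limit is hit instead of stalling on it.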
---
 op-batcher/batcher/driver.go | 44 +++++++++++++++-----------------------------
 1 file changed, 15 insertions(+), 29 deletions(-)

diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go
index 8410113fd418..6a142787fe5a 100644
--- a/op-batcher/batcher/driver.go
+++ b/op-batcher/batcher/driver.go
@@ -21,7 +21,6 @@ import (
 	"github.com/ethereum/go-ethereum/core"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/log"
-	"golang.org/x/sync/errgroup"
 )
 
 var ErrBatcherNotRunning = errors.New("batcher is not running")
@@ -269,15 +268,7 @@ func (l *BatchSubmitter) loop() {
 
 	receiptsCh := make(chan txmgr.TxReceipt[txID])
 	queue := txmgr.NewQueue[txID](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions)
-	var (
-		daErrGroup    *errgroup.Group
-		daErrGroupCtx context.Context
-	)
-	resetDaErrGroup := func() {
-		daErrGroup, daErrGroupCtx = errgroup.WithContext(l.killCtx)
-		daErrGroup.SetLimit(10) // TODO: make this configurable
-	}
-	resetDaErrGroup()
+	var daWaitGroup sync.WaitGroup
 
 	// start the receipt/result processing loop
 	receiptLoopDone := make(chan struct{})
@@ -299,13 +290,12 @@ func (l *BatchSubmitter) loop() {
 	defer ticker.Stop()
 
 	publishAndWait := func() {
-		l.publishStateToL1(daErrGroupCtx, queue, receiptsCh, daErrGroup)
+		l.publishStateToL1(queue, receiptsCh, &daWaitGroup)
 		if !l.Txmgr.IsClosed() {
 			l.Log.Info("Wait for pure DA writes, not L1 txs")
-			_ = daErrGroup.Wait()
+			daWaitGroup.Wait()
 			l.Log.Info("Wait for L1 writes (blobs or DA commitments)")
 			queue.Wait()
-			resetDaErrGroup()
 		} else {
 			l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
 		}
@@ -329,7 +319,7 @@ func (l *BatchSubmitter) loop() {
 				l.clearState(l.shutdownCtx)
 				continue
 			}
-			l.publishStateToL1(daErrGroupCtx, queue, receiptsCh, daErrGroup)
+			l.publishStateToL1(queue, receiptsCh, &daWaitGroup)
 		case <-l.shutdownCtx.Done():
 			if l.Txmgr.IsClosed() {
 				l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
@@ -349,15 +339,6 @@ func (l *BatchSubmitter) loop() {
 			l.Log.Info("Finished publishing all remaining channel data")
 			return
 		}
-		select {
-		case <-daErrGroupCtx.Done():
-			err := daErrGroup.Wait()
-			if err != nil {
-				l.Log.Warn("BatchSubmitter.sendTransaction failed", "err", err)
-			}
-			resetDaErrGroup()
-		default:
-		}
 	}
 }
 
@@ -395,14 +376,14 @@ func (l *BatchSubmitter) waitNodeSync() error {
 
 // publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is
 // no more data to queue for publishing or if there was an error queueing the data.
-func (l *BatchSubmitter) publishStateToL1(ctx context.Context, queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID], daErrGroup *errgroup.Group) {
+func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID], daWaitGroup *sync.WaitGroup) {
 	for {
 		// if the txmgr is closed, we stop the transaction sending
 		if l.Txmgr.IsClosed() {
 			l.Log.Info("Txmgr is closed, aborting state publishing")
 			return
 		}
-		err := l.publishTxToL1(ctx, queue, receiptsCh, daErrGroup)
+		err := l.publishTxToL1(l.killCtx, queue, receiptsCh, daWaitGroup)
 		if err != nil {
 			if err != io.EOF {
 				l.Log.Error("Error publishing tx to l1", "err", err)
@@ -452,7 +433,7 @@ func (l *BatchSubmitter) clearState(ctx context.Context) {
 }
 
 // publishTxToL1 submits a single state tx to the L1
-func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID], errGroup *errgroup.Group) error {
+func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID], waitGroup *sync.WaitGroup) error {
 	// send all available transactions
 	l1tip, err := l.l1Tip(ctx)
 	if err != nil {
@@ -472,9 +453,14 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t
 		return err
 	}
 
-	errGroup.Go(func() error {
-		return l.sendTransaction(ctx, txdata, queue, receiptsCh)
-	})
+	waitGroup.Add(1)
+	go func() {
+		defer waitGroup.Done()
+		err := l.sendTransaction(ctx, txdata, queue, receiptsCh)
+		if err != nil {
+			l.Log.Warn("BatchSubmitter.sendTransaction failed", "err", err)
+		}
+	}()
	return nil
 }
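The series thus ends with each DA write in its own goroutine that logs its
own failure and is tracked only so shutdown can wait for it. Condensed
into a standalone sketch (slog stands in for l.Log, and sendTransaction
with its one failing write is simulated):

    package main

    import (
        "errors"
        "log/slog"
        "sync"
    )

    func main() {
        var daWaitGroup sync.WaitGroup

        // stand-in for l.sendTransaction; one write fails for demonstration
        sendTransaction := func(id int) error {
            if id == 1 {
                return errors.New("DA proxy unreachable")
            }
            return nil
        }

        for i := 0; i < 3; i++ {
            i := i
            daWaitGroup.Add(1)
            go func() {
                defer daWaitGroup.Done()
                // the only error handling is logging, so it happens right here
                if err := sendTransaction(i); err != nil {
                    slog.Warn("sendTransaction failed", "err", err)
                }
            }()
        }

        daWaitGroup.Wait() // what publishAndWait relies on during shutdown
    }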