Skip to content

Commit

Permalink
Wait for DA on reorgs/shutdown
Browse files Browse the repository at this point in the history
publishAndWait is meant to wait until all remaining data is written.
Only waiting to the txs in the queue won't work, since those are only
created after the data has been written to the DA.
  • Loading branch information
karlb committed Aug 16, 2024
1 parent d6c69f1 commit eba6f86
Showing 1 changed file with 11 additions and 5 deletions.
16 changes: 11 additions & 5 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ func (l *BatchSubmitter) loop() {

receiptsCh := make(chan txmgr.TxReceipt[txID])
queue := txmgr.NewQueue[txID](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions)
var daWaitGroup sync.WaitGroup

// start the receipt/result processing loop
receiptLoopDone := make(chan struct{})
Expand All @@ -290,8 +291,11 @@ func (l *BatchSubmitter) loop() {
defer ticker.Stop()

publishAndWait := func() {
l.publishStateToL1(queue, receiptsCh)
l.publishStateToL1(queue, receiptsCh, &daWaitGroup)
if !l.Txmgr.IsClosed() {
l.Log.Info("Wait for pure DA writes, not L1 txs")
daWaitGroup.Wait()
l.Log.Info("Wait for L1 writes (blobs or DA commitments)")
queue.Wait()
} else {
l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
Expand All @@ -316,7 +320,7 @@ func (l *BatchSubmitter) loop() {
l.clearState(l.shutdownCtx)
continue
}
l.publishStateToL1(queue, receiptsCh)
l.publishStateToL1(queue, receiptsCh, &daWaitGroup)
case <-l.shutdownCtx.Done():
if l.Txmgr.IsClosed() {
l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
Expand Down Expand Up @@ -373,7 +377,7 @@ func (l *BatchSubmitter) waitNodeSync() error {

// publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is
// no more data to queue for publishing or if there was an error queing the data.
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) {
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID], daWaitGroup *sync.WaitGroup) {
errGroup, ctx := errgroup.WithContext(l.killCtx)
for {
// if the txmgr is closed, we stop the transaction sending
Expand All @@ -389,7 +393,7 @@ func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txID], receiptsCh c
return
default:
}
err := l.publishTxToL1(ctx, queue, receiptsCh, errGroup)
err := l.publishTxToL1(ctx, queue, receiptsCh, errGroup, daWaitGroup)
if err != nil {
if err != io.EOF {
l.Log.Error("Error publishing tx to l1", "err", err)
Expand Down Expand Up @@ -439,7 +443,7 @@ func (l *BatchSubmitter) clearState(ctx context.Context) {
}

// publishTxToL1 submits a single state tx to the L1
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID], errGroup *errgroup.Group) error {
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID], errGroup *errgroup.Group, daWaitGroup *sync.WaitGroup) error {
// send all available transactions
l1tip, err := l.l1Tip(ctx)
if err != nil {
Expand All @@ -459,7 +463,9 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t
return err
}

daWaitGroup.Add(1)
errGroup.Go(func() error {
defer daWaitGroup.Done()
return l.sendTransaction(ctx, txdata, queue, receiptsCh)
})
return nil
Expand Down

0 comments on commit eba6f86

Please sign in to comment.