-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
op-batcher: non blocking da #213
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -268,6 +268,7 @@ func (l *BatchSubmitter) loop() { | |
|
||
receiptsCh := make(chan txmgr.TxReceipt[txID]) | ||
queue := txmgr.NewQueue[txID](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions) | ||
var daWaitGroup sync.WaitGroup | ||
|
||
// start the receipt/result processing loop | ||
receiptLoopDone := make(chan struct{}) | ||
|
@@ -289,8 +290,11 @@ func (l *BatchSubmitter) loop() { | |
defer ticker.Stop() | ||
|
||
publishAndWait := func() { | ||
l.publishStateToL1(queue, receiptsCh) | ||
l.publishStateToL1(queue, receiptsCh, &daWaitGroup) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In this new architecture, the With that in mind. I think if we have a From the Also, better to have logic from the "publisher" in it's own class and outside the BatchSubmitter, so moving |
||
if !l.Txmgr.IsClosed() { | ||
l.Log.Info("Wait for pure DA writes, not L1 txs") | ||
daWaitGroup.Wait() | ||
l.Log.Info("Wait for L1 writes (blobs or DA commitments)") | ||
queue.Wait() | ||
} else { | ||
l.Log.Info("Txmgr is closed, remaining channel data won't be sent") | ||
|
@@ -315,7 +319,7 @@ func (l *BatchSubmitter) loop() { | |
l.clearState(l.shutdownCtx) | ||
continue | ||
} | ||
l.publishStateToL1(queue, receiptsCh) | ||
l.publishStateToL1(queue, receiptsCh, &daWaitGroup) | ||
case <-l.shutdownCtx.Done(): | ||
if l.Txmgr.IsClosed() { | ||
l.Log.Info("Txmgr is closed, remaining channel data won't be sent") | ||
|
@@ -372,14 +376,14 @@ func (l *BatchSubmitter) waitNodeSync() error { | |
|
||
// publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is | ||
// no more data to queue for publishing or if there was an error queing the data. | ||
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) { | ||
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID], daWaitGroup *sync.WaitGroup) { | ||
for { | ||
// if the txmgr is closed, we stop the transaction sending | ||
if l.Txmgr.IsClosed() { | ||
l.Log.Info("Txmgr is closed, aborting state publishing") | ||
return | ||
} | ||
err := l.publishTxToL1(l.killCtx, queue, receiptsCh) | ||
err := l.publishTxToL1(l.killCtx, queue, receiptsCh, daWaitGroup) | ||
if err != nil { | ||
if err != io.EOF { | ||
l.Log.Error("Error publishing tx to l1", "err", err) | ||
|
@@ -429,7 +433,7 @@ func (l *BatchSubmitter) clearState(ctx context.Context) { | |
} | ||
|
||
// publishTxToL1 submits a single state tx to the L1 | ||
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) error { | ||
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID], waitGroup *sync.WaitGroup) error { | ||
// send all available transactions | ||
l1tip, err := l.l1Tip(ctx) | ||
if err != nil { | ||
|
@@ -449,9 +453,14 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t | |
return err | ||
} | ||
|
||
if err = l.sendTransaction(ctx, txdata, queue, receiptsCh); err != nil { | ||
return fmt.Errorf("BatchSubmitter.sendTransaction failed: %w", err) | ||
} | ||
waitGroup.Add(1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would replace this by a |
||
go func() { | ||
defer waitGroup.Done() | ||
err := l.sendTransaction(ctx, txdata, queue, receiptsCh) | ||
if err != nil { | ||
l.Log.Warn("BatchSubmitter.sendTransaction failed: %w", "err", err) | ||
} | ||
}() | ||
return nil | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd create a similar interface to
txmgr.Queue
to hide impl details on how you're doing thisThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly, we should have a config option like
l.Config.MaxPendingTransactions
but for DA.