op-batcher: non blocking da #213

Closed · wants to merge 4 commits
op-batcher/batcher/driver.go (25 changes: 17 additions & 8 deletions)
```diff
@@ -268,6 +268,7 @@ func (l *BatchSubmitter) loop() {
 
 	receiptsCh := make(chan txmgr.TxReceipt[txID])
 	queue := txmgr.NewQueue[txID](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions)
+	var daWaitGroup sync.WaitGroup
```
Review comment: I'd create an interface similar to txmgr.Queue to hide the impl details of how you're doing this.

Review comment: Similarly, we should have a config option like l.Config.MaxPendingTransactions, but for DA.
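A minimal sketch of what those two suggestions could look like together, assuming the code lives in the batcher package with "sync" imported. The daQueue type and the MaxPendingDARequests knob are illustrative names, not part of this PR:

```go
// Illustrative sketch only: a daQueue with a txmgr.Queue-style surface that
// hides how DA submissions run. maxPendingDARequests is an assumed config
// value, analogous to l.Config.MaxPendingTransactions.
type daQueue struct {
	wg      sync.WaitGroup
	pending chan struct{} // semaphore bounding in-flight DA submissions
}

func newDAQueue(maxPendingDARequests uint64) *daQueue {
	return &daQueue{pending: make(chan struct{}, maxPendingDARequests)}
}

// Send runs submit on its own goroutine, so the batcher main loop never
// blocks on a slow DA server; it only blocks once the bound is reached.
func (q *daQueue) Send(submit func()) {
	q.pending <- struct{}{} // acquire a slot
	q.wg.Add(1)
	go func() {
		defer q.wg.Done()
		defer func() { <-q.pending }() // release the slot
		submit()
	}()
}

// Wait blocks until every queued DA submission has completed.
func (q *daQueue) Wait() { q.wg.Wait() }
```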


```diff
 
 	// start the receipt/result processing loop
 	receiptLoopDone := make(chan struct{})
@@ -289,8 +290,11 @@ func (l *BatchSubmitter) loop() {
 	defer ticker.Stop()
 
 	publishAndWait := func() {
-		l.publishStateToL1(queue, receiptsCh)
+		l.publishStateToL1(queue, receiptsCh, &daWaitGroup)
```

Review comment: In this new architecture, the mainLoop talks to the daQueue, which then talks to the queue.

With that in mind, I think if we have a DaPublisher (similar to the TxManager) we can abstract away all details of the TxManager; all in all, the TxManager is an impl detail of the DaPublisher.

From the BatchSubmitter's perspective, it doesn't need to know about the TxManager. It only needs to be able to "publish txData", and for each txData to be published it needs a "receipt": a result to know whether publishing was successful or a failure.

Also, it's better to have the "publisher" logic in its own class, outside the BatchSubmitter, so sendTransaction moves out. That would give us more control over what "state" is accessible, and we wouldn't have two goroutines accessing state concurrently on things that don't support that.
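A rough sketch of that DaPublisher idea; the interface and result type names are made up here, not from the PR (txID and txData are the batcher package's existing types):

```go
// Hypothetical DaPublisher interface: the TxManager (and any DA client)
// would live behind an implementation of this, out of BatchSubmitter's view.
type PublishResult struct {
	ID  txID
	Err error // nil if publishing succeeded
}

type DaPublisher interface {
	// Publish queues txdata for publication without blocking on DA writes.
	Publish(ctx context.Context, txdata txData) error
	// Results delivers one result per published txData.
	Results() <-chan PublishResult
	// Wait blocks until everything queued so far has been published.
	Wait()
}
```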

```diff
 		if !l.Txmgr.IsClosed() {
+			l.Log.Info("Wait for pure DA writes, not L1 txs")
+			daWaitGroup.Wait()
+			l.Log.Info("Wait for L1 writes (blobs or DA commitments)")
 			queue.Wait()
 		} else {
 			l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
@@ -315,7 +319,7 @@ func (l *BatchSubmitter) loop() {
 				l.clearState(l.shutdownCtx)
 				continue
 			}
-			l.publishStateToL1(queue, receiptsCh)
+			l.publishStateToL1(queue, receiptsCh, &daWaitGroup)
 		case <-l.shutdownCtx.Done():
 			if l.Txmgr.IsClosed() {
 				l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
@@ -372,14 +376,14 @@ func (l *BatchSubmitter) waitNodeSync() error {
 
 // publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is
 // no more data to queue for publishing or if there was an error queing the data.
-func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) {
+func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID], daWaitGroup *sync.WaitGroup) {
 	for {
 		// if the txmgr is closed, we stop the transaction sending
 		if l.Txmgr.IsClosed() {
 			l.Log.Info("Txmgr is closed, aborting state publishing")
 			return
 		}
-		err := l.publishTxToL1(l.killCtx, queue, receiptsCh)
+		err := l.publishTxToL1(l.killCtx, queue, receiptsCh, daWaitGroup)
 		if err != nil {
 			if err != io.EOF {
 				l.Log.Error("Error publishing tx to l1", "err", err)
@@ -429,7 +433,7 @@ func (l *BatchSubmitter) clearState(ctx context.Context) {
 }
 
 // publishTxToL1 submits a single state tx to the L1
-func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) error {
+func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID], waitGroup *sync.WaitGroup) error {
 	// send all available transactions
 	l1tip, err := l.l1Tip(ctx)
 	if err != nil {
@@ -449,9 +453,14 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t
 		return err
 	}
 
-	if err = l.sendTransaction(ctx, txdata, queue, receiptsCh); err != nil {
-		return fmt.Errorf("BatchSubmitter.sendTransaction failed: %w", err)
-	}
+	waitGroup.Add(1)
```

Review comment: I would replace this with a daQueue.send() and, again, hide away the details of how you manage this.
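With the hypothetical daQueue sketched earlier, the goroutine added below would shrink to something like this (illustrative only; daq is assumed to be created once in loop() and threaded through):

```go
// Sketch: the WaitGroup bookkeeping moves inside daq.Send.
daq.Send(func() {
	if err := l.sendTransaction(ctx, txdata, queue, receiptsCh); err != nil {
		l.Log.Warn("BatchSubmitter.sendTransaction failed", "err", err)
	}
})
return nil
```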

```diff
+	go func() {
+		defer waitGroup.Done()
+		err := l.sendTransaction(ctx, txdata, queue, receiptsCh)
+		if err != nil {
+			l.Log.Warn("BatchSubmitter.sendTransaction failed", "err", err)
+		}
+	}()
 	return nil
 }
```
