Skip to content

Commit

Permalink
Use BadgerDB batch API to store VAAs
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Jun 28, 2024
1 parent 026d7ac commit 9b13aa3
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 4 deletions.
30 changes: 30 additions & 0 deletions node/pkg/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,36 @@ func (d *Database) StoreSignedVAA(v *vaa.VAA) error {
return nil
}

// StoreSignedVAABatch writes multiple VAAs to the database using the BadgerDB batch API.
// Note that the API takes care of splitting up the slice into the maximum allowed count
// and size so we don't need to worry about that.
func (d *Database) StoreSignedVAABatch(vaaBatch []*vaa.VAA) error {
batchTx := d.db.NewWriteBatch()
defer batchTx.Cancel()

for _, v := range vaaBatch {
if len(v.Signatures) == 0 {
panic("StoreSignedVAABatch called for unsigned VAA")
}

b, err := v.Marshal()
if err != nil {
panic("StoreSignedVAABatch failed to marshall VAA")
}

err = batchTx.Set(VaaIDFromVAA(v).Bytes(), b)
if err != nil {
return err
}

storedVaaTotal.Inc()
}

// Wait for the batch to finish.
err := batchTx.Flush()
return err
}

func (d *Database) HasVAA(id VAAID) (bool, error) {
err := d.db.View(func(txn *badger.Txn) error {
_, err := txn.Get(id.Bytes())
Expand Down
49 changes: 48 additions & 1 deletion node/pkg/db/db_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package db

import (
"bytes"
"crypto/ecdsa"
"crypto/rand"
"fmt"
Expand All @@ -22,6 +23,10 @@ import (
)

func getVAA() vaa.VAA {
return getVAAWithSeqNum(1)
}

func getVAAWithSeqNum(seqNum uint64) vaa.VAA {
var payload = []byte{97, 97, 97, 97, 97, 97}
var governanceEmitter = vaa.Address{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4}

Expand All @@ -31,7 +36,7 @@ func getVAA() vaa.VAA {
Signatures: nil,
Timestamp: time.Unix(0, 0),
Nonce: uint32(1),
Sequence: uint64(1),
Sequence: seqNum,
ConsistencyLevel: uint8(32),
EmitterChain: vaa.ChainIDSolana,
EmitterAddress: governanceEmitter,
Expand Down Expand Up @@ -114,6 +119,48 @@ func TestStoreSignedVAASigned(t *testing.T) {
assert.NoError(t, err2)
}

func TestStoreSignedVAABatch(t *testing.T) {
dbPath := t.TempDir()
db, err := Open(dbPath)
if err != nil {
t.Error("failed to open database")
}
defer db.Close()
defer os.Remove(dbPath)

privKey, err := ecdsa.GenerateKey(crypto.S256(), rand.Reader)
require.NoError(t, err)

require.Less(t, int64(0), db.db.MaxBatchCount()) // In testing this was 104857.
require.Less(t, int64(0), db.db.MaxBatchSize()) // In testing this was 10066329.

// Make sure we exceed the max batch size.
numVAAs := uint64(db.db.MaxBatchCount() + 1)

// Build the VAA batch.
vaaBatch := make([]*vaa.VAA, 0, numVAAs)
for seqNum := uint64(0); seqNum < numVAAs; seqNum++ {
v := getVAAWithSeqNum(seqNum)
v.AddSignature(privKey, 0)
vaaBatch = append(vaaBatch, &v)
}

// Store the batch in the database.
err = db.StoreSignedVAABatch(vaaBatch)
require.NoError(t, err)

// Verify all the VAAs are in the database.
for _, v := range vaaBatch {
storedBytes, err := db.GetSignedVAABytes(*VaaIDFromVAA(v))
require.NoError(t, err)

origBytes, err := v.Marshal()
require.NoError(t, err)

assert.True(t, bytes.Equal(origBytes, storedBytes))
}
}

func TestGetSignedVAABytes(t *testing.T) {
dbPath := t.TempDir()
db, err := Open(dbPath)
Expand Down
9 changes: 6 additions & 3 deletions node/pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,10 +399,9 @@ func (p *Processor) vaaWriter(ctx context.Context) error {
p.updateVAALock.Unlock()
if updatedVAAs != nil {
// If there's anything to write, do that.
vaaBatch := make([]*vaa.VAA, 0, len(updatedVAAs))
for _, entry := range updatedVAAs {
if err := p.db.StoreSignedVAA(entry.v); err != nil {
p.logger.Error("failed to write VAA to database", zap.Error(err))
}
vaaBatch = append(vaaBatch, entry.v)
if entry.hashToLog != "" {
p.logger.Info("signed VAA with quorum",
zap.String("message_id", entry.v.MessageID()),
Expand All @@ -412,6 +411,10 @@ func (p *Processor) vaaWriter(ctx context.Context) error {
}
}

if err := p.db.StoreSignedVAABatch(vaaBatch); err != nil {
p.logger.Error("failed to write VAAs to database", zap.Int("numVAAs", len(vaaBatch)), zap.Error(err))
}

// Go through the map and delete anything we have written that hasn't been updated again.
// If something has been updated again, it will get written next interval.
p.updateVAALock.Lock()
Expand Down

0 comments on commit 9b13aa3

Please sign in to comment.