diff --git a/node/pkg/db/db.go b/node/pkg/db/db.go index 66810b30b4..def2cf517c 100644 --- a/node/pkg/db/db.go +++ b/node/pkg/db/db.go @@ -128,6 +128,36 @@ func (d *Database) StoreSignedVAA(v *vaa.VAA) error { return nil } +// StoreSignedVAABatch writes multiple VAAs to the database using the BadgerDB batch API. +// Note that the API takes care of splitting up the slice into the maximum allowed count +// and size so we don't need to worry about that. +func (d *Database) StoreSignedVAABatch(vaaBatch []*vaa.VAA) error { + batchTx := d.db.NewWriteBatch() + defer batchTx.Cancel() + + for _, v := range vaaBatch { + if len(v.Signatures) == 0 { + panic("StoreSignedVAABatch called for unsigned VAA") + } + + b, err := v.Marshal() + if err != nil { + panic("StoreSignedVAABatch failed to marshall VAA") + } + + err = batchTx.Set(VaaIDFromVAA(v).Bytes(), b) + if err != nil { + return err + } + + storedVaaTotal.Inc() + } + + // Wait for the batch to finish. + err := batchTx.Flush() + return err +} + func (d *Database) HasVAA(id VAAID) (bool, error) { err := d.db.View(func(txn *badger.Txn) error { _, err := txn.Get(id.Bytes()) diff --git a/node/pkg/db/db_test.go b/node/pkg/db/db_test.go index ffbdc2f804..5fde4c07d4 100644 --- a/node/pkg/db/db_test.go +++ b/node/pkg/db/db_test.go @@ -1,6 +1,7 @@ package db import ( + "bytes" "crypto/ecdsa" "crypto/rand" "fmt" @@ -22,6 +23,10 @@ import ( ) func getVAA() vaa.VAA { + return getVAAWithSeqNum(1) +} + +func getVAAWithSeqNum(seqNum uint64) vaa.VAA { var payload = []byte{97, 97, 97, 97, 97, 97} var governanceEmitter = vaa.Address{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4} @@ -31,7 +36,7 @@ func getVAA() vaa.VAA { Signatures: nil, Timestamp: time.Unix(0, 0), Nonce: uint32(1), - Sequence: uint64(1), + Sequence: seqNum, ConsistencyLevel: uint8(32), EmitterChain: vaa.ChainIDSolana, EmitterAddress: governanceEmitter, @@ -114,6 +119,68 @@ func TestStoreSignedVAASigned(t *testing.T) { assert.NoError(t, err2) } +func TestStoreSignedVAABatch(t *testing.T) { + dbPath := t.TempDir() + db, err := Open(dbPath) + if err != nil { + t.Error("failed to open database") + } + defer db.Close() + defer os.Remove(dbPath) + + privKey, err := ecdsa.GenerateKey(crypto.S256(), rand.Reader) + require.NoError(t, err) + + require.Less(t, int64(0), db.db.MaxBatchCount()) // In testing this was 104857. + require.Less(t, int64(0), db.db.MaxBatchSize()) // In testing this was 10066329. + + // Make sure we exceed the max batch size. + numVAAs := uint64(db.db.MaxBatchCount() + 1) + + // Build the VAA batch. + vaaBatch := make([]*vaa.VAA, 0, numVAAs) + for seqNum := uint64(0); seqNum < numVAAs; seqNum++ { + v := getVAAWithSeqNum(seqNum) + v.AddSignature(privKey, 0) + vaaBatch = append(vaaBatch, &v) + } + + // Store the batch in the database. + err = db.StoreSignedVAABatch(vaaBatch) + require.NoError(t, err) + + // Verify all the VAAs are in the database. + for _, v := range vaaBatch { + storedBytes, err := db.GetSignedVAABytes(*VaaIDFromVAA(v)) + require.NoError(t, err) + + origBytes, err := v.Marshal() + require.NoError(t, err) + + assert.True(t, bytes.Equal(origBytes, storedBytes)) + } + + // Verify that updates work as well by tweaking the VAAs and rewriting them. + for _, v := range vaaBatch { + v.Nonce += 1 + } + + // Store the updated batch in the database. + err = db.StoreSignedVAABatch(vaaBatch) + require.NoError(t, err) + + // Verify all the updated VAAs are in the database. + for _, v := range vaaBatch { + storedBytes, err := db.GetSignedVAABytes(*VaaIDFromVAA(v)) + require.NoError(t, err) + + origBytes, err := v.Marshal() + require.NoError(t, err) + + assert.True(t, bytes.Equal(origBytes, storedBytes)) + } +} + func TestGetSignedVAABytes(t *testing.T) { dbPath := t.TempDir() db, err := Open(dbPath) diff --git a/node/pkg/processor/processor.go b/node/pkg/processor/processor.go index f175fa0f0f..f5f4496169 100644 --- a/node/pkg/processor/processor.go +++ b/node/pkg/processor/processor.go @@ -399,10 +399,9 @@ func (p *Processor) vaaWriter(ctx context.Context) error { p.updateVAALock.Unlock() if updatedVAAs != nil { // If there's anything to write, do that. + vaaBatch := make([]*vaa.VAA, 0, len(updatedVAAs)) for _, entry := range updatedVAAs { - if err := p.db.StoreSignedVAA(entry.v); err != nil { - p.logger.Error("failed to write VAA to database", zap.Error(err)) - } + vaaBatch = append(vaaBatch, entry.v) if entry.hashToLog != "" { p.logger.Info("signed VAA with quorum", zap.String("message_id", entry.v.MessageID()), @@ -412,6 +411,10 @@ func (p *Processor) vaaWriter(ctx context.Context) error { } } + if err := p.db.StoreSignedVAABatch(vaaBatch); err != nil { + p.logger.Error("failed to write VAAs to database", zap.Int("numVAAs", len(vaaBatch)), zap.Error(err)) + } + // Go through the map and delete anything we have written that hasn't been updated again. // If something has been updated again, it will get written next interval. p.updateVAALock.Lock()