Skip to content

Commit

Permalink
Node: Processor db write separation
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Jun 20, 2024
1 parent 627faa7 commit aef4046
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 35 deletions.
37 changes: 21 additions & 16 deletions node/pkg/processor/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,29 +271,34 @@ func (p *Processor) signedVaaAlreadyInDB(hash string, s *state) (bool, error) {
return false, nil
}

vaaID, err := db.VaaIDFromString(s.ourObservation.MessageID())
msgId := s.ourObservation.MessageID()
vaaID, err := db.VaaIDFromString(msgId)
if err != nil {
return false, fmt.Errorf(`failed to generate VAA ID from message id "%s": %w`, s.ourObservation.MessageID(), err)
}

vb, err := p.db.GetSignedVAABytes(*vaaID)
if err != nil {
if err == db.ErrVAANotFound {
if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("VAA not in DB",
zap.String("message_id", s.ourObservation.MessageID()),
zap.String("digest", hash),
)
// If the VAA is waiting to be written to the DB, use that version. Otherwise use the DB.
v := p.getVaaFromUpdateMap(msgId)
if v == nil {
vb, err := p.db.GetSignedVAABytes(*vaaID)
if err != nil {
if err == db.ErrVAANotFound {
if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("VAA not in DB",
zap.String("message_id", s.ourObservation.MessageID()),
zap.String("digest", hash),
)
}
return false, nil
} else {
return false, fmt.Errorf(`failed to look up message id "%s" in db: %w`, s.ourObservation.MessageID(), err)
}
return false, nil
} else {
return false, fmt.Errorf(`failed to look up message id "%s" in db: %w`, s.ourObservation.MessageID(), err)
}
}

v, err := vaa.Unmarshal(vb)
if err != nil {
return false, fmt.Errorf("failed to unmarshal VAA: %w", err)
v, err = vaa.Unmarshal(vb)
if err != nil {
return false, fmt.Errorf("failed to unmarshal VAA: %w", err)
}
}

oldHash := hex.EncodeToString(v.SigningDigest().Bytes())
Expand Down
8 changes: 1 addition & 7 deletions node/pkg/processor/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,11 +337,5 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos
)
}

if err := p.storeSignedVAA(v); err != nil {
p.logger.Error("failed to store signed VAA",
zap.String("message_id", v.MessageID()),
zap.Error(err),
)
return
}
p.storeSignedVAA(v)
}
55 changes: 51 additions & 4 deletions node/pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/ecdsa"
"encoding/hex"
"fmt"
"sync"
"time"

"github.com/certusone/wormhole/node/pkg/db"
Expand Down Expand Up @@ -139,6 +140,8 @@ type Processor struct {
acctReadC <-chan *common.MessagePublication
pythnetVaas map[string]PythNetVaaEntry
gatewayRelayer *gwrelayer.GatewayRelayer
updateVAALock sync.Mutex
updatedVAAs map[string]*vaa.VAA
}

var (
Expand Down Expand Up @@ -193,10 +196,15 @@ func NewProcessor(
acctReadC: acctReadC,
pythnetVaas: make(map[string]PythNetVaaEntry),
gatewayRelayer: gatewayRelayer,
updatedVAAs: make(map[string]*vaa.VAA),
}
}

func (p *Processor) Run(ctx context.Context) error {
if err := supervisor.Run(ctx, "vaaWriter", common.WrapWithScissors(p.vaaWriter, "vaaWriter")); err != nil {
return fmt.Errorf("failed to start vaa writer: %w", err)
}

cleanup := time.NewTicker(CleanupInterval)

// Always initialize the timer so don't have a nil pointer in the case below. It won't get rearmed after that.
Expand Down Expand Up @@ -293,13 +301,16 @@ func (p *Processor) Run(ctx context.Context) error {
}
}

func (p *Processor) storeSignedVAA(v *vaa.VAA) error {
func (p *Processor) storeSignedVAA(v *vaa.VAA) {
if v.EmitterChain == vaa.ChainIDPythNet {
key := fmt.Sprintf("%v/%v", v.EmitterAddress, v.Sequence)
p.pythnetVaas[key] = PythNetVaaEntry{v: v, updateTime: time.Now()}
return nil
return
}
return p.db.StoreSignedVAA(v)
key := fmt.Sprintf("%d/%v/%v", v.EmitterChain, v.EmitterAddress, v.Sequence)
p.updateVAALock.Lock()
p.updatedVAAs[key] = v
p.updateVAALock.Unlock()
}

// haveSignedVAA returns true if we already have a VAA for the given VAAID
Expand All @@ -313,12 +324,16 @@ func (p *Processor) haveSignedVAA(id db.VAAID) bool {
return exists
}

key := fmt.Sprintf("%d/%v/%v", id.EmitterChain, id.EmitterAddress, id.Sequence)
if p.getVaaFromUpdateMap(key) != nil {
return true
}

if p.db == nil {
return false
}

ok, err := p.db.HasVAA(id)

if err != nil {
p.logger.Error("failed to look up VAA in database",
zap.String("vaaID", string(id.Bytes())),
Expand All @@ -329,3 +344,35 @@ func (p *Processor) haveSignedVAA(id db.VAAID) bool {

return ok
}

func (p *Processor) getVaaFromUpdateMap(key string) *vaa.VAA {
p.updateVAALock.Lock()
v, exists := p.updatedVAAs[key]
p.updateVAALock.Unlock()
if !exists {
return nil
}
return v
}

func (p *Processor) vaaWriter(ctx context.Context) error {
ticker := time.NewTicker(time.Second)
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
p.updateVAALock.Lock()
updatedVAAs := p.updatedVAAs
p.updatedVAAs = make(map[string]*vaa.VAA)
p.updateVAALock.Unlock()
if len(updatedVAAs) != 0 {
for _, v := range updatedVAAs {
if err := p.db.StoreSignedVAA(v); err != nil {
p.logger.Error("failed to write VAA to database", zap.Error(err))
}
}
}
}
}
}
9 changes: 1 addition & 8 deletions node/pkg/processor/vaa.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,7 @@ func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) {
zap.String("digest", hash),
)

if err := p.storeSignedVAA(signed); err != nil {
p.logger.Error("failed to store signed VAA",
zap.String("message_id", signed.MessageID()),
zap.String("digest", hash),
zap.Error(err),
)
}

p.storeSignedVAA(signed)
p.broadcastSignedVAA(signed)
p.state.signatures[hash].submitted = true
}
Expand Down

0 comments on commit aef4046

Please sign in to comment.