Skip to content

Commit

Permalink
Handle additional update while writing to db
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Jun 20, 2024
1 parent aef4046 commit bf2d545
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 20 deletions.
2 changes: 1 addition & 1 deletion node/pkg/processor/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,5 +337,5 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos
)
}

p.storeSignedVAA(v)
p.storeSignedVAA(v, "")
}
60 changes: 49 additions & 11 deletions node/pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,14 @@ type Processor struct {
pythnetVaas map[string]PythNetVaaEntry
gatewayRelayer *gwrelayer.GatewayRelayer
updateVAALock sync.Mutex
updatedVAAs map[string]*vaa.VAA
updatedVAAs map[string]*updateVaaEntry
}

// updateVaaEntry is used to queue up a VAA to be written to the database.
type updateVaaEntry struct {
v *vaa.VAA
hashToLog string
dirty bool
}

var (
Expand Down Expand Up @@ -196,7 +203,7 @@ func NewProcessor(
acctReadC: acctReadC,
pythnetVaas: make(map[string]PythNetVaaEntry),
gatewayRelayer: gatewayRelayer,
updatedVAAs: make(map[string]*vaa.VAA),
updatedVAAs: make(map[string]*updateVaaEntry),
}
}

Expand Down Expand Up @@ -301,15 +308,15 @@ func (p *Processor) Run(ctx context.Context) error {
}
}

func (p *Processor) storeSignedVAA(v *vaa.VAA) {
func (p *Processor) storeSignedVAA(v *vaa.VAA, hash string) {
if v.EmitterChain == vaa.ChainIDPythNet {
key := fmt.Sprintf("%v/%v", v.EmitterAddress, v.Sequence)
p.pythnetVaas[key] = PythNetVaaEntry{v: v, updateTime: time.Now()}
return
}
key := fmt.Sprintf("%d/%v/%v", v.EmitterChain, v.EmitterAddress, v.Sequence)
p.updateVAALock.Lock()
p.updatedVAAs[key] = v
p.updatedVAAs[key] = &updateVaaEntry{v: v, hashToLog: hash, dirty: true}
p.updateVAALock.Unlock()
}

Expand Down Expand Up @@ -345,33 +352,64 @@ func (p *Processor) haveSignedVAA(id db.VAAID) bool {
return ok
}

// getVaaFromUpdateMap gets the VAA from the local map. If it's not there, it returns nil.
func (p *Processor) getVaaFromUpdateMap(key string) *vaa.VAA {
p.updateVAALock.Lock()
v, exists := p.updatedVAAs[key]
entry, exists := p.updatedVAAs[key]
p.updateVAALock.Unlock()
if !exists {
return nil
}
return v
return entry.v
}

// vaaWriter is the routine that writes VAAs to the database once per minute. It creates a local copy of the map
// being used by the processor to reduce lock contention. It uses a dirty flag to handle the case where the VAA
// gets updated again while we are in the process of writing it to the database.
//
// This routine also handles writing the "signed VAA with quorum" info log, since doing that inline can take more
// than a millisecond.
func (p *Processor) vaaWriter(ctx context.Context) error {
ticker := time.NewTicker(time.Second)
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
var updatedVAAs map[string]*updateVaaEntry
p.updateVAALock.Lock()
updatedVAAs := p.updatedVAAs
p.updatedVAAs = make(map[string]*vaa.VAA)
if len(p.updatedVAAs) != 0 {
// There's something to write. Create a local copy of the map so we can release the lock.
updatedVAAs = make(map[string]*updateVaaEntry)
for key, entry := range p.updatedVAAs {
updatedVAAs[key] = entry
entry.dirty = false
}
}
p.updateVAALock.Unlock()
if len(updatedVAAs) != 0 {
for _, v := range updatedVAAs {
if err := p.db.StoreSignedVAA(v); err != nil {
if updatedVAAs != nil {
// If there's anything to write, do that.
for _, entry := range updatedVAAs {
if err := p.db.StoreSignedVAA(entry.v); err != nil {
p.logger.Error("failed to write VAA to database", zap.Error(err))
}
if entry.hashToLog != "" {
p.logger.Info("signed VAA with quorum",
zap.String("message_id", entry.v.MessageID()),
zap.String("digest", entry.hashToLog),
)
}
}

// Go through the map and delete anything we have written that hasn't been updated again.
// If something has been updated again, it will get written next interval.
p.updateVAALock.Lock()
for key, entry := range p.updatedVAAs {
if !entry.dirty {
delete(p.updatedVAAs, key)
}
}
p.updateVAALock.Unlock()
}
}
}
Expand Down
9 changes: 1 addition & 8 deletions node/pkg/processor/vaa.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package processor

import (
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)

type VAA struct {
Expand All @@ -26,13 +25,7 @@ func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) {
ConsistencyLevel: v.ConsistencyLevel,
}

// Store signed VAA in database.
p.logger.Info("signed VAA with quorum",
zap.String("message_id", signed.MessageID()),
zap.String("digest", hash),
)

p.storeSignedVAA(signed)
p.storeSignedVAA(signed, hash)
p.broadcastSignedVAA(signed)
p.state.signatures[hash].submitted = true
}
Expand Down

0 comments on commit bf2d545

Please sign in to comment.