diff --git a/node/pkg/processor/observation.go b/node/pkg/processor/observation.go index 2e15ea8fd4..f0d2312360 100644 --- a/node/pkg/processor/observation.go +++ b/node/pkg/processor/observation.go @@ -337,5 +337,5 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos ) } - p.storeSignedVAA(v) + p.storeSignedVAA(v, "") } diff --git a/node/pkg/processor/processor.go b/node/pkg/processor/processor.go index 95a9c1cb4f..0b46ddd688 100644 --- a/node/pkg/processor/processor.go +++ b/node/pkg/processor/processor.go @@ -141,7 +141,14 @@ type Processor struct { pythnetVaas map[string]PythNetVaaEntry gatewayRelayer *gwrelayer.GatewayRelayer updateVAALock sync.Mutex - updatedVAAs map[string]*vaa.VAA + updatedVAAs map[string]*updateVaaEntry +} + +// updateVaaEntry is used to queue up a VAA to be written to the database. +type updateVaaEntry struct { + v *vaa.VAA + hashToLog string + dirty bool } var ( @@ -196,7 +203,7 @@ func NewProcessor( acctReadC: acctReadC, pythnetVaas: make(map[string]PythNetVaaEntry), gatewayRelayer: gatewayRelayer, - updatedVAAs: make(map[string]*vaa.VAA), + updatedVAAs: make(map[string]*updateVaaEntry), } } @@ -301,7 +308,7 @@ func (p *Processor) Run(ctx context.Context) error { } } -func (p *Processor) storeSignedVAA(v *vaa.VAA) { +func (p *Processor) storeSignedVAA(v *vaa.VAA, hash string) { if v.EmitterChain == vaa.ChainIDPythNet { key := fmt.Sprintf("%v/%v", v.EmitterAddress, v.Sequence) p.pythnetVaas[key] = PythNetVaaEntry{v: v, updateTime: time.Now()} @@ -309,7 +316,7 @@ func (p *Processor) storeSignedVAA(v *vaa.VAA) { } key := fmt.Sprintf("%d/%v/%v", v.EmitterChain, v.EmitterAddress, v.Sequence) p.updateVAALock.Lock() - p.updatedVAAs[key] = v + p.updatedVAAs[key] = &updateVaaEntry{v: v, hashToLog: hash, dirty: true} p.updateVAALock.Unlock() } @@ -345,16 +352,23 @@ func (p *Processor) haveSignedVAA(id db.VAAID) bool { return ok } +// getVaaFromUpdateMap gets the VAA from the local map. If it's not there, it returns nil. func (p *Processor) getVaaFromUpdateMap(key string) *vaa.VAA { p.updateVAALock.Lock() - v, exists := p.updatedVAAs[key] + entry, exists := p.updatedVAAs[key] p.updateVAALock.Unlock() if !exists { return nil } - return v + return entry.v } +// vaaWriter is the routine that writes VAAs to the database once per minute. It creates a local copy of the map +// being used by the processor to reduce lock contention. It uses a dirty flag to handle the case where the VAA +// gets updated again while we are in the process of writing it to the database. +// +// This routine also handles writing the "signed VAA with quorum" info log, since doing that inline can take more +// than a millisecond. func (p *Processor) vaaWriter(ctx context.Context) error { ticker := time.NewTicker(time.Second) for { @@ -362,16 +376,40 @@ func (p *Processor) vaaWriter(ctx context.Context) error { case <-ctx.Done(): return nil case <-ticker.C: + var updatedVAAs map[string]*updateVaaEntry p.updateVAALock.Lock() - updatedVAAs := p.updatedVAAs - p.updatedVAAs = make(map[string]*vaa.VAA) + if len(p.updatedVAAs) != 0 { + // There's something to write. Create a local copy of the map so we can release the lock. + updatedVAAs = make(map[string]*updateVaaEntry) + for key, entry := range p.updatedVAAs { + updatedVAAs[key] = entry + entry.dirty = false + } + } p.updateVAALock.Unlock() - if len(updatedVAAs) != 0 { - for _, v := range updatedVAAs { - if err := p.db.StoreSignedVAA(v); err != nil { + if updatedVAAs != nil { + // If there's anything to write, do that. + for _, entry := range updatedVAAs { + if err := p.db.StoreSignedVAA(entry.v); err != nil { p.logger.Error("failed to write VAA to database", zap.Error(err)) } + if entry.hashToLog != "" { + p.logger.Info("signed VAA with quorum", + zap.String("message_id", entry.v.MessageID()), + zap.String("digest", entry.hashToLog), + ) + } + } + + // Go through the map and delete anything we have written that hasn't been updated again. + // If something has been updated again, it will get written next interval. + p.updateVAALock.Lock() + for key, entry := range p.updatedVAAs { + if !entry.dirty { + delete(p.updatedVAAs, key) + } } + p.updateVAALock.Unlock() } } } diff --git a/node/pkg/processor/vaa.go b/node/pkg/processor/vaa.go index 6f6e8591e6..f0a960fe2b 100644 --- a/node/pkg/processor/vaa.go +++ b/node/pkg/processor/vaa.go @@ -2,7 +2,6 @@ package processor import ( "github.com/wormhole-foundation/wormhole/sdk/vaa" - "go.uber.org/zap" ) type VAA struct { @@ -26,13 +25,7 @@ func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) { ConsistencyLevel: v.ConsistencyLevel, } - // Store signed VAA in database. - p.logger.Info("signed VAA with quorum", - zap.String("message_id", signed.MessageID()), - zap.String("digest", hash), - ) - - p.storeSignedVAA(signed) + p.storeSignedVAA(signed, hash) p.broadcastSignedVAA(signed) p.state.signatures[hash].submitted = true }