From 27d3dbc252bf2ebab2f560875941a5f970862c28 Mon Sep 17 00:00:00 2001 From: Bruce Riley Date: Mon, 1 Jul 2024 13:22:44 -0500 Subject: [PATCH] Don't move broadcasting to worker --- node/pkg/processor/processor.go | 28 ++++------------------------ node/pkg/processor/vaa.go | 10 +++++++++- 2 files changed, 13 insertions(+), 25 deletions(-) diff --git a/node/pkg/processor/processor.go b/node/pkg/processor/processor.go index f5f4496169..bc0588a998 100644 --- a/node/pkg/processor/processor.go +++ b/node/pkg/processor/processor.go @@ -146,9 +146,8 @@ type Processor struct { // updateVaaEntry is used to queue up a VAA to be written to the database. type updateVaaEntry struct { - v *vaa.VAA - hashToLog string - dirty bool + v *vaa.VAA + dirty bool } var ( @@ -310,17 +309,6 @@ func (p *Processor) Run(ctx context.Context) error { // storeSignedVAA schedules a database update for a VAA. func (p *Processor) storeSignedVAA(v *vaa.VAA) { - p.postUpdate(v, "") -} - -// postSignedVAA schedules handling of a newly signed VAA. This includes -// writing it to the database, broadcasting it and generating a log message. -func (p *Processor) postSignedVAA(v *vaa.VAA, hash string) { - p.postUpdate(v, hash) -} - -// postUpdate adds a VAA to the map of VAAs to be updated by the vaa writer routine. -func (p *Processor) postUpdate(v *vaa.VAA, hash string) { if v.EmitterChain == vaa.ChainIDPythNet { key := fmt.Sprintf("%v/%v", v.EmitterAddress, v.Sequence) p.pythnetVaas[key] = PythNetVaaEntry{v: v, updateTime: time.Now()} @@ -328,7 +316,7 @@ func (p *Processor) postUpdate(v *vaa.VAA, hash string) { } key := fmt.Sprintf("%d/%v/%v", v.EmitterChain, v.EmitterAddress, v.Sequence) p.updateVAALock.Lock() - p.updatedVAAs[key] = &updateVaaEntry{v: v, hashToLog: hash, dirty: true} + p.updatedVAAs[key] = &updateVaaEntry{v: v, dirty: true} p.updateVAALock.Unlock() } @@ -377,8 +365,7 @@ func (p *Processor) getVaaFromUpdateMap(key string) *vaa.VAA { // vaaWriter is the routine that writes VAAs to the database once per second. It creates a local copy of the map // being used by the processor to reduce lock contention. It uses a dirty flag to handle the case where the VAA -// gets updated again while we are in the process of writing it to the database. This routine also handles broadcasting -// a newly signed VAA and writing the "signed VAA with quorum" info log. +// gets updated again while we are in the process of writing it to the database. func (p *Processor) vaaWriter(ctx context.Context) error { ticker := time.NewTicker(time.Second) for { @@ -402,13 +389,6 @@ func (p *Processor) vaaWriter(ctx context.Context) error { vaaBatch := make([]*vaa.VAA, 0, len(updatedVAAs)) for _, entry := range updatedVAAs { vaaBatch = append(vaaBatch, entry.v) - if entry.hashToLog != "" { - p.logger.Info("signed VAA with quorum", - zap.String("message_id", entry.v.MessageID()), - zap.String("digest", entry.hashToLog), - ) - p.broadcastSignedVAA(entry.v) - } } if err := p.db.StoreSignedVAABatch(vaaBatch); err != nil { diff --git a/node/pkg/processor/vaa.go b/node/pkg/processor/vaa.go index f4896dad2e..7d41bbc244 100644 --- a/node/pkg/processor/vaa.go +++ b/node/pkg/processor/vaa.go @@ -2,6 +2,7 @@ package processor import ( "github.com/wormhole-foundation/wormhole/sdk/vaa" + "go.uber.org/zap" ) type VAA struct { @@ -25,7 +26,14 @@ func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) { ConsistencyLevel: v.ConsistencyLevel, } - p.postSignedVAA(signed, hash) + // Store signed VAA in database. + p.logger.Info("signed VAA with quorum", + zap.String("message_id", signed.MessageID()), + zap.String("digest", hash), + ) + + p.broadcastSignedVAA(signed) + p.storeSignedVAA(signed) } func (v *VAA) IsReliable() bool {