From d602f5830744be4337959a3004a4a785d389df43 Mon Sep 17 00:00:00 2001 From: Bruce Riley Date: Thu, 20 Jun 2024 10:48:50 -0500 Subject: [PATCH] Move the broadcasting of signed VAA to the worker --- node/pkg/processor/observation.go | 2 +- node/pkg/processor/processor.go | 23 +++++++++++++++++------ node/pkg/processor/vaa.go | 3 +-- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/node/pkg/processor/observation.go b/node/pkg/processor/observation.go index f0d2312360..2e15ea8fd4 100644 --- a/node/pkg/processor/observation.go +++ b/node/pkg/processor/observation.go @@ -337,5 +337,5 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos ) } - p.storeSignedVAA(v, "") + p.storeSignedVAA(v) } diff --git a/node/pkg/processor/processor.go b/node/pkg/processor/processor.go index 0b46ddd688..f175fa0f0f 100644 --- a/node/pkg/processor/processor.go +++ b/node/pkg/processor/processor.go @@ -308,7 +308,19 @@ func (p *Processor) Run(ctx context.Context) error { } } -func (p *Processor) storeSignedVAA(v *vaa.VAA, hash string) { +// storeSignedVAA schedules a database update for a VAA. +func (p *Processor) storeSignedVAA(v *vaa.VAA) { + p.postUpdate(v, "") +} + +// postSignedVAA schedules handling of a newly signed VAA. This includes +// writing it to the database, broadcasting it and generating a log message. +func (p *Processor) postSignedVAA(v *vaa.VAA, hash string) { + p.postUpdate(v, hash) +} + +// postUpdate adds a VAA to the map of VAAs to be updated by the vaa writer routine. +func (p *Processor) postUpdate(v *vaa.VAA, hash string) { if v.EmitterChain == vaa.ChainIDPythNet { key := fmt.Sprintf("%v/%v", v.EmitterAddress, v.Sequence) p.pythnetVaas[key] = PythNetVaaEntry{v: v, updateTime: time.Now()} @@ -363,12 +375,10 @@ func (p *Processor) getVaaFromUpdateMap(key string) *vaa.VAA { return entry.v } -// vaaWriter is the routine that writes VAAs to the database once per minute. It creates a local copy of the map +// vaaWriter is the routine that writes VAAs to the database once per second. It creates a local copy of the map // being used by the processor to reduce lock contention. It uses a dirty flag to handle the case where the VAA -// gets updated again while we are in the process of writing it to the database. -// -// This routine also handles writing the "signed VAA with quorum" info log, since doing that inline can take more -// than a millisecond. +// gets updated again while we are in the process of writing it to the database. This routine also handles broadcasting +// a newly signed VAA and writing the "signed VAA with quorum" info log. func (p *Processor) vaaWriter(ctx context.Context) error { ticker := time.NewTicker(time.Second) for { @@ -398,6 +408,7 @@ func (p *Processor) vaaWriter(ctx context.Context) error { zap.String("message_id", entry.v.MessageID()), zap.String("digest", entry.hashToLog), ) + p.broadcastSignedVAA(entry.v) } } diff --git a/node/pkg/processor/vaa.go b/node/pkg/processor/vaa.go index f0a960fe2b..1029420931 100644 --- a/node/pkg/processor/vaa.go +++ b/node/pkg/processor/vaa.go @@ -25,8 +25,7 @@ func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) { ConsistencyLevel: v.ConsistencyLevel, } - p.storeSignedVAA(signed, hash) - p.broadcastSignedVAA(signed) + p.postSignedVAA(signed, hash) p.state.signatures[hash].submitted = true }