Skip to content

Commit

Permalink
Move the broadcasting of signed VAA to the worker
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Jun 20, 2024
1 parent bf2d545 commit fcf0b41
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 9 deletions.
2 changes: 1 addition & 1 deletion node/pkg/processor/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,5 +337,5 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos
)
}

p.storeSignedVAA(v, "")
p.storeSignedVAA(v)
}
23 changes: 17 additions & 6 deletions node/pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,19 @@ func (p *Processor) Run(ctx context.Context) error {
}
}

func (p *Processor) storeSignedVAA(v *vaa.VAA, hash string) {
// storeSignedVAA schedules a database update for a VAA.
func (p *Processor) storeSignedVAA(v *vaa.VAA) {
p.postUpdate(v, "")
}

// postSignedVAA schedules handling of a newly signed VAA. This includes
// writing it to the database, broadcasting it and generating a log message.
func (p *Processor) postSignedVAA(v *vaa.VAA, hash string) {
p.postUpdate(v, hash)
}

// postUpdate adds a VAA to the map of VAAs to be updated by the vaa writer routine.
func (p *Processor) postUpdate(v *vaa.VAA, hash string) {
if v.EmitterChain == vaa.ChainIDPythNet {
key := fmt.Sprintf("%v/%v", v.EmitterAddress, v.Sequence)
p.pythnetVaas[key] = PythNetVaaEntry{v: v, updateTime: time.Now()}
Expand Down Expand Up @@ -363,12 +375,10 @@ func (p *Processor) getVaaFromUpdateMap(key string) *vaa.VAA {
return entry.v
}

// vaaWriter is the routine that writes VAAs to the database once per minute. It creates a local copy of the map
// vaaWriter is the routine that writes VAAs to the database once per second. It creates a local copy of the map
// being used by the processor to reduce lock contention. It uses a dirty flag to handle the case where the VAA
// gets updated again while we are in the process of writing it to the database.
//
// This routine also handles writing the "signed VAA with quorum" info log, since doing that inline can take more
// than a millisecond.
// gets updated again while we are in the process of writing it to the database. This routine also handles broadcasting
// a newly signed VAA and writing the "signed VAA with quorum" info log.
func (p *Processor) vaaWriter(ctx context.Context) error {
ticker := time.NewTicker(time.Second)
for {
Expand Down Expand Up @@ -398,6 +408,7 @@ func (p *Processor) vaaWriter(ctx context.Context) error {
zap.String("message_id", entry.v.MessageID()),
zap.String("digest", entry.hashToLog),
)
p.broadcastSignedVAA(entry.v)
}
}

Expand Down
3 changes: 1 addition & 2 deletions node/pkg/processor/vaa.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) {
ConsistencyLevel: v.ConsistencyLevel,
}

p.storeSignedVAA(signed, hash)
p.broadcastSignedVAA(signed)
p.postSignedVAA(signed, hash)
p.state.signatures[hash].submitted = true
}

Expand Down

0 comments on commit fcf0b41

Please sign in to comment.