From 3b6cb6d3c043c075ee29225fee62172c5eb9b577 Mon Sep 17 00:00:00 2001 From: Dmitry S <11892559+swift1337@users.noreply.github.com> Date: Wed, 8 Jan 2025 20:30:00 +0100 Subject: [PATCH] Apply fanout to block subscriber --- zetaclient/chains/bitcoin/bitcoin.go | 1 - zetaclient/orchestrator/orchestrator.go | 7 ++ zetaclient/zetacore/client.go | 5 ++ zetaclient/zetacore/client_subscriptions.go | 78 +++++++++++++++++---- 4 files changed, 77 insertions(+), 14 deletions(-) diff --git a/zetaclient/chains/bitcoin/bitcoin.go b/zetaclient/chains/bitcoin/bitcoin.go index 9ebd04f917..410f2892e8 100644 --- a/zetaclient/chains/bitcoin/bitcoin.go +++ b/zetaclient/chains/bitcoin/bitcoin.go @@ -55,7 +55,6 @@ func (b *Bitcoin) Start(ctx context.Context) error { return errors.Wrap(err, "unable to get app from context") } - // TODO: should we share & fan-out the same chan across all chains? newBlockChan, err := b.observer.ZetacoreClient().NewBlockSubscriber(ctx) if err != nil { return errors.Wrap(err, "unable to create new block subscriber") diff --git a/zetaclient/orchestrator/orchestrator.go b/zetaclient/orchestrator/orchestrator.go index 0fcfc54c73..9600d23d11 100644 --- a/zetaclient/orchestrator/orchestrator.go +++ b/zetaclient/orchestrator/orchestrator.go @@ -352,6 +352,13 @@ func (oc *Orchestrator) runScheduler(ctx context.Context) error { continue } + // managed by V2 + if chain.IsBitcoin() { + continue + } + + // todo move metrics to v2 + chainID := chain.ID() // update chain parameters for signer and chain observer diff --git a/zetaclient/zetacore/client.go b/zetaclient/zetacore/client.go index df5b6dbeb6..a883aca855 100644 --- a/zetaclient/zetacore/client.go +++ b/zetaclient/zetacore/client.go @@ -8,6 +8,7 @@ import ( cometbftrpc "github.com/cometbft/cometbft/rpc/client" cometbfthttp "github.com/cometbft/cometbft/rpc/client/http" + ctypes "github.com/cometbft/cometbft/types" cosmosclient "github.com/cosmos/cosmos-sdk/client" authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" "github.com/pkg/errors" @@ -19,6 +20,7 @@ import ( "github.com/zeta-chain/node/app" "github.com/zeta-chain/node/pkg/authz" "github.com/zeta-chain/node/pkg/chains" + "github.com/zeta-chain/node/pkg/fanout" zetacorerpc "github.com/zeta-chain/node/pkg/rpc" "github.com/zeta-chain/node/zetaclient/chains/interfaces" "github.com/zeta-chain/node/zetaclient/config" @@ -47,6 +49,9 @@ type Client struct { chainID string chain chains.Chain + // blocksFanout that receives new block events from Zetacore via websockets + blocksFanout *fanout.FanOut[ctypes.EventDataNewBlock] + mu sync.RWMutex } diff --git a/zetaclient/zetacore/client_subscriptions.go b/zetaclient/zetacore/client_subscriptions.go index cb4229b31b..971b1edfa2 100644 --- a/zetaclient/zetacore/client_subscriptions.go +++ b/zetaclient/zetacore/client_subscriptions.go @@ -3,33 +3,85 @@ package zetacore import ( "context" - cometbfttypes "github.com/cometbft/cometbft/types" + ctypes "github.com/cometbft/cometbft/types" + + "github.com/zeta-chain/node/pkg/fanout" ) -// NewBlockSubscriber subscribes to cometbft new block events -func (c *Client) NewBlockSubscriber(ctx context.Context) (chan cometbfttypes.EventDataNewBlock, error) { - rawBlockEventChan, err := c.cometBFTClient.Subscribe(ctx, "", cometbfttypes.EventQueryNewBlock.String()) +// NewBlockSubscriber subscribes to comet bft new block events. +// Subscribes share the same websocket connection but their channels are independent (fanout) +func (c *Client) NewBlockSubscriber(ctx context.Context) (chan ctypes.EventDataNewBlock, error) { + blockSubscriber, err := c.resolveBlockSubscriber() if err != nil { return nil, err } - blockEventChan := make(chan cometbfttypes.EventDataNewBlock) + // we need a "proxy" chan instead of directly returning blockSubscriber.Add() + // to support context cancellation + blocksChan := make(chan ctypes.EventDataNewBlock) go func() { + consumer := blockSubscriber.Add() + for { select { case <-ctx.Done(): return - case event := <-rawBlockEventChan: - newBlockEvent, ok := event.Data.(cometbfttypes.EventDataNewBlock) - if !ok { - c.logger.Error().Msgf("expecting new block event, got %T", event.Data) - continue - } - blockEventChan <- newBlockEvent + case block := <-consumer: + blocksChan <- block + } + } + }() + + return blocksChan, nil +} + +// resolveBlockSubscriber returns the block subscriber channel +// or subscribes to it for the first time. +func (c *Client) resolveBlockSubscriber() (*fanout.FanOut[ctypes.EventDataNewBlock], error) { + // noop + if blocksFanout, ok := c.getBlockFanoutChan(); ok { + c.logger.Info().Msg("Resolved existing block subscriber") + return blocksFanout, nil + } + + // Subscribe to comet bft events + eventsChan, err := c.cometBFTClient.Subscribe(context.Background(), "", ctypes.EventQueryNewBlock.String()) + if err != nil { + return nil, err + } + + c.logger.Info().Msg("Subscribed to new block events") + + // Create block chan + blockChan := make(chan ctypes.EventDataNewBlock) + + // Spin up a pipeline to forward block events to the blockChan + go func() { + for event := range eventsChan { + newBlockEvent, ok := event.Data.(ctypes.EventDataNewBlock) + if !ok { + c.logger.Error().Msgf("expecting new block event, got %T", event.Data) + continue } + + blockChan <- newBlockEvent } }() - return blockEventChan, nil + // Create a fanout + // It allows a "global" chan (i.e. blockChan) to stream to multiple consumers independently. + c.mu.Lock() + defer c.mu.Unlock() + c.blocksFanout = fanout.New[ctypes.EventDataNewBlock](blockChan, fanout.DefaultBuffer) + + c.blocksFanout.Start() + + return c.blocksFanout, nil +} + +func (c *Client) getBlockFanoutChan() (*fanout.FanOut[ctypes.EventDataNewBlock], bool) { + c.mu.RLock() + defer c.mu.RUnlock() + return c.blocksFanout, c.blocksFanout != nil }