Skip to content

Commit

Permalink
Apply fanout to block subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
swift1337 committed Jan 8, 2025
1 parent c91aeea commit 3b6cb6d
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 14 deletions.
1 change: 0 additions & 1 deletion zetaclient/chains/bitcoin/bitcoin.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ func (b *Bitcoin) Start(ctx context.Context) error {
return errors.Wrap(err, "unable to get app from context")
}

// TODO: should we share & fan-out the same chan across all chains?
newBlockChan, err := b.observer.ZetacoreClient().NewBlockSubscriber(ctx)
if err != nil {
return errors.Wrap(err, "unable to create new block subscriber")
Expand Down
7 changes: 7 additions & 0 deletions zetaclient/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,13 @@ func (oc *Orchestrator) runScheduler(ctx context.Context) error {
continue
}

// managed by V2
if chain.IsBitcoin() {
continue
}

// todo move metrics to v2

chainID := chain.ID()

// update chain parameters for signer and chain observer
Expand Down
5 changes: 5 additions & 0 deletions zetaclient/zetacore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

cometbftrpc "github.com/cometbft/cometbft/rpc/client"
cometbfthttp "github.com/cometbft/cometbft/rpc/client/http"
ctypes "github.com/cometbft/cometbft/types"
cosmosclient "github.com/cosmos/cosmos-sdk/client"
authtypes "github.com/cosmos/cosmos-sdk/x/auth/types"
"github.com/pkg/errors"
Expand All @@ -19,6 +20,7 @@ import (
"github.com/zeta-chain/node/app"
"github.com/zeta-chain/node/pkg/authz"
"github.com/zeta-chain/node/pkg/chains"
"github.com/zeta-chain/node/pkg/fanout"
zetacorerpc "github.com/zeta-chain/node/pkg/rpc"
"github.com/zeta-chain/node/zetaclient/chains/interfaces"
"github.com/zeta-chain/node/zetaclient/config"
Expand Down Expand Up @@ -47,6 +49,9 @@ type Client struct {
chainID string
chain chains.Chain

// blocksFanout that receives new block events from Zetacore via websockets
blocksFanout *fanout.FanOut[ctypes.EventDataNewBlock]

mu sync.RWMutex
}

Expand Down
78 changes: 65 additions & 13 deletions zetaclient/zetacore/client_subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,85 @@ package zetacore
import (
"context"

cometbfttypes "github.com/cometbft/cometbft/types"
ctypes "github.com/cometbft/cometbft/types"

"github.com/zeta-chain/node/pkg/fanout"
)

// NewBlockSubscriber subscribes to cometbft new block events
func (c *Client) NewBlockSubscriber(ctx context.Context) (chan cometbfttypes.EventDataNewBlock, error) {
rawBlockEventChan, err := c.cometBFTClient.Subscribe(ctx, "", cometbfttypes.EventQueryNewBlock.String())
// NewBlockSubscriber subscribes to comet bft new block events.
// Subscribes share the same websocket connection but their channels are independent (fanout)
func (c *Client) NewBlockSubscriber(ctx context.Context) (chan ctypes.EventDataNewBlock, error) {
blockSubscriber, err := c.resolveBlockSubscriber()
if err != nil {
return nil, err
}

blockEventChan := make(chan cometbfttypes.EventDataNewBlock)
// we need a "proxy" chan instead of directly returning blockSubscriber.Add()
// to support context cancellation
blocksChan := make(chan ctypes.EventDataNewBlock)

go func() {
consumer := blockSubscriber.Add()

for {
select {
case <-ctx.Done():
return
case event := <-rawBlockEventChan:
newBlockEvent, ok := event.Data.(cometbfttypes.EventDataNewBlock)
if !ok {
c.logger.Error().Msgf("expecting new block event, got %T", event.Data)
continue
}
blockEventChan <- newBlockEvent
case block := <-consumer:
blocksChan <- block
}
}
}()

return blocksChan, nil
}

// resolveBlockSubscriber returns the block subscriber channel
// or subscribes to it for the first time.
func (c *Client) resolveBlockSubscriber() (*fanout.FanOut[ctypes.EventDataNewBlock], error) {
// noop
if blocksFanout, ok := c.getBlockFanoutChan(); ok {
c.logger.Info().Msg("Resolved existing block subscriber")
return blocksFanout, nil
}

// Subscribe to comet bft events
eventsChan, err := c.cometBFTClient.Subscribe(context.Background(), "", ctypes.EventQueryNewBlock.String())
if err != nil {
return nil, err
}

c.logger.Info().Msg("Subscribed to new block events")

// Create block chan
blockChan := make(chan ctypes.EventDataNewBlock)

// Spin up a pipeline to forward block events to the blockChan
go func() {
for event := range eventsChan {
newBlockEvent, ok := event.Data.(ctypes.EventDataNewBlock)
if !ok {
c.logger.Error().Msgf("expecting new block event, got %T", event.Data)
continue
}

blockChan <- newBlockEvent
}
}()

return blockEventChan, nil
// Create a fanout
// It allows a "global" chan (i.e. blockChan) to stream to multiple consumers independently.
c.mu.Lock()
defer c.mu.Unlock()
c.blocksFanout = fanout.New[ctypes.EventDataNewBlock](blockChan, fanout.DefaultBuffer)

c.blocksFanout.Start()

return c.blocksFanout, nil
}

func (c *Client) getBlockFanoutChan() (*fanout.FanOut[ctypes.EventDataNewBlock], bool) {
c.mu.RLock()
defer c.mu.RUnlock()
return c.blocksFanout, c.blocksFanout != nil
}

0 comments on commit 3b6cb6d

Please sign in to comment.