Skip to content

Commit

Permalink
WIP refactor loop to event based processing
Browse files Browse the repository at this point in the history
  • Loading branch information
agouin committed May 13, 2022
1 parent 6cc35fa commit 49600be
Show file tree
Hide file tree
Showing 7 changed files with 258 additions and 57 deletions.
10 changes: 8 additions & 2 deletions cmd/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,13 +886,16 @@ $ %s query unrelayed-pkts demo-path channel-0`,
return err
}

eventBus := relayer.NewChainEventBus([]*relayer.Chain{c[src], c[dst]}, a.Log)
eventBus.Start(cmd.Context())

channelID := args[1]
channel, err := relayer.QueryChannel(cmd.Context(), c[src], channelID)
if err != nil {
return err
}

sp, err := relayer.UnrelayedSequences(cmd.Context(), c[src], c[dst], channel)
sp, err := relayer.UnrelayedSequences(cmd.Context(), c[src], c[dst], channel, &eventBus)
if err != nil {
return err
}
Expand Down Expand Up @@ -941,13 +944,16 @@ $ %s query unrelayed-acks demo-path channel-0`,
return err
}

eventBus := relayer.NewChainEventBus([]*relayer.Chain{c[src], c[dst]}, a.Log)
eventBus.Start(cmd.Context())

channelID := args[1]
channel, err := relayer.QueryChannel(cmd.Context(), c[src], channelID)
if err != nil {
return err
}

sp, err := relayer.UnrelayedAcknowledgements(cmd.Context(), c[src], c[dst], channel)
sp, err := relayer.UnrelayedAcknowledgements(cmd.Context(), c[src], c[dst], channel, &eventBus)
if err != nil {
return err
}
Expand Down
19 changes: 14 additions & 5 deletions cmd/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,13 +728,16 @@ $ %s tx relay-pkt demo-path channel-1 1`,
return err
}

eventBus := relayer.NewChainEventBus([]*relayer.Chain{c[src], c[dst]}, a.Log)
eventBus.Start(cmd.Context())

channelID := args[1]
channel, err := relayer.QueryChannel(cmd.Context(), c[src], channelID)
if err != nil {
return err
}

sp, err := relayer.UnrelayedSequences(cmd.Context(), c[src], c[dst], channel)
sp, err := relayer.UnrelayedSequences(cmd.Context(), c[src], c[dst], channel, &eventBus)
if err != nil {
return err
}
Expand Down Expand Up @@ -778,12 +781,15 @@ $ %s tx relay-pkts demo-path channel-0`,
return err
}

sp, err := relayer.UnrelayedSequences(cmd.Context(), c[src], c[dst], channel)
eventBus := relayer.NewChainEventBus([]*relayer.Chain{c[src], c[dst]}, a.Log)
eventBus.Start(cmd.Context())

sp, err := relayer.UnrelayedSequences(cmd.Context(), c[src], c[dst], channel, &eventBus)
if err != nil {
return err
}

if err = relayer.RelayPackets(cmd.Context(), a.Log, c[src], c[dst], sp, maxTxSize, maxMsgLength, channel); err != nil {
if err = relayer.RelayPackets(cmd.Context(), a.Log, c[src], c[dst], sp, maxTxSize, maxMsgLength, channel, &eventBus); err != nil {
return err
}

Expand Down Expand Up @@ -826,14 +832,17 @@ $ %s tx relay-acks demo-path channel-0 -l 3 -s 6`,
return err
}

eventBus := relayer.NewChainEventBus([]*relayer.Chain{c[src], c[dst]}, a.Log)
eventBus.Start(cmd.Context())

// sp.Src contains all sequences acked on SRC but acknowledgement not processed on DST
// sp.Dst contains all sequences acked on DST but acknowledgement not processed on SRC
sp, err := relayer.UnrelayedAcknowledgements(cmd.Context(), c[src], c[dst], channel)
sp, err := relayer.UnrelayedAcknowledgements(cmd.Context(), c[src], c[dst], channel, &eventBus)
if err != nil {
return err
}

if err = relayer.RelayAcknowledgements(cmd.Context(), a.Log, c[src], c[dst], sp, maxTxSize, maxMsgLength, channel); err != nil {
if err = relayer.RelayAcknowledgements(cmd.Context(), a.Log, c[src], c[dst], sp, maxTxSize, maxMsgLength, channel, &eventBus); err != nil {
return err
}

Expand Down
35 changes: 35 additions & 0 deletions relayer/chainProcessor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package relayer

type ChainProcessor struct {
chain *Chain
unrelayedSequencesQueue chan uint64
}

const unrelayedSequencesQueueCapacity = 1000

func NewChainProcessor(chain *Chain) *ChainProcessor {
return &ChainProcessor{
chain: chain,
unrelayedSequencesQueue: make(chan uint64, unrelayedSequencesQueueCapacity),
}
}

func (p *ChainProcessor) Start() {
go p.unrelayedSequencesWorker()
}

func (p *ChainProcessor) unrelayedSequencesWorker() {
for unrelayedSequence := range p.unrelayedSequencesQueue {
p.processUnrelayedSequence(unrelayedSequence)
}
}

func (p *ChainProcessor) EnqueueJob(unrelayedSequences []uint64) {
for _, unrelayedSequence := range unrelayedSequences {
p.unrelayedSequencesQueue <- unrelayedSequence
}
}

func (p *ChainProcessor) processUnrelayedSequence(unrelayedSequence uint64) {
// TODO
}
111 changes: 111 additions & 0 deletions relayer/eventBus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package relayer

import (
"context"
"fmt"
"sync"
"time"

"go.uber.org/zap"
)

type ChainEventBus struct {
chains map[string]*ChainEventBusChain
}

type ChainEventBusChain struct {
chain *Chain
subscribers []*ChainEventBusSubscriber
latestHeight int64
lastHeightQueried int64
log *zap.Logger
}

type ChainEventBusSubscriber struct {
chainID string
onNewUnrelayedSequences func([]uint64)
}

const (
minQueryLoopDuration = 3 * time.Second
heightQueryTimeout = 5 * time.Second
)

func NewChainEventBus(chains []*Chain, log *zap.Logger) ChainEventBus {
chainEventBusChains := make(map[string]*ChainEventBusChain)
for _, chain := range chains {
chainEventBusChains[chain.ChainID()] = &ChainEventBusChain{
chain: chain,
subscribers: []*ChainEventBusSubscriber{},
log: log,
}
}
return ChainEventBus{
chains: chainEventBusChains,
}
}

func (ceb *ChainEventBus) Subscribe(srcChainID string, dstChainID string, onNewUnrelayedSequences func([]uint64)) error {
if _, ok := ceb.chains[srcChainID]; !ok {
return fmt.Errorf("unable to subscribe, chain id does not exist: %s", srcChainID)
}
ceb.chains[srcChainID].subscribers = append(ceb.chains[srcChainID].subscribers, &ChainEventBusSubscriber{
chainID: dstChainID,
onNewUnrelayedSequences: onNewUnrelayedSequences,
})
return nil
}

func (ceb *ChainEventBus) Start(ctx context.Context) {
wg := sync.WaitGroup{}
for _, chain := range ceb.chains {
wg.Add(1)
go chain.QueryLoop(ctx, &wg)
}
// wait for initial height from all chains
wg.Wait()
}

func (c *ChainEventBusChain) QueryLoop(ctx context.Context, wg *sync.WaitGroup) {
haveInitialHeight := false
for {
cycleTimeStart := time.Now()
doneWithThisCycle := func() {
queryDuration := time.Since(cycleTimeStart)
if queryDuration < minQueryLoopDuration {
time.Sleep(minQueryLoopDuration - queryDuration)
}
}
latestHeightQueryCtx, latestHeightQueryCtxCancel := context.WithTimeout(ctx, heightQueryTimeout)
latestHeight, err := c.chain.ChainProvider.QueryLatestHeight(latestHeightQueryCtx)
latestHeightQueryCtxCancel()
if err != nil {
c.log.Warn("failed to query latest height", zap.String("chainID", c.chain.ChainID()), zap.Error(err))
doneWithThisCycle()
continue
}

c.latestHeight = latestHeight
// have non-zero height now for this chain ID, so caller can unblock
if !haveInitialHeight {
haveInitialHeight = true
wg.Done()
}

// TODO add packet commitment and ack queries
// limit to blocks since lastHeightQueried, except initial query should be full query
// call onNewUnrelayedSequences in goroutine for each c.Subscriber

c.lastHeightQueried = latestHeight

doneWithThisCycle()
}
}

func (ceb *ChainEventBus) GetLatestHeight(chainID string) (int64, error) {
chain, ok := ceb.chains[chainID]
if !ok {
return -1, fmt.Errorf("unable to get latest height, chain id does not exist in event bus: %s", chainID)
}
return chain.latestHeight, nil
}
40 changes: 28 additions & 12 deletions relayer/naive-strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,18 @@ import (
)

// UnrelayedSequences returns the unrelayed sequence numbers between two chains
func UnrelayedSequences(ctx context.Context, src, dst *Chain, srcChannel *chantypes.IdentifiedChannel) (*RelaySequences, error) {
func UnrelayedSequences(ctx context.Context, src, dst *Chain, srcChannel *chantypes.IdentifiedChannel, eventBus *ChainEventBus) (*RelaySequences, error) {
var (
srcPacketSeq = []uint64{}
dstPacketSeq = []uint64{}
rs = &RelaySequences{Src: []uint64{}, Dst: []uint64{}}
)

srch, dsth, err := QueryLatestHeights(ctx, src, dst)
srch, err := eventBus.GetLatestHeight(src.ChainID())
if err != nil {
return nil, err
}
dsth, err := eventBus.GetLatestHeight(dst.ChainID())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -144,14 +148,18 @@ func UnrelayedSequences(ctx context.Context, src, dst *Chain, srcChannel *chanty
}

// UnrelayedAcknowledgements returns the unrelayed sequence numbers between two chains
func UnrelayedAcknowledgements(ctx context.Context, src, dst *Chain, srcChannel *chantypes.IdentifiedChannel) (*RelaySequences, error) {
func UnrelayedAcknowledgements(ctx context.Context, src, dst *Chain, srcChannel *chantypes.IdentifiedChannel, eventBus *ChainEventBus) (*RelaySequences, error) {
var (
srcPacketSeq = []uint64{}
dstPacketSeq = []uint64{}
rs = &RelaySequences{Src: []uint64{}, Dst: []uint64{}}
)

srch, dsth, err := QueryLatestHeights(ctx, src, dst)
srch, err := eventBus.GetLatestHeight(src.ChainID())
if err != nil {
return nil, err
}
dsth, err := eventBus.GetLatestHeight(dst.ChainID())
if err != nil {
return nil, err
}
Expand All @@ -174,7 +182,7 @@ func UnrelayedAcknowledgements(ctx context.Context, src, dst *Chain, srcChannel
return nil
}
}, retry.Context(egCtx), RtyAtt, RtyDel, RtyErr, retry.OnRetry(func(n uint, err error) {
srch, _ = src.ChainProvider.QueryLatestHeight(egCtx)
srch, _ = eventBus.GetLatestHeight(src.ChainID())
})); err != nil {
return err
}
Expand All @@ -200,7 +208,7 @@ func UnrelayedAcknowledgements(ctx context.Context, src, dst *Chain, srcChannel
return nil
}
}, retry.Context(egCtx), RtyAtt, RtyDel, RtyErr, retry.OnRetry(func(n uint, err error) {
dsth, _ = dst.ChainProvider.QueryLatestHeight(egCtx)
dsth, _ = eventBus.GetLatestHeight(dst.ChainID())
})); err != nil {
return err
}
Expand All @@ -222,7 +230,7 @@ func UnrelayedAcknowledgements(ctx context.Context, src, dst *Chain, srcChannel
rs.Src, err = dst.ChainProvider.QueryUnreceivedAcknowledgements(egCtx, uint64(dsth), srcChannel.Counterparty.ChannelId, srcChannel.Counterparty.PortId, srcPacketSeq)
return err
}, retry.Context(egCtx), RtyErr, RtyAtt, RtyDel, retry.OnRetry(func(n uint, err error) {
dsth, _ = dst.ChainProvider.QueryLatestHeight(egCtx)
dsth, _ = eventBus.GetLatestHeight(dst.ChainID())
}))
})

Expand All @@ -233,7 +241,7 @@ func UnrelayedAcknowledgements(ctx context.Context, src, dst *Chain, srcChannel
rs.Dst, err = src.ChainProvider.QueryUnreceivedAcknowledgements(egCtx, uint64(srch), srcChannel.ChannelId, srcChannel.PortId, dstPacketSeq)
return err
}, retry.Context(egCtx), RtyAtt, RtyDel, RtyErr, retry.OnRetry(func(n uint, err error) {
srch, _ = src.ChainProvider.QueryLatestHeight(egCtx)
srch, _ = eventBus.GetLatestHeight(src.ChainID())
}))
})

Expand All @@ -258,7 +266,7 @@ func (rs *RelaySequences) Empty() bool {
}

// RelayAcknowledgements creates transactions to relay acknowledgements from src to dst and from dst to src
func RelayAcknowledgements(ctx context.Context, log *zap.Logger, src, dst *Chain, sp *RelaySequences, maxTxSize, maxMsgLength uint64, srcChannel *chantypes.IdentifiedChannel) error {
func RelayAcknowledgements(ctx context.Context, log *zap.Logger, src, dst *Chain, sp *RelaySequences, maxTxSize, maxMsgLength uint64, srcChannel *chantypes.IdentifiedChannel, eventBus *ChainEventBus) error {
// set the maximum relay transaction constraints
msgs := &RelayMsgs{
Src: []provider.RelayerMessage{},
Expand All @@ -271,7 +279,11 @@ func RelayAcknowledgements(ctx context.Context, log *zap.Logger, src, dst *Chain
case <-ctx.Done():
return ctx.Err()
default:
srch, dsth, err := QueryLatestHeights(ctx, src, dst)
srch, err := eventBus.GetLatestHeight(src.ChainID())
if err != nil {
return err
}
dsth, err := eventBus.GetLatestHeight(dst.ChainID())
if err != nil {
return err
}
Expand Down Expand Up @@ -383,7 +395,7 @@ func RelayAcknowledgements(ctx context.Context, log *zap.Logger, src, dst *Chain
}

// RelayPackets creates transactions to relay packets from src to dst and from dst to src
func RelayPackets(ctx context.Context, log *zap.Logger, src, dst *Chain, sp *RelaySequences, maxTxSize, maxMsgLength uint64, srcChannel *chantypes.IdentifiedChannel) error {
func RelayPackets(ctx context.Context, log *zap.Logger, src, dst *Chain, sp *RelaySequences, maxTxSize, maxMsgLength uint64, srcChannel *chantypes.IdentifiedChannel, eventBus *ChainEventBus) error {
// set the maximum relay transaction constraints
msgs := &RelayMsgs{
Src: []provider.RelayerMessage{},
Expand All @@ -396,7 +408,11 @@ func RelayPackets(ctx context.Context, log *zap.Logger, src, dst *Chain, sp *Rel
case <-ctx.Done():
return ctx.Err()
default:
srch, dsth, err := QueryLatestHeights(ctx, src, dst)
srch, err := eventBus.GetLatestHeight(src.ChainID())
if err != nil {
return err
}
dsth, err := eventBus.GetLatestHeight(dst.ChainID())
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 49600be

Please sign in to comment.