diff --git a/cmd/start.go b/cmd/start.go index f383e27f8c3..a194267f556 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -27,9 +27,10 @@ import ( "time" "github.com/avast/retry-go" + "github.com/spf13/viper" + "github.com/cosmos/relayer/relayer" "github.com/spf13/cobra" - "github.com/spf13/viper" "golang.org/x/sync/errgroup" ) diff --git a/dev-env b/dev-env index f5d15fbf134..ab78f3a2508 100755 --- a/dev-env +++ b/dev-env @@ -21,9 +21,15 @@ echo "waiting for blocks..." sleep 3 rly tx link demo -d -o 3s +sleep 2 +echo "Initial balances" +echo "balance 0 $(rly q bal ibc-0)" +echo "balance 1 $(rly q bal ibc-1)" rly tx transfer ibc-0 ibc-1 100000samoleans "$(rly keys show ibc-1)" -d rly tx transfer ibc-1 ibc-0 100000samoleans "$(rly keys show ibc-0)" -d sleep 2 -rly tx relay-packets demo -d +rly start demo -d sleep 2 -rly tx relay-acknowledgements demo -d \ No newline at end of file +echo "Balances after packets are sent" +echo "balance 0 $(rly q bal ibc-0)" +echo "balance 1 $(rly q bal ibc-1)" \ No newline at end of file diff --git a/relayer/naive-strategy.go b/relayer/naive-strategy.go index 2f3fc07116f..2e740353a0c 100644 --- a/relayer/naive-strategy.go +++ b/relayer/naive-strategy.go @@ -335,51 +335,13 @@ func RelayPackets(src, dst *Chain, sp *RelaySequences, maxTxSize, maxMsgLength u } // add messages for sequences on src - for _, seq := range sp.Src { - // Query src for the sequence number to get type of packet - var recvMsg, timeoutMsg provider.RelayerMessage - if err = retry.Do(func() error { - recvMsg, timeoutMsg, err = src.ChainProvider.RelayPacketFromSequence(src.ChainProvider, dst.ChainProvider, uint64(srch), uint64(dsth), seq, dst.PathEnd.ChannelID, dst.PathEnd.PortID, src.PathEnd.ChannelID, src.PathEnd.PortID, src.PathEnd.ClientID) - return err - }, RtyAtt, RtyDel, RtyErr, retry.OnRetry(func(n uint, err error) { - srch, dsth, _ = QueryLatestHeights(src, dst) - })); err != nil { - return err - } - - // depending on the type of message to be relayed, we need to - // send to different chains - if recvMsg != nil { - msgs.Dst = append(msgs.Dst, recvMsg) - } - - if timeoutMsg != nil { - msgs.Src = append(msgs.Src, timeoutMsg) - } + if err = AddMessagesForSequences(sp.Src, src, dst, srch, dsth, &msgs.Src, &msgs.Dst); err != nil { + return err } // add messages for sequences on dst - for _, seq := range sp.Dst { - // Query dst for the sequence number to get type of packet - var recvMsg, timeoutMsg provider.RelayerMessage - if err = retry.Do(func() error { - recvMsg, timeoutMsg, err = dst.ChainProvider.RelayPacketFromSequence(dst.ChainProvider, src.ChainProvider, uint64(dsth), uint64(srch), seq, src.PathEnd.ChannelID, src.PathEnd.PortID, dst.PathEnd.ChannelID, dst.PathEnd.PortID, dst.PathEnd.ClientID) - return nil - }, RtyAtt, RtyDel, RtyErr, retry.OnRetry(func(n uint, err error) { - srch, dsth, _ = QueryLatestHeights(src, dst) - })); err != nil { - return err - } - - // depending on the type of message to be relayed, we need to - // send to different chains - if recvMsg != nil { - msgs.Src = append(msgs.Src, recvMsg) - } - - if timeoutMsg != nil { - msgs.Dst = append(msgs.Dst, timeoutMsg) - } + if err = AddMessagesForSequences(sp.Dst, dst, src, dsth, srch, &msgs.Dst, &msgs.Src); err != nil { + return err } if !msgs.Ready() { @@ -389,77 +351,96 @@ func RelayPackets(src, dst *Chain, sp *RelaySequences, maxTxSize, maxMsgLength u } // Prepend non-empty msg lists with UpdateClient - if len(msgs.Dst) != 0 { + eg := new(errgroup.Group) + + eg.Go(func() error { + return PrependUpdateClientMsg(&msgs.Dst, src, dst, srch) + }) + + eg.Go(func() error { + return PrependUpdateClientMsg(&msgs.Src, dst, src, dsth) + }) + + if err = eg.Wait(); err != nil { + return err + } + + // send messages to their respective chains + if msgs.Send(src, dst); msgs.Success() { + if len(msgs.Dst) > 1 { + dst.logPacketsRelayed(src, len(msgs.Dst)-1) + } + if len(msgs.Src) > 1 { + src.logPacketsRelayed(dst, len(msgs.Src)-1) + } + } + + return nil +} + +// AddMessagesForSequences constructs RecvMsgs and TimeoutMsgs from sequence numbers on a src chain +// and adds them to the appropriate queue of msgs for both src and dst +func AddMessagesForSequences(sequences []uint64, src, dst *Chain, srch, dsth int64, srcMsgs, dstMsgs *[]provider.RelayerMessage) error { + for _, seq := range sequences { + var ( - srcHeader ibcexported.Header - updateMsg provider.RelayerMessage + recvMsg, timeoutMsg provider.RelayerMessage + err error ) + // Query src for the sequence number to get type of packet if err = retry.Do(func() error { - srcHeader, err = src.ChainProvider.GetIBCUpdateHeader(srch, dst.ChainProvider, dst.PathEnd.ClientID) + recvMsg, timeoutMsg, err = src.ChainProvider.RelayPacketFromSequence(src.ChainProvider, dst.ChainProvider, uint64(srch), uint64(dsth), seq, dst.PathEnd.ChannelID, dst.PathEnd.PortID, src.PathEnd.ChannelID, src.PathEnd.PortID, src.PathEnd.ClientID) return err }, RtyAtt, RtyDel, RtyErr, retry.OnRetry(func(n uint, err error) { - srch, _, _ = QueryLatestHeights(src, dst) + srch, dsth, _ = QueryLatestHeights(src, dst) })); err != nil { return err } - if err = retry.Do(func() error { - updateMsg, err = dst.ChainProvider.UpdateClient(dst.PathEnd.ClientID, srcHeader) - return nil - }, RtyAtt, RtyDel, RtyErr); err != nil { - return err + // Depending on the type of message to be relayed, we need to send to different chains + if recvMsg != nil { + *dstMsgs = append(*dstMsgs, recvMsg) } - msgs.Dst = append([]provider.RelayerMessage{updateMsg}, msgs.Dst...) + if timeoutMsg != nil { + *srcMsgs = append(*srcMsgs, timeoutMsg) + } } - if len(msgs.Src) != 0 { - //dstHeader, err := dst.ChainProvider.GetIBCUpdateHeader(dsth, src.ChainProvider, src.PathEnd.ClientID) - //if err != nil { - // return err - //} - //updateMsg, err := src.ChainProvider.UpdateClient(src.PathEnd.ClientID, dstHeader) - //if err != nil { - // return err - //} + return nil +} +// PrependUpdateClientMsg adds an UpdateClient msg to the front of non-empty msg lists +func PrependUpdateClientMsg(msgs *[]provider.RelayerMessage, src, dst *Chain, srch int64) error { + if len(*msgs) != 0 { var ( - dstHeader ibcexported.Header + srcHeader ibcexported.Header updateMsg provider.RelayerMessage + err error ) + // Query IBC Update Header if err = retry.Do(func() error { - dstHeader, err = dst.ChainProvider.GetIBCUpdateHeader(dsth, src.ChainProvider, src.PathEnd.ClientID) + srcHeader, err = src.ChainProvider.GetIBCUpdateHeader(srch, dst.ChainProvider, dst.PathEnd.ClientID) return err }, RtyAtt, RtyDel, RtyErr, retry.OnRetry(func(n uint, err error) { - _, dsth, _ = QueryLatestHeights(src, dst) + srch, _, _ = QueryLatestHeights(src, dst) })); err != nil { return err } + // Construct UpdateClient msg if err = retry.Do(func() error { - updateMsg, err = src.ChainProvider.UpdateClient(src.PathEnd.ClientID, dstHeader) + updateMsg, err = dst.ChainProvider.UpdateClient(dst.PathEnd.ClientID, srcHeader) return nil }, RtyAtt, RtyDel, RtyErr); err != nil { return err } - msgs.Src = append([]provider.RelayerMessage{updateMsg}, msgs.Src...) + // Prepend UpdateClient msg to the slice of msgs + *msgs = append([]provider.RelayerMessage{updateMsg}, *msgs...) } - - // send messages to their respective chains - if msgs.Send(src, dst); msgs.Success() { - if len(msgs.Dst) > 1 { - dst.logPacketsRelayed(src, len(msgs.Dst)-1) - } - if len(msgs.Src) > 1 { - src.logPacketsRelayed(dst, len(msgs.Src)-1) - } - } else { - fmt.Println() - } - return nil } diff --git a/relayer/strategies.go b/relayer/strategies.go index 479c47a13dd..96f5b2f0c73 100644 --- a/relayer/strategies.go +++ b/relayer/strategies.go @@ -2,6 +2,7 @@ package relayer import ( "fmt" + "time" ) // StartRelayer starts the main relaying loop @@ -49,7 +50,7 @@ func StartRelayer(src, dst *Chain, maxTxSize, maxMsgLength uint64) (func(), erro } } - //time.Sleep(100 * time.Millisecond) + time.Sleep(100 * time.Millisecond) } } }()