From 3bf5a75f094525820610948bb25eb2d3f4addf6b Mon Sep 17 00:00:00 2001 From: tpkeeper Date: Fri, 1 Mar 2024 12:38:56 +0800 Subject: [PATCH 01/11] add RELAYER_REGISTRY_QUERY_IDS --- .env.example | 1 + .env.example.dev | 1 + internal/app/app.go | 13 +++++++------ internal/registry/registry.go | 1 + internal/subscriber/subscriber.go | 31 ++++++++++++++++++++++--------- internal/subscriber/utils.go | 13 +++++++++++++ 6 files changed, 45 insertions(+), 15 deletions(-) diff --git a/.env.example b/.env.example index ae28a47e..2f27f037 100644 --- a/.env.example +++ b/.env.example @@ -20,6 +20,7 @@ RELAYER_TARGET_CHAIN_DEBUG=true RELAYER_TARGET_CHAIN_OUTPUT_FORMAT=json RELAYER_REGISTRY_ADDRESSES=neutron14hj2tavq8fpesdwxxcu44rty3hh90vhujrvcmstl4zr3txmfvw9s5c2epq +RELAYER_REGISTRY_QUERY_IDS= RELAYER_ALLOW_TX_QUERIES=true RELAYER_ALLOW_KV_CALLBACKS=true diff --git a/.env.example.dev b/.env.example.dev index c7a3e114..2cfc7bed 100644 --- a/.env.example.dev +++ b/.env.example.dev @@ -34,6 +34,7 @@ RELAYER_TARGET_CHAIN_OUTPUT_FORMAT=json RELAYER_TARGET_CHAIN_SIGN_MODE_STR=direct RELAYER_REGISTRY_ADDRESSES= +RELAYER_REGISTRY_QUERY_IDS= RELAYER_ALLOW_TX_QUERIES=true RELAYER_ALLOW_KV_CALLBACKS=true diff --git a/internal/app/app.go b/internal/app/app.go index 93d91fdf..0b52e9fd 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -64,12 +64,13 @@ func NewDefaultSubscriber(cfg config.NeutronQueryRelayerConfig, logRegistry *nlo subscriber, err := relaysubscriber.NewSubscriber( &subscriber.SubscriberConfig{ - RPCAddress: cfg.NeutronChain.RPCAddr, - RESTAddress: cfg.NeutronChain.RESTAddr, - Timeout: cfg.NeutronChain.Timeout, - ConnectionID: cfg.NeutronChain.ConnectionID, - WatchedTypes: watchedMsgTypes, - Registry: registry.New(cfg.Registry), + RPCAddress: cfg.NeutronChain.RPCAddr, + RESTAddress: cfg.NeutronChain.RESTAddr, + Timeout: cfg.NeutronChain.Timeout, + ConnectionID: cfg.NeutronChain.ConnectionID, + WatchedTypes: watchedMsgTypes, + WatchedQueryIDs: cfg.Registry.QueryIDS, + Registry: registry.New(cfg.Registry), }, logRegistry.Get(SubscriberContext), ) diff --git a/internal/registry/registry.go b/internal/registry/registry.go index 477a42f5..a8f254ee 100644 --- a/internal/registry/registry.go +++ b/internal/registry/registry.go @@ -3,6 +3,7 @@ package registry // RegistryConfig represents the config structure for the Registry. type RegistryConfig struct { Addresses []string + QueryIDS []uint64 } // New instantiates a new *Registry based on the cfg. diff --git a/internal/subscriber/subscriber.go b/internal/subscriber/subscriber.go index 6a946288..c0631ad3 100644 --- a/internal/subscriber/subscriber.go +++ b/internal/subscriber/subscriber.go @@ -32,7 +32,8 @@ type SubscriberConfig struct { // ConnectionID is the Neutron's side connection ID used to filter out queries. ConnectionID string // WatchedTypes is the list of query types to be observed and handled. - WatchedTypes []neutrontypes.InterchainQueryType + WatchedTypes []neutrontypes.InterchainQueryType + WatchedQueryIDs []uint64 // Registry is a watch list registry. It contains a list of addresses, and the Subscriber only // works with interchain queries and events that are under these addresses' ownership. Registry *rg.Registry @@ -64,14 +65,20 @@ func NewSubscriber( watchedTypesMap[queryType] = struct{}{} } + watchedQueryIDsMap := make(map[uint64]struct{}) + for _, queryID := range cfg.WatchedQueryIDs { + watchedQueryIDsMap[queryID] = struct{}{} + } + return &Subscriber{ rpcClient: rpcClient, restClient: restClient, - connectionID: cfg.ConnectionID, - registry: cfg.Registry, - logger: logger, - watchedTypes: watchedTypesMap, + connectionID: cfg.ConnectionID, + registry: cfg.Registry, + logger: logger, + watchedTypes: watchedTypesMap, + watchedQueryIDs: watchedQueryIDsMap, activeQueries: map[string]*neutrontypes.RegisteredQuery{}, }, nil @@ -84,10 +91,11 @@ type Subscriber struct { rpcClient *http.HTTP // Used to subscribe to events restClient *restclient.HTTPAPIConsole // Used to run Neutron-specific queries using the REST - connectionID string - registry *rg.Registry - logger *zap.Logger - watchedTypes map[neutrontypes.InterchainQueryType]struct{} + connectionID string + registry *rg.Registry + logger *zap.Logger + watchedTypes map[neutrontypes.InterchainQueryType]struct{} + watchedQueryIDs map[uint64]struct{} activeQueries map[string]*neutrontypes.RegisteredQuery } @@ -205,6 +213,11 @@ func (s *Subscriber) processUpdateEvent(ctx context.Context, event tmtypes.Resul zap.String("query_id", queryID)) continue } + if !s.isWatchedQueryID(neutronQuery.Id) { + s.logger.Debug("Skipping query (wrong ID)", zap.String("owner", owner), + zap.String("query_id", queryID)) + continue + } // Save the updated query information to memory. s.activeQueries[queryID] = neutronQuery diff --git a/internal/subscriber/utils.go b/internal/subscriber/utils.go index 2bc7757e..b4379aef 100644 --- a/internal/subscriber/utils.go +++ b/internal/subscriber/utils.go @@ -95,6 +95,9 @@ func (s *Subscriber) getNeutronRegisteredQueries(ctx context.Context) (map[strin if !s.isWatchedMsgType(neutronQuery.QueryType) { continue } + if !s.isWatchedQueryID(neutronQuery.Id) { + continue + } out[restQuery.ID] = neutronQuery } if payload.Pagination != nil && payload.Pagination.NextKey.String() != "" { @@ -168,6 +171,16 @@ func (s *Subscriber) isWatchedMsgType(msgType string) bool { return ex } +// isWatchedQueryID returns true if the given message type was added to the subscriber's watched +// ActiveQuery IDs list. +func (s *Subscriber) isWatchedQueryID(queryID uint64) bool { + if len(s.watchedQueryIDs) == 0 { + return true + } + _, ex := s.watchedQueryIDs[queryID] + return ex +} + // isWatchedAddress returns true if the address is within the registry watched addresses or there // are no registry watched addresses configured for the subscriber meaning all addresses are watched. func (s *Subscriber) isWatchedAddress(address string) bool { From 6cd06a96a4ca5f37b8c735f568889de7293ff321 Mon Sep 17 00:00:00 2001 From: Chengbin Du Date: Thu, 14 Mar 2024 11:44:30 +0800 Subject: [PATCH 02/11] add comment --- README.md | 1 + internal/subscriber/subscriber.go | 5 ++++- internal/subscriber/utils.go | 5 +++-- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 13c4b8be..68eff700 100644 --- a/README.md +++ b/README.md @@ -100,6 +100,7 @@ Relayer: | `RELAYER_TARGET_CHAIN_DEBUG ` | `bool` | flag to run target chain provider in debug mode | optional | | `RELAYER_TARGET_CHAIN_OUTPUT_FORMAT` | `json` or `yaml` | target chain provider output format | optional | | `RELAYER_REGISTRY_ADDRESSES` | `string` | a list of comma-separated smart-contract addresses for which the relayer processes interchain queries | required | +| `RELAYER_REGISTRY_QUERY_IDS` | `string` | a list of comma-separated query IDs which complements to `RELAYER_REGISTRY_ADDRESSES` to further filter out messages being processed | optional | | `RELAYER_ALLOW_TX_QUERIES` | `bool` | if true relayer will process tx queries (if `false`, relayer will drop them) | required | | `RELAYER_ALLOW_KV_CALLBACKS` | `bool` | if `true`, will pass proofs as sudo callbacks to contracts | required | | `RELAYER_MIN_KV_UPDATE_PERIOD` | `uint` | minimal period of queries execution and submission (not less than `n` blocks) | optional | diff --git a/internal/subscriber/subscriber.go b/internal/subscriber/subscriber.go index c0631ad3..1dd3128e 100644 --- a/internal/subscriber/subscriber.go +++ b/internal/subscriber/subscriber.go @@ -32,7 +32,9 @@ type SubscriberConfig struct { // ConnectionID is the Neutron's side connection ID used to filter out queries. ConnectionID string // WatchedTypes is the list of query types to be observed and handled. - WatchedTypes []neutrontypes.InterchainQueryType + WatchedTypes []neutrontypes.InterchainQueryType + // WatchedQueryIDs is the list of query IDs to be observed and handled: + // empty for all, or specific IDs only WatchedQueryIDs []uint64 // Registry is a watch list registry. It contains a list of addresses, and the Subscriber only // works with interchain queries and events that are under these addresses' ownership. @@ -65,6 +67,7 @@ func NewSubscriber( watchedTypesMap[queryType] = struct{}{} } + // Contains the query IDs of queries that we are ready to serve (KV / TX). watchedQueryIDsMap := make(map[uint64]struct{}) for _, queryID := range cfg.WatchedQueryIDs { watchedQueryIDsMap[queryID] = struct{}{} diff --git a/internal/subscriber/utils.go b/internal/subscriber/utils.go index b4379aef..882c1b89 100644 --- a/internal/subscriber/utils.go +++ b/internal/subscriber/utils.go @@ -171,9 +171,10 @@ func (s *Subscriber) isWatchedMsgType(msgType string) bool { return ex } -// isWatchedQueryID returns true if the given message type was added to the subscriber's watched -// ActiveQuery IDs list. +// isWatchedQueryID returns true if the given query ID was added to the subscriber's watched +// Query IDs list. func (s *Subscriber) isWatchedQueryID(queryID uint64) bool { + // Empty list represents that all query IDs will be processed. if len(s.watchedQueryIDs) == 0 { return true } From f823528276854273e8cea721401660c11dddf1d6 Mon Sep 17 00:00:00 2001 From: tpkeeper Date: Fri, 15 Mar 2024 09:48:47 +0800 Subject: [PATCH 03/11] env example fix --- .env.example | 2 +- .env.example.dev | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.env.example b/.env.example index 2f27f037..6d265044 100644 --- a/.env.example +++ b/.env.example @@ -20,7 +20,7 @@ RELAYER_TARGET_CHAIN_DEBUG=true RELAYER_TARGET_CHAIN_OUTPUT_FORMAT=json RELAYER_REGISTRY_ADDRESSES=neutron14hj2tavq8fpesdwxxcu44rty3hh90vhujrvcmstl4zr3txmfvw9s5c2epq -RELAYER_REGISTRY_QUERY_IDS= +RELAYER_REGISTRY_QUERYIDS= RELAYER_ALLOW_TX_QUERIES=true RELAYER_ALLOW_KV_CALLBACKS=true diff --git a/.env.example.dev b/.env.example.dev index 2cfc7bed..ae1bb66b 100644 --- a/.env.example.dev +++ b/.env.example.dev @@ -34,7 +34,7 @@ RELAYER_TARGET_CHAIN_OUTPUT_FORMAT=json RELAYER_TARGET_CHAIN_SIGN_MODE_STR=direct RELAYER_REGISTRY_ADDRESSES= -RELAYER_REGISTRY_QUERY_IDS= +RELAYER_REGISTRY_QUERYIDS= RELAYER_ALLOW_TX_QUERIES=true RELAYER_ALLOW_KV_CALLBACKS=true From 26395bb01d3b6ae6fba52a917734a8077fbab9ad Mon Sep 17 00:00:00 2001 From: tpkeeper Date: Fri, 15 Mar 2024 10:49:19 +0800 Subject: [PATCH 04/11] update readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 68eff700..b69759e7 100644 --- a/README.md +++ b/README.md @@ -100,7 +100,7 @@ Relayer: | `RELAYER_TARGET_CHAIN_DEBUG ` | `bool` | flag to run target chain provider in debug mode | optional | | `RELAYER_TARGET_CHAIN_OUTPUT_FORMAT` | `json` or `yaml` | target chain provider output format | optional | | `RELAYER_REGISTRY_ADDRESSES` | `string` | a list of comma-separated smart-contract addresses for which the relayer processes interchain queries | required | -| `RELAYER_REGISTRY_QUERY_IDS` | `string` | a list of comma-separated query IDs which complements to `RELAYER_REGISTRY_ADDRESSES` to further filter out messages being processed | optional | +| `RELAYER_REGISTRY_QUERYIDS` | `string` | a list of comma-separated query IDs which complements to `RELAYER_REGISTRY_ADDRESSES` to further filter out messages being processed | optional | | `RELAYER_ALLOW_TX_QUERIES` | `bool` | if true relayer will process tx queries (if `false`, relayer will drop them) | required | | `RELAYER_ALLOW_KV_CALLBACKS` | `bool` | if `true`, will pass proofs as sudo callbacks to contracts | required | | `RELAYER_MIN_KV_UPDATE_PERIOD` | `uint` | minimal period of queries execution and submission (not less than `n` blocks) | optional | From 76c2577605040cba3dc183d8b95ecf9bd3b492d3 Mon Sep 17 00:00:00 2001 From: tpkeeper Date: Tue, 26 Mar 2024 09:43:03 +0800 Subject: [PATCH 05/11] replace `messages` with `interchain queries` --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index b69759e7..d9be0322 100644 --- a/README.md +++ b/README.md @@ -100,7 +100,7 @@ Relayer: | `RELAYER_TARGET_CHAIN_DEBUG ` | `bool` | flag to run target chain provider in debug mode | optional | | `RELAYER_TARGET_CHAIN_OUTPUT_FORMAT` | `json` or `yaml` | target chain provider output format | optional | | `RELAYER_REGISTRY_ADDRESSES` | `string` | a list of comma-separated smart-contract addresses for which the relayer processes interchain queries | required | -| `RELAYER_REGISTRY_QUERYIDS` | `string` | a list of comma-separated query IDs which complements to `RELAYER_REGISTRY_ADDRESSES` to further filter out messages being processed | optional | +| `RELAYER_REGISTRY_QUERYIDS` | `string` | a list of comma-separated query IDs which complements to `RELAYER_REGISTRY_ADDRESSES` to further filter out interchain queries being processed | optional | | `RELAYER_ALLOW_TX_QUERIES` | `bool` | if true relayer will process tx queries (if `false`, relayer will drop them) | required | | `RELAYER_ALLOW_KV_CALLBACKS` | `bool` | if `true`, will pass proofs as sudo callbacks to contracts | required | | `RELAYER_MIN_KV_UPDATE_PERIOD` | `uint` | minimal period of queries execution and submission (not less than `n` blocks) | optional | From 1240635542d95a3a608937fcb7e53bc5bf361ce1 Mon Sep 17 00:00:00 2001 From: tpkeeper Date: Tue, 26 Mar 2024 10:31:22 +0800 Subject: [PATCH 06/11] handle queryIDs similarly to addresses --- internal/app/app.go | 13 ++++----- internal/registry/registry.go | 22 +++++++++++++-- internal/subscriber/subscriber.go | 47 ++++++++++++++----------------- internal/subscriber/utils.go | 15 ++++------ 4 files changed, 51 insertions(+), 46 deletions(-) diff --git a/internal/app/app.go b/internal/app/app.go index 0b52e9fd..93d91fdf 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -64,13 +64,12 @@ func NewDefaultSubscriber(cfg config.NeutronQueryRelayerConfig, logRegistry *nlo subscriber, err := relaysubscriber.NewSubscriber( &subscriber.SubscriberConfig{ - RPCAddress: cfg.NeutronChain.RPCAddr, - RESTAddress: cfg.NeutronChain.RESTAddr, - Timeout: cfg.NeutronChain.Timeout, - ConnectionID: cfg.NeutronChain.ConnectionID, - WatchedTypes: watchedMsgTypes, - WatchedQueryIDs: cfg.Registry.QueryIDS, - Registry: registry.New(cfg.Registry), + RPCAddress: cfg.NeutronChain.RPCAddr, + RESTAddress: cfg.NeutronChain.RESTAddr, + Timeout: cfg.NeutronChain.Timeout, + ConnectionID: cfg.NeutronChain.ConnectionID, + WatchedTypes: watchedMsgTypes, + Registry: registry.New(cfg.Registry), }, logRegistry.Get(SubscriberContext), ) diff --git a/internal/registry/registry.go b/internal/registry/registry.go index a8f254ee..72f50e14 100644 --- a/internal/registry/registry.go +++ b/internal/registry/registry.go @@ -10,10 +10,14 @@ type RegistryConfig struct { func New(cfg *RegistryConfig) *Registry { r := &Registry{ addresses: make(map[string]struct{}, len(cfg.Addresses)), + queryIDs: make(map[uint64]struct{}, len(cfg.QueryIDS)), } for _, addr := range cfg.Addresses { r.addresses[addr] = struct{}{} } + for _, queryID := range cfg.QueryIDS { + r.queryIDs[queryID] = struct{}{} + } return r } @@ -21,19 +25,31 @@ func New(cfg *RegistryConfig) *Registry { // only works with interchain queries that are under these addresses' ownership. type Registry struct { addresses map[string]struct{} + queryIDs map[uint64]struct{} } -// IsEmpty returns true if the registry addresses list is empty. -func (r *Registry) IsEmpty() bool { +// IsAddressesEmpty returns true if the registry addresses list is empty. +func (r *Registry) IsAddressesEmpty() bool { return len(r.addresses) == 0 } +// IsQueryIDsEmpty returns true if the registry queryIDs list is empty. +func (r *Registry) IsQueryIDsEmpty() bool { + return len(r.queryIDs) == 0 +} + // Contains returns true if the addr is in the registry. -func (r *Registry) Contains(addr string) bool { +func (r *Registry) ContainsAddress(addr string) bool { _, ex := r.addresses[addr] return ex } +// Contains returns true if the queryID is in the registry. +func (r *Registry) ContainsQueryID(queryID uint64) bool { + _, ex := r.queryIDs[queryID] + return ex +} + func (r *Registry) GetAddresses() []string { var out []string for addr := range r.addresses { diff --git a/internal/subscriber/subscriber.go b/internal/subscriber/subscriber.go index 1dd3128e..6dfebe78 100644 --- a/internal/subscriber/subscriber.go +++ b/internal/subscriber/subscriber.go @@ -3,6 +3,7 @@ package subscriber import ( "context" "fmt" + "strconv" "sync" "time" @@ -33,11 +34,8 @@ type SubscriberConfig struct { ConnectionID string // WatchedTypes is the list of query types to be observed and handled. WatchedTypes []neutrontypes.InterchainQueryType - // WatchedQueryIDs is the list of query IDs to be observed and handled: - // empty for all, or specific IDs only - WatchedQueryIDs []uint64 - // Registry is a watch list registry. It contains a list of addresses, and the Subscriber only - // works with interchain queries and events that are under these addresses' ownership. + // Registry is a watch list registry. It contains a list of addresses and queryIDs, and the Subscriber only + // works with interchain queries and events that are under ownership of these addresses and queryIDs. Registry *rg.Registry } @@ -67,21 +65,14 @@ func NewSubscriber( watchedTypesMap[queryType] = struct{}{} } - // Contains the query IDs of queries that we are ready to serve (KV / TX). - watchedQueryIDsMap := make(map[uint64]struct{}) - for _, queryID := range cfg.WatchedQueryIDs { - watchedQueryIDsMap[queryID] = struct{}{} - } - return &Subscriber{ rpcClient: rpcClient, restClient: restClient, - connectionID: cfg.ConnectionID, - registry: cfg.Registry, - logger: logger, - watchedTypes: watchedTypesMap, - watchedQueryIDs: watchedQueryIDsMap, + connectionID: cfg.ConnectionID, + registry: cfg.Registry, + logger: logger, + watchedTypes: watchedTypesMap, activeQueries: map[string]*neutrontypes.RegisteredQuery{}, }, nil @@ -94,11 +85,10 @@ type Subscriber struct { rpcClient *http.HTTP // Used to subscribe to events restClient *restclient.HTTPAPIConsole // Used to run Neutron-specific queries using the REST - connectionID string - registry *rg.Registry - logger *zap.Logger - watchedTypes map[neutrontypes.InterchainQueryType]struct{} - watchedQueryIDs map[uint64]struct{} + connectionID string + registry *rg.Registry + logger *zap.Logger + watchedTypes map[neutrontypes.InterchainQueryType]struct{} activeQueries map[string]*neutrontypes.RegisteredQuery } @@ -204,6 +194,16 @@ func (s *Subscriber) processUpdateEvent(ctx context.Context, event tmtypes.Resul zap.String("query_id", queryID)) continue } + queryIDNumber, err := strconv.ParseUint(queryID, 10, 64) + if err != nil { + return fmt.Errorf("failed to parse queryID: %w", err) + } + + if !s.isWatchedQueryID(queryIDNumber) { + s.logger.Debug("Skipping query (wrong ID)", zap.String("owner", owner), + zap.String("query_id", queryID)) + continue + } // Load all information about the neutronQuery directly from Neutron. neutronQuery, err := s.getNeutronRegisteredQuery(ctx, queryID) @@ -216,11 +216,6 @@ func (s *Subscriber) processUpdateEvent(ctx context.Context, event tmtypes.Resul zap.String("query_id", queryID)) continue } - if !s.isWatchedQueryID(neutronQuery.Id) { - s.logger.Debug("Skipping query (wrong ID)", zap.String("owner", owner), - zap.String("query_id", queryID)) - continue - } // Save the updated query information to memory. s.activeQueries[queryID] = neutronQuery diff --git a/internal/subscriber/utils.go b/internal/subscriber/utils.go index 882c1b89..c47dc34e 100644 --- a/internal/subscriber/utils.go +++ b/internal/subscriber/utils.go @@ -67,7 +67,7 @@ func (s *Subscriber) getNeutronRegisteredQuery(ctx context.Context, queryId stri return neutronQuery, nil } -// getNeutronRegisteredQueries retrieves the list of registered queries filtered by owner, connection, and query type. +// getNeutronRegisteredQueries retrieves the list of registered queries filtered by owner, connection, query type, and queryID. func (s *Subscriber) getNeutronRegisteredQueries(ctx context.Context) (map[string]*neutrontypes.RegisteredQuery, error) { var out = map[string]*neutrontypes.RegisteredQuery{} var pageKey *strfmt.Base64 @@ -171,19 +171,14 @@ func (s *Subscriber) isWatchedMsgType(msgType string) bool { return ex } -// isWatchedQueryID returns true if the given query ID was added to the subscriber's watched -// Query IDs list. +// isWatchedQueryID returns true if the queryID is within the registry watched queryIDs or there +// are no registry watched queryIDs configured for the subscriber meaning all queryIDs are watched. func (s *Subscriber) isWatchedQueryID(queryID uint64) bool { - // Empty list represents that all query IDs will be processed. - if len(s.watchedQueryIDs) == 0 { - return true - } - _, ex := s.watchedQueryIDs[queryID] - return ex + return s.registry.IsQueryIDsEmpty() || s.registry.ContainsQueryID(queryID) } // isWatchedAddress returns true if the address is within the registry watched addresses or there // are no registry watched addresses configured for the subscriber meaning all addresses are watched. func (s *Subscriber) isWatchedAddress(address string) bool { - return s.registry.IsEmpty() || s.registry.Contains(address) + return s.registry.IsAddressesEmpty() || s.registry.ContainsAddress(address) } From 05142eecd94b0a5699efff2da59d8676438684a5 Mon Sep 17 00:00:00 2001 From: tpkeeper Date: Tue, 26 Mar 2024 10:38:34 +0800 Subject: [PATCH 07/11] replace RELAYER_REGISTRY_QUERYIDS with RELAYER_REGISTRY_QUERY_IDS --- .env.example | 2 +- .env.example.dev | 2 +- README.md | 2 +- internal/registry/registry.go | 6 +++--- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/.env.example b/.env.example index 6d265044..2f27f037 100644 --- a/.env.example +++ b/.env.example @@ -20,7 +20,7 @@ RELAYER_TARGET_CHAIN_DEBUG=true RELAYER_TARGET_CHAIN_OUTPUT_FORMAT=json RELAYER_REGISTRY_ADDRESSES=neutron14hj2tavq8fpesdwxxcu44rty3hh90vhujrvcmstl4zr3txmfvw9s5c2epq -RELAYER_REGISTRY_QUERYIDS= +RELAYER_REGISTRY_QUERY_IDS= RELAYER_ALLOW_TX_QUERIES=true RELAYER_ALLOW_KV_CALLBACKS=true diff --git a/.env.example.dev b/.env.example.dev index ae1bb66b..2cfc7bed 100644 --- a/.env.example.dev +++ b/.env.example.dev @@ -34,7 +34,7 @@ RELAYER_TARGET_CHAIN_OUTPUT_FORMAT=json RELAYER_TARGET_CHAIN_SIGN_MODE_STR=direct RELAYER_REGISTRY_ADDRESSES= -RELAYER_REGISTRY_QUERYIDS= +RELAYER_REGISTRY_QUERY_IDS= RELAYER_ALLOW_TX_QUERIES=true RELAYER_ALLOW_KV_CALLBACKS=true diff --git a/README.md b/README.md index d9be0322..d4aa01fa 100644 --- a/README.md +++ b/README.md @@ -100,7 +100,7 @@ Relayer: | `RELAYER_TARGET_CHAIN_DEBUG ` | `bool` | flag to run target chain provider in debug mode | optional | | `RELAYER_TARGET_CHAIN_OUTPUT_FORMAT` | `json` or `yaml` | target chain provider output format | optional | | `RELAYER_REGISTRY_ADDRESSES` | `string` | a list of comma-separated smart-contract addresses for which the relayer processes interchain queries | required | -| `RELAYER_REGISTRY_QUERYIDS` | `string` | a list of comma-separated query IDs which complements to `RELAYER_REGISTRY_ADDRESSES` to further filter out interchain queries being processed | optional | +| `RELAYER_REGISTRY_QUERY_IDS` | `string` | a list of comma-separated query IDs which complements to `RELAYER_REGISTRY_ADDRESSES` to further filter out interchain queries being processed | optional | | `RELAYER_ALLOW_TX_QUERIES` | `bool` | if true relayer will process tx queries (if `false`, relayer will drop them) | required | | `RELAYER_ALLOW_KV_CALLBACKS` | `bool` | if `true`, will pass proofs as sudo callbacks to contracts | required | | `RELAYER_MIN_KV_UPDATE_PERIOD` | `uint` | minimal period of queries execution and submission (not less than `n` blocks) | optional | diff --git a/internal/registry/registry.go b/internal/registry/registry.go index 72f50e14..59d855e6 100644 --- a/internal/registry/registry.go +++ b/internal/registry/registry.go @@ -3,19 +3,19 @@ package registry // RegistryConfig represents the config structure for the Registry. type RegistryConfig struct { Addresses []string - QueryIDS []uint64 + QueryIDs []uint64 `envconfig:"QUERY_IDS"` } // New instantiates a new *Registry based on the cfg. func New(cfg *RegistryConfig) *Registry { r := &Registry{ addresses: make(map[string]struct{}, len(cfg.Addresses)), - queryIDs: make(map[uint64]struct{}, len(cfg.QueryIDS)), + queryIDs: make(map[uint64]struct{}, len(cfg.QueryIDs)), } for _, addr := range cfg.Addresses { r.addresses[addr] = struct{}{} } - for _, queryID := range cfg.QueryIDS { + for _, queryID := range cfg.QueryIDs { r.queryIDs[queryID] = struct{}{} } return r From 8df9afcd0985a0cfd698e76426f26f0867db6025 Mon Sep 17 00:00:00 2001 From: Chengbin Du Date: Tue, 26 Mar 2024 11:12:57 +0800 Subject: [PATCH 08/11] update comemnts --- internal/registry/registry.go | 8 ++++---- internal/subscriber/subscriber.go | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/registry/registry.go b/internal/registry/registry.go index 59d855e6..4a93f3c3 100644 --- a/internal/registry/registry.go +++ b/internal/registry/registry.go @@ -21,8 +21,8 @@ func New(cfg *RegistryConfig) *Registry { return r } -// Registry is the relayer's watch list registry. It contains a list of addresses, and the relayer -// only works with interchain queries that are under these addresses' ownership. +// Registry is the relayer's watch list registry. It contains a list of addresses and a list of queryIDs, +// and the relayer only works with interchain queries that are under these addresses' ownership and match the queryIDs. type Registry struct { addresses map[string]struct{} queryIDs map[uint64]struct{} @@ -38,13 +38,13 @@ func (r *Registry) IsQueryIDsEmpty() bool { return len(r.queryIDs) == 0 } -// Contains returns true if the addr is in the registry. +// ContainsAddress returns true if the addr is in the registry. func (r *Registry) ContainsAddress(addr string) bool { _, ex := r.addresses[addr] return ex } -// Contains returns true if the queryID is in the registry. +// ContainsQueryID returns true if the queryID is in the registry. func (r *Registry) ContainsQueryID(queryID uint64) bool { _, ex := r.queryIDs[queryID] return ex diff --git a/internal/subscriber/subscriber.go b/internal/subscriber/subscriber.go index 6dfebe78..5f748ebd 100644 --- a/internal/subscriber/subscriber.go +++ b/internal/subscriber/subscriber.go @@ -34,8 +34,8 @@ type SubscriberConfig struct { ConnectionID string // WatchedTypes is the list of query types to be observed and handled. WatchedTypes []neutrontypes.InterchainQueryType - // Registry is a watch list registry. It contains a list of addresses and queryIDs, and the Subscriber only - // works with interchain queries and events that are under ownership of these addresses and queryIDs. + // Registry is a watch list registry. It contains a list of addresses and a list of queryIDs, and the Subscriber only + // works with interchain queries and events that are under ownership of these addresses and match the queryIDs. Registry *rg.Registry } From 50bbfcfc1c0dcc3e5d4184c1513c95fbb8a10c24 Mon Sep 17 00:00:00 2001 From: Chengbin Du Date: Tue, 26 Mar 2024 11:13:28 +0800 Subject: [PATCH 09/11] update log message --- internal/subscriber/subscriber.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/subscriber/subscriber.go b/internal/subscriber/subscriber.go index 5f748ebd..8e297beb 100644 --- a/internal/subscriber/subscriber.go +++ b/internal/subscriber/subscriber.go @@ -200,7 +200,7 @@ func (s *Subscriber) processUpdateEvent(ctx context.Context, event tmtypes.Resul } if !s.isWatchedQueryID(queryIDNumber) { - s.logger.Debug("Skipping query (wrong ID)", zap.String("owner", owner), + s.logger.Debug("Skipping query (wrong queryID)", zap.String("owner", owner), zap.String("query_id", queryID)) continue } From a5bd1ebcbc7bef3aad21033c5ad6dae0168efeae Mon Sep 17 00:00:00 2001 From: Chengbin Du Date: Tue, 16 Apr 2024 18:16:09 +0800 Subject: [PATCH 10/11] add unit test for registry --- internal/registry/registry_test.go | 59 ++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 internal/registry/registry_test.go diff --git a/internal/registry/registry_test.go b/internal/registry/registry_test.go new file mode 100644 index 00000000..298be1ac --- /dev/null +++ b/internal/registry/registry_test.go @@ -0,0 +1,59 @@ +package registry_test + +import ( + "testing" + + "github.com/neutron-org/neutron-query-relayer/internal/registry" + "github.com/stretchr/testify/assert" +) + +func TestRegistryWithEmptyAddressesAndEmptyQueryIDs(t *testing.T) { + cfg := registry.RegistryConfig{} + r := registry.New(&cfg) + assert.True(t, r.IsAddressesEmpty()) + assert.True(t, r.IsQueryIDsEmpty()) + + assert.False(t, r.ContainsAddress("not_exist_address")) + assert.False(t, r.ContainsQueryID(0)) + assert.False(t, r.ContainsQueryID(1)) + assert.Equal(t, []string(nil), r.GetAddresses()) +} + +func TestRegistryWithAddressesAndEmptyQueryIDs(t *testing.T) { + cfg := registry.RegistryConfig{ + Addresses: []string{"exist_address", "exist_address2"}, + } + r := registry.New(&cfg) + assert.False(t, r.IsAddressesEmpty()) + assert.True(t, r.ContainsAddress("exist_address")) + assert.False(t, r.ContainsAddress("not_exist_address")) + assert.ElementsMatch(t, []string{"exist_address", "exist_address2"}, r.GetAddresses()) +} + +func TestRegistryWithAddressesAndQueryIDs(t *testing.T) { + cfg := registry.RegistryConfig{ + Addresses: []string{"exist_address", "exist_address2"}, + QueryIDs: []uint64{0, 1}, + } + r := registry.New(&cfg) + assert.False(t, r.IsAddressesEmpty()) + assert.False(t, r.IsQueryIDsEmpty()) + assert.True(t, r.ContainsAddress("exist_address")) + assert.False(t, r.ContainsAddress("not_exist_address")) + assert.True(t, r.ContainsQueryID(0)) + assert.True(t, r.ContainsQueryID(1)) + assert.False(t, r.ContainsQueryID(2)) +} + +func TestRegistryWithEmptyAddressesAndQueryIDs(t *testing.T) { + cfg := registry.RegistryConfig{ + QueryIDs: []uint64{0, 1}, + } + r := registry.New(&cfg) + assert.True(t, r.IsAddressesEmpty()) + assert.False(t, r.IsQueryIDsEmpty()) + assert.False(t, r.ContainsAddress("not_exist_address")) + assert.True(t, r.ContainsQueryID(0)) + assert.True(t, r.ContainsQueryID(1)) + assert.False(t, r.ContainsQueryID(2)) +} From 2070b36f4be1ac5be9c5b067fbe6d2a96875bcc9 Mon Sep 17 00:00:00 2001 From: Chengbin Du Date: Tue, 16 Apr 2024 18:17:21 +0800 Subject: [PATCH 11/11] add unit test for query ID filter --- internal/subscriber/subscriber_test.go | 355 ++++++++++++++++++++++++- 1 file changed, 351 insertions(+), 4 deletions(-) diff --git a/internal/subscriber/subscriber_test.go b/internal/subscriber/subscriber_test.go index 2b2b7e4b..7c4d0007 100644 --- a/internal/subscriber/subscriber_test.go +++ b/internal/subscriber/subscriber_test.go @@ -3,18 +3,19 @@ package subscriber_test import ( "context" "fmt" + "sort" + "testing" + ctypes "github.com/cometbft/cometbft/rpc/core/types" "github.com/neutron-org/neutron-query-relayer/internal/registry" "github.com/neutron-org/neutron-query-relayer/internal/subscriber" "github.com/neutron-org/neutron-query-relayer/internal/subscriber/querier/client/query" mock_subscriber "github.com/neutron-org/neutron-query-relayer/testutil/mocks/subscriber" neutrontypes "github.com/neutron-org/neutron/x/interchainqueries/types" - "github.com/stretchr/testify/require" - "go.uber.org/zap" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" - "testing" + "go.uber.org/zap" ) func TestDoneShouldEndSubscribe(t *testing.T) { @@ -147,3 +148,349 @@ func TestSubscribeContinuesOnMissingQuery(t *testing.T) { err = s.Subscribe(ctx, queriesTasksQueue) assert.Equal(t, err, nil) } + +func TestSubscribeWithEmptyQueryIDs(t *testing.T) { + // Create a new controller + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + cfgLogger := zap.NewProductionConfig() + logger, err := cfgLogger.Build() + require.NoError(t, err) + + rpcClient := mock_subscriber.NewMockRpcHttpClient(ctrl) + restQuery := mock_subscriber.NewMockRestHttpQuery(ctrl) + + updateEvents := make(chan ctypes.ResultEvent) + blockEvents := make(chan ctypes.ResultEvent) + rpcClient.EXPECT().Start() + rpcClient.EXPECT().Subscribe(gomock.Any(), gomock.Any(), gomock.Any()).Return(updateEvents, nil) + rpcClient.EXPECT().Subscribe(gomock.Any(), gomock.Any(), gomock.Any()) + rpcClient.EXPECT().Subscribe(gomock.Any(), gomock.Any(), gomock.Any()).Return(blockEvents, nil) + + rpcClient.EXPECT().Unsubscribe(gomock.Any(), gomock.Any(), gomock.Any()) + rpcClient.EXPECT().Unsubscribe(gomock.Any(), gomock.Any(), gomock.Any()) + rpcClient.EXPECT().Unsubscribe(gomock.Any(), gomock.Any(), gomock.Any()) + + restQuery.EXPECT().NeutronInterchainQueriesRegisteredQueries(gomock.Any()).Return(&query.NeutronInterchainQueriesRegisteredQueriesOK{ + Payload: &query.NeutronInterchainQueriesRegisteredQueriesOKBody{ + Pagination: &query.NeutronInterchainQueriesRegisteredQueriesOKBodyPagination{ + NextKey: nil, + Total: "", + }, + RegisteredQueries: []*query.NeutronInterchainQueriesRegisteredQueriesOKBodyRegisteredQueriesItems0{ + { + ID: "1", + Owner: "owner", + QueryType: "kv", + UpdatePeriod: "1", + LastSubmittedResultLocalHeight: "0", + LastSubmittedResultRemoteHeight: &query.NeutronInterchainQueriesRegisteredQueriesOKBodyRegisteredQueriesItems0LastSubmittedResultRemoteHeight{ + RevisionHeight: "0", + RevisionNumber: "0", + }, + }, + { + ID: "2", + Owner: "owner", + QueryType: "kv", + UpdatePeriod: "1", + LastSubmittedResultLocalHeight: "0", + LastSubmittedResultRemoteHeight: &query.NeutronInterchainQueriesRegisteredQueriesOKBodyRegisteredQueriesItems0LastSubmittedResultRemoteHeight{ + RevisionHeight: "0", + RevisionNumber: "0", + }, + }, + }, + }, + }, nil) + + queriesTasksQueue := make(chan neutrontypes.RegisteredQuery, 100) + cfg := subscriber.Config{ + ConnectionID: "", + WatchedTypes: []neutrontypes.InterchainQueryType{ + "kv", + "tx", + }, + Registry: registry.New(®istry.RegistryConfig{ + Addresses: make([]string, 0), + QueryIDs: make([]uint64, 0), // do not filter by query ID + }), + } + s, err := subscriber.NewSubscriber(&cfg, rpcClient, restQuery, logger) + assert.NoError(t, err) + + generateNewBlock := func() func() { + height := int64(1) + return func() { + rpcClient.EXPECT().Status(gomock.Any()).Return(&ctypes.ResultStatus{ + SyncInfo: ctypes.SyncInfo{ + LatestBlockHeight: height, + }, + }, nil) + + blockEvents <- ctypes.ResultEvent{} + height++ + } + }() + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + { + // in this block we are going to check queriesTasksQueue are exactly 1 and 2 + generateNewBlock() + queries := []neutrontypes.RegisteredQuery{ + <-queriesTasksQueue, + <-queriesTasksQueue, + } + // reorder the queries by id, as Subscriber.activeQueries is a map, it does not guarantee the order of the iteration + sort.Slice(queries, func(i, j int) bool { + return queries[i].Id < queries[j].Id + }) + assert.Equal(t, uint64(1), queries[0].Id) + assert.Equal(t, uint64(2), queries[1].Id) + assert.Equal(t, 0, len(queriesTasksQueue)) + } + + { + // in this block we are going to confirm that query id 3 is added to activeQueries + queryId := "3" + restQuery.EXPECT().NeutronInterchainQueriesRegisteredQuery(&query.NeutronInterchainQueriesRegisteredQueryParams{ + QueryID: &queryId, + Context: ctx, + }).Return(&query.NeutronInterchainQueriesRegisteredQueryOK{ + Payload: &query.NeutronInterchainQueriesRegisteredQueryOKBody{ + RegisteredQuery: &query.NeutronInterchainQueriesRegisteredQueryOKBodyRegisteredQuery{ + ID: queryId, + Owner: "owner", + QueryType: "kv", + UpdatePeriod: "1", + LastSubmittedResultLocalHeight: "0", + LastSubmittedResultRemoteHeight: &query.NeutronInterchainQueriesRegisteredQueryOKBodyRegisteredQueryLastSubmittedResultRemoteHeight{ + RevisionHeight: "0", + RevisionNumber: "0", + }, + }, + }, + }, nil) + events := make(map[string][]string) + events[subscriber.QueryIdAttr] = []string{queryId} + events[subscriber.ConnectionIdAttr] = []string{"kek"} + events[subscriber.KvKeyAttr] = []string{"kek"} + events[subscriber.TransactionsFilterAttr] = []string{"kek"} + events[subscriber.TypeAttr] = []string{"kek"} + events[subscriber.OwnerAttr] = []string{"owner"} + + updateEvents <- ctypes.ResultEvent{Events: events} + + generateNewBlock() + queries := []neutrontypes.RegisteredQuery{ + <-queriesTasksQueue, + <-queriesTasksQueue, + <-queriesTasksQueue, + } + // reorder the queries by id, as Subscriber.activeQueries is a map, it does not guarantee the order of the iteration + sort.Slice(queries, func(i, j int) bool { + return queries[i].Id < queries[j].Id + }) + assert.Equal(t, uint64(1), queries[0].Id) + assert.Equal(t, uint64(2), queries[1].Id) + assert.Equal(t, uint64(3), queries[2].Id) + assert.Equal(t, 0, len(queriesTasksQueue)) + } + + // should terminate Subscribe() function + cancel() + }() + + err = s.Subscribe(ctx, queriesTasksQueue) + assert.Equal(t, err, nil) +} + +func TestSubscribeWithSpecifiedQueryIDs(t *testing.T) { + // Create a new controller + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + cfgLogger := zap.NewProductionConfig() + logger, err := cfgLogger.Build() + require.NoError(t, err) + + rpcClient := mock_subscriber.NewMockRpcHttpClient(ctrl) + restQuery := mock_subscriber.NewMockRestHttpQuery(ctrl) + + updateEvents := make(chan ctypes.ResultEvent) + blockEvents := make(chan ctypes.ResultEvent) + rpcClient.EXPECT().Start() + rpcClient.EXPECT().Subscribe(gomock.Any(), gomock.Any(), gomock.Any()).Return(updateEvents, nil) + rpcClient.EXPECT().Subscribe(gomock.Any(), gomock.Any(), gomock.Any()) + rpcClient.EXPECT().Subscribe(gomock.Any(), gomock.Any(), gomock.Any()).Return(blockEvents, nil) + + rpcClient.EXPECT().Unsubscribe(gomock.Any(), gomock.Any(), gomock.Any()) + rpcClient.EXPECT().Unsubscribe(gomock.Any(), gomock.Any(), gomock.Any()) + rpcClient.EXPECT().Unsubscribe(gomock.Any(), gomock.Any(), gomock.Any()) + + restQuery.EXPECT().NeutronInterchainQueriesRegisteredQueries(gomock.Any()).Return(&query.NeutronInterchainQueriesRegisteredQueriesOK{ + Payload: &query.NeutronInterchainQueriesRegisteredQueriesOKBody{ + Pagination: &query.NeutronInterchainQueriesRegisteredQueriesOKBodyPagination{ + NextKey: nil, + Total: "", + }, + RegisteredQueries: []*query.NeutronInterchainQueriesRegisteredQueriesOKBodyRegisteredQueriesItems0{ + { + ID: "1", + Owner: "owner", + QueryType: "kv", + UpdatePeriod: "1", + LastSubmittedResultLocalHeight: "0", + LastSubmittedResultRemoteHeight: &query.NeutronInterchainQueriesRegisteredQueriesOKBodyRegisteredQueriesItems0LastSubmittedResultRemoteHeight{ + RevisionHeight: "0", + RevisionNumber: "0", + }, + }, + { + ID: "2", + Owner: "owner", + QueryType: "kv", + UpdatePeriod: "1", + LastSubmittedResultLocalHeight: "0", + LastSubmittedResultRemoteHeight: &query.NeutronInterchainQueriesRegisteredQueriesOKBodyRegisteredQueriesItems0LastSubmittedResultRemoteHeight{ + RevisionHeight: "0", + RevisionNumber: "0", + }, + }, + { + ID: "3", + Owner: "owner", + QueryType: "tx", + UpdatePeriod: "1", + LastSubmittedResultLocalHeight: "0", + LastSubmittedResultRemoteHeight: &query.NeutronInterchainQueriesRegisteredQueriesOKBodyRegisteredQueriesItems0LastSubmittedResultRemoteHeight{ + RevisionHeight: "0", + RevisionNumber: "0", + }, + }, + }, + }, + }, nil) + + queriesTasksQueue := make(chan neutrontypes.RegisteredQuery, 100) + cfg := subscriber.Config{ + ConnectionID: "", + WatchedTypes: []neutrontypes.InterchainQueryType{ + "kv", + "tx", + }, + Registry: registry.New(®istry.RegistryConfig{ + Addresses: make([]string, 0), + QueryIDs: []uint64{1, 3, 5}, // We only handle queries which id equals 1, 3 or 5 + }), + } + s, err := subscriber.NewSubscriber(&cfg, rpcClient, restQuery, logger) + assert.NoError(t, err) + + generateNewBlock := func() func() { + height := int64(1) + return func() { + rpcClient.EXPECT().Status(gomock.Any()).Return(&ctypes.ResultStatus{ + SyncInfo: ctypes.SyncInfo{ + LatestBlockHeight: height, + }, + }, nil) + + blockEvents <- ctypes.ResultEvent{} + height++ + } + }() + emitUpdateEventForQueryID := func(queryID string) { + events := make(map[string][]string) + events[subscriber.QueryIdAttr] = []string{queryID} + events[subscriber.ConnectionIdAttr] = []string{"kek"} + events[subscriber.KvKeyAttr] = []string{"kek"} + events[subscriber.TransactionsFilterAttr] = []string{"kek"} + events[subscriber.TypeAttr] = []string{"kek"} + events[subscriber.OwnerAttr] = []string{"owner"} + updateEvents <- ctypes.ResultEvent{Events: events} + } + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + { + // in this block we are going to check queriesTasksQueue are exactly 1 and 3 + generateNewBlock() + queries := []neutrontypes.RegisteredQuery{ + <-queriesTasksQueue, + <-queriesTasksQueue, + } + // reorder the queries by id, as Subscriber.activeQueries is a map, it does not guarantee the order of the iteration + sort.Slice(queries, func(i, j int) bool { + return queries[i].Id < queries[j].Id + }) + assert.Equal(t, uint64(1), queries[0].Id) + assert.Equal(t, uint64(3), queries[1].Id) + assert.Equal(t, 0, len(queriesTasksQueue)) + } + + { + // in this block we are going to confirm that query id 4 is dropped cause we have no intention to hand it + emitUpdateEventForQueryID("4") + generateNewBlock() + queries := []neutrontypes.RegisteredQuery{ + <-queriesTasksQueue, + <-queriesTasksQueue, + } + // reorder the queries by id, as Subscriber.activeQueries is a map, it does not guarantee the order of the iteration + sort.Slice(queries, func(i, j int) bool { + return queries[i].Id < queries[j].Id + }) + assert.Equal(t, uint64(1), queries[0].Id) + assert.Equal(t, uint64(3), queries[1].Id) + assert.Equal(t, 0, len(queriesTasksQueue)) + } + + { + // in this block we are going to confirm that query id 5 is added to activeQueries + queryId := "5" + restQuery.EXPECT().NeutronInterchainQueriesRegisteredQuery(&query.NeutronInterchainQueriesRegisteredQueryParams{ + QueryID: &queryId, + Context: ctx, + }).Return(&query.NeutronInterchainQueriesRegisteredQueryOK{ + Payload: &query.NeutronInterchainQueriesRegisteredQueryOKBody{ + RegisteredQuery: &query.NeutronInterchainQueriesRegisteredQueryOKBodyRegisteredQuery{ + ID: queryId, + Owner: "owner", + QueryType: "kv", + UpdatePeriod: "1", + LastSubmittedResultLocalHeight: "0", + LastSubmittedResultRemoteHeight: &query.NeutronInterchainQueriesRegisteredQueryOKBodyRegisteredQueryLastSubmittedResultRemoteHeight{ + RevisionHeight: "0", + RevisionNumber: "0", + }, + }, + }, + }, nil) + emitUpdateEventForQueryID(queryId) + + generateNewBlock() + queries := []neutrontypes.RegisteredQuery{ + <-queriesTasksQueue, + <-queriesTasksQueue, + <-queriesTasksQueue, + } + // reorder the queries by id, as Subscriber.activeQueries is a map, it does not guarantee the order of the iteration + sort.Slice(queries, func(i, j int) bool { + return queries[i].Id < queries[j].Id + }) + assert.Equal(t, uint64(1), queries[0].Id) + assert.Equal(t, uint64(3), queries[1].Id) + assert.Equal(t, uint64(5), queries[2].Id) + assert.Equal(t, 0, len(queriesTasksQueue)) + } + + // should terminate Subscribe() function + cancel() + }() + + err = s.Subscribe(ctx, queriesTasksQueue) + assert.Equal(t, err, nil) +}