diff --git a/.env.example b/.env.example index ae28a47e..2f27f037 100644 --- a/.env.example +++ b/.env.example @@ -20,6 +20,7 @@ RELAYER_TARGET_CHAIN_DEBUG=true RELAYER_TARGET_CHAIN_OUTPUT_FORMAT=json RELAYER_REGISTRY_ADDRESSES=neutron14hj2tavq8fpesdwxxcu44rty3hh90vhujrvcmstl4zr3txmfvw9s5c2epq +RELAYER_REGISTRY_QUERY_IDS= RELAYER_ALLOW_TX_QUERIES=true RELAYER_ALLOW_KV_CALLBACKS=true diff --git a/.env.example.dev b/.env.example.dev index c7a3e114..2cfc7bed 100644 --- a/.env.example.dev +++ b/.env.example.dev @@ -34,6 +34,7 @@ RELAYER_TARGET_CHAIN_OUTPUT_FORMAT=json RELAYER_TARGET_CHAIN_SIGN_MODE_STR=direct RELAYER_REGISTRY_ADDRESSES= +RELAYER_REGISTRY_QUERY_IDS= RELAYER_ALLOW_TX_QUERIES=true RELAYER_ALLOW_KV_CALLBACKS=true diff --git a/README.md b/README.md index 1d5b3972..f3e04f7f 100644 --- a/README.md +++ b/README.md @@ -98,6 +98,7 @@ Relayer: | `RELAYER_TARGET_CHAIN_DEBUG ` | `bool` | flag to run target chain provider in debug mode | optional | | `RELAYER_TARGET_CHAIN_OUTPUT_FORMAT` | `json` or `yaml` | target chain provider output format | optional | | `RELAYER_REGISTRY_ADDRESSES` | `string` | a list of comma-separated smart-contract addresses for which the relayer processes interchain queries | required | +| `RELAYER_REGISTRY_QUERY_IDS` | `string` | a list of comma-separated query IDs which complements to `RELAYER_REGISTRY_ADDRESSES` to further filter out interchain queries being processed | optional | | `RELAYER_ALLOW_TX_QUERIES` | `bool` | if true relayer will process tx queries (if `false`, relayer will drop them) | required | | `RELAYER_ALLOW_KV_CALLBACKS` | `bool` | if `true`, will pass proofs as sudo callbacks to contracts | required | | `RELAYER_MIN_KV_UPDATE_PERIOD` | `uint` | minimal period of queries execution and submission (not less than `n` blocks) | optional | diff --git a/internal/registry/registry.go b/internal/registry/registry.go index 477a42f5..4a93f3c3 100644 --- a/internal/registry/registry.go +++ b/internal/registry/registry.go @@ -3,36 +3,53 @@ package registry // RegistryConfig represents the config structure for the Registry. type RegistryConfig struct { Addresses []string + QueryIDs []uint64 `envconfig:"QUERY_IDS"` } // New instantiates a new *Registry based on the cfg. func New(cfg *RegistryConfig) *Registry { r := &Registry{ addresses: make(map[string]struct{}, len(cfg.Addresses)), + queryIDs: make(map[uint64]struct{}, len(cfg.QueryIDs)), } for _, addr := range cfg.Addresses { r.addresses[addr] = struct{}{} } + for _, queryID := range cfg.QueryIDs { + r.queryIDs[queryID] = struct{}{} + } return r } -// Registry is the relayer's watch list registry. It contains a list of addresses, and the relayer -// only works with interchain queries that are under these addresses' ownership. +// Registry is the relayer's watch list registry. It contains a list of addresses and a list of queryIDs, +// and the relayer only works with interchain queries that are under these addresses' ownership and match the queryIDs. type Registry struct { addresses map[string]struct{} + queryIDs map[uint64]struct{} } -// IsEmpty returns true if the registry addresses list is empty. -func (r *Registry) IsEmpty() bool { +// IsAddressesEmpty returns true if the registry addresses list is empty. +func (r *Registry) IsAddressesEmpty() bool { return len(r.addresses) == 0 } -// Contains returns true if the addr is in the registry. -func (r *Registry) Contains(addr string) bool { +// IsQueryIDsEmpty returns true if the registry queryIDs list is empty. +func (r *Registry) IsQueryIDsEmpty() bool { + return len(r.queryIDs) == 0 +} + +// ContainsAddress returns true if the addr is in the registry. +func (r *Registry) ContainsAddress(addr string) bool { _, ex := r.addresses[addr] return ex } +// ContainsQueryID returns true if the queryID is in the registry. +func (r *Registry) ContainsQueryID(queryID uint64) bool { + _, ex := r.queryIDs[queryID] + return ex +} + func (r *Registry) GetAddresses() []string { var out []string for addr := range r.addresses { diff --git a/internal/registry/registry_test.go b/internal/registry/registry_test.go new file mode 100644 index 00000000..298be1ac --- /dev/null +++ b/internal/registry/registry_test.go @@ -0,0 +1,59 @@ +package registry_test + +import ( + "testing" + + "github.com/neutron-org/neutron-query-relayer/internal/registry" + "github.com/stretchr/testify/assert" +) + +func TestRegistryWithEmptyAddressesAndEmptyQueryIDs(t *testing.T) { + cfg := registry.RegistryConfig{} + r := registry.New(&cfg) + assert.True(t, r.IsAddressesEmpty()) + assert.True(t, r.IsQueryIDsEmpty()) + + assert.False(t, r.ContainsAddress("not_exist_address")) + assert.False(t, r.ContainsQueryID(0)) + assert.False(t, r.ContainsQueryID(1)) + assert.Equal(t, []string(nil), r.GetAddresses()) +} + +func TestRegistryWithAddressesAndEmptyQueryIDs(t *testing.T) { + cfg := registry.RegistryConfig{ + Addresses: []string{"exist_address", "exist_address2"}, + } + r := registry.New(&cfg) + assert.False(t, r.IsAddressesEmpty()) + assert.True(t, r.ContainsAddress("exist_address")) + assert.False(t, r.ContainsAddress("not_exist_address")) + assert.ElementsMatch(t, []string{"exist_address", "exist_address2"}, r.GetAddresses()) +} + +func TestRegistryWithAddressesAndQueryIDs(t *testing.T) { + cfg := registry.RegistryConfig{ + Addresses: []string{"exist_address", "exist_address2"}, + QueryIDs: []uint64{0, 1}, + } + r := registry.New(&cfg) + assert.False(t, r.IsAddressesEmpty()) + assert.False(t, r.IsQueryIDsEmpty()) + assert.True(t, r.ContainsAddress("exist_address")) + assert.False(t, r.ContainsAddress("not_exist_address")) + assert.True(t, r.ContainsQueryID(0)) + assert.True(t, r.ContainsQueryID(1)) + assert.False(t, r.ContainsQueryID(2)) +} + +func TestRegistryWithEmptyAddressesAndQueryIDs(t *testing.T) { + cfg := registry.RegistryConfig{ + QueryIDs: []uint64{0, 1}, + } + r := registry.New(&cfg) + assert.True(t, r.IsAddressesEmpty()) + assert.False(t, r.IsQueryIDsEmpty()) + assert.False(t, r.ContainsAddress("not_exist_address")) + assert.True(t, r.ContainsQueryID(0)) + assert.True(t, r.ContainsQueryID(1)) + assert.False(t, r.ContainsQueryID(2)) +} diff --git a/internal/subscriber/subscriber.go b/internal/subscriber/subscriber.go index 79b6d2ec..aad5aede 100644 --- a/internal/subscriber/subscriber.go +++ b/internal/subscriber/subscriber.go @@ -3,6 +3,7 @@ package subscriber import ( "context" "fmt" + "strconv" "sync" "time" @@ -11,8 +12,6 @@ import ( "github.com/neutron-org/neutron-query-relayer/internal/config" "github.com/neutron-org/neutron-query-relayer/internal/relay" - "github.com/neutron-org/neutron-query-relayer/internal/registry" - instrumenters "github.com/neutron-org/neutron-query-relayer/internal/metrics" tmtypes "github.com/cometbft/cometbft/rpc/core/types" @@ -32,8 +31,8 @@ type Config struct { ConnectionID string // WatchedTypes is the list of query types to be observed and handled. WatchedTypes []neutrontypes.InterchainQueryType - // Registry is a watch list registry. It contains a list of addresses, and the Subscriber only - // works with interchain queries and events that are under these addresses' ownership. + // Registry is a watch list registry. It contains a list of addresses and a list of queryIDs, and the Subscriber only + // works with interchain queries and events that are under ownership of these addresses and match the queryIDs. Registry *rg.Registry } @@ -59,7 +58,7 @@ func NewDefaultSubscriber(cfg config.NeutronQueryRelayerConfig, logRegistry *nlo &Config{ ConnectionID: cfg.NeutronChain.ConnectionID, WatchedTypes: watchedMsgTypes, - Registry: registry.New(cfg.Registry), + Registry: rg.New(cfg.Registry), }, rpcClient, restClient.Query, @@ -217,6 +216,16 @@ func (s *Subscriber) processUpdateEvent(ctx context.Context, event tmtypes.Resul zap.String("query_id", queryID)) continue } + queryIDNumber, err := strconv.ParseUint(queryID, 10, 64) + if err != nil { + return fmt.Errorf("failed to parse queryID: %w", err) + } + + if !s.isWatchedQueryID(queryIDNumber) { + s.logger.Debug("Skipping query (wrong queryID)", zap.String("owner", owner), + zap.String("query_id", queryID)) + continue + } // Load all information about the neutronQuery directly from Neutron. neutronQuery, err := s.getNeutronRegisteredQuery(ctx, queryID) diff --git a/internal/subscriber/subscriber_test.go b/internal/subscriber/subscriber_test.go index de8c53e2..6ba27715 100644 --- a/internal/subscriber/subscriber_test.go +++ b/internal/subscriber/subscriber_test.go @@ -3,6 +3,8 @@ package subscriber_test import ( "context" "fmt" + "sort" + "testing" ctypes "github.com/cometbft/cometbft/rpc/core/types" "github.com/neutron-org/neutron-query-relayer/internal/registry" @@ -10,13 +12,10 @@ import ( "github.com/neutron-org/neutron-query-relayer/internal/subscriber/querier/client/query" mock_subscriber "github.com/neutron-org/neutron-query-relayer/testutil/mocks/subscriber" neutrontypes "github.com/neutron-org/neutron/v4/x/interchainqueries/types" - "github.com/stretchr/testify/require" - "go.uber.org/zap" - - "testing" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" + "go.uber.org/zap" ) func TestDoneShouldEndSubscribe(t *testing.T) { @@ -149,3 +148,349 @@ func TestSubscribeContinuesOnMissingQuery(t *testing.T) { err = s.Subscribe(ctx, queriesTasksQueue) assert.Equal(t, err, nil) } + +func TestSubscribeWithEmptyQueryIDs(t *testing.T) { + // Create a new controller + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + cfgLogger := zap.NewProductionConfig() + logger, err := cfgLogger.Build() + require.NoError(t, err) + + rpcClient := mock_subscriber.NewMockRpcHttpClient(ctrl) + restQuery := mock_subscriber.NewMockRestHttpQuery(ctrl) + + updateEvents := make(chan ctypes.ResultEvent) + blockEvents := make(chan ctypes.ResultEvent) + rpcClient.EXPECT().Start() + rpcClient.EXPECT().Subscribe(gomock.Any(), gomock.Any(), gomock.Any()).Return(updateEvents, nil) + rpcClient.EXPECT().Subscribe(gomock.Any(), gomock.Any(), gomock.Any()) + rpcClient.EXPECT().Subscribe(gomock.Any(), gomock.Any(), gomock.Any()).Return(blockEvents, nil) + + rpcClient.EXPECT().Unsubscribe(gomock.Any(), gomock.Any(), gomock.Any()) + rpcClient.EXPECT().Unsubscribe(gomock.Any(), gomock.Any(), gomock.Any()) + rpcClient.EXPECT().Unsubscribe(gomock.Any(), gomock.Any(), gomock.Any()) + + restQuery.EXPECT().NeutronInterchainQueriesRegisteredQueries(gomock.Any()).Return(&query.NeutronInterchainQueriesRegisteredQueriesOK{ + Payload: &query.NeutronInterchainQueriesRegisteredQueriesOKBody{ + Pagination: &query.NeutronInterchainQueriesRegisteredQueriesOKBodyPagination{ + NextKey: nil, + Total: "", + }, + RegisteredQueries: []*query.NeutronInterchainQueriesRegisteredQueriesOKBodyRegisteredQueriesItems0{ + { + ID: "1", + Owner: "owner", + QueryType: "kv", + UpdatePeriod: "1", + LastSubmittedResultLocalHeight: "0", + LastSubmittedResultRemoteHeight: &query.NeutronInterchainQueriesRegisteredQueriesOKBodyRegisteredQueriesItems0LastSubmittedResultRemoteHeight{ + RevisionHeight: "0", + RevisionNumber: "0", + }, + }, + { + ID: "2", + Owner: "owner", + QueryType: "kv", + UpdatePeriod: "1", + LastSubmittedResultLocalHeight: "0", + LastSubmittedResultRemoteHeight: &query.NeutronInterchainQueriesRegisteredQueriesOKBodyRegisteredQueriesItems0LastSubmittedResultRemoteHeight{ + RevisionHeight: "0", + RevisionNumber: "0", + }, + }, + }, + }, + }, nil) + + queriesTasksQueue := make(chan neutrontypes.RegisteredQuery, 100) + cfg := subscriber.Config{ + ConnectionID: "", + WatchedTypes: []neutrontypes.InterchainQueryType{ + "kv", + "tx", + }, + Registry: registry.New(®istry.RegistryConfig{ + Addresses: make([]string, 0), + QueryIDs: make([]uint64, 0), // do not filter by query ID + }), + } + s, err := subscriber.NewSubscriber(&cfg, rpcClient, restQuery, logger) + assert.NoError(t, err) + + generateNewBlock := func() func() { + height := int64(1) + return func() { + rpcClient.EXPECT().Status(gomock.Any()).Return(&ctypes.ResultStatus{ + SyncInfo: ctypes.SyncInfo{ + LatestBlockHeight: height, + }, + }, nil) + + blockEvents <- ctypes.ResultEvent{} + height++ + } + }() + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + { + // in this block we are going to check queriesTasksQueue are exactly 1 and 2 + generateNewBlock() + queries := []neutrontypes.RegisteredQuery{ + <-queriesTasksQueue, + <-queriesTasksQueue, + } + // reorder the queries by id, as Subscriber.activeQueries is a map, it does not guarantee the order of the iteration + sort.Slice(queries, func(i, j int) bool { + return queries[i].Id < queries[j].Id + }) + assert.Equal(t, uint64(1), queries[0].Id) + assert.Equal(t, uint64(2), queries[1].Id) + assert.Equal(t, 0, len(queriesTasksQueue)) + } + + { + // in this block we are going to confirm that query id 3 is added to activeQueries + queryId := "3" + restQuery.EXPECT().NeutronInterchainQueriesRegisteredQuery(&query.NeutronInterchainQueriesRegisteredQueryParams{ + QueryID: &queryId, + Context: ctx, + }).Return(&query.NeutronInterchainQueriesRegisteredQueryOK{ + Payload: &query.NeutronInterchainQueriesRegisteredQueryOKBody{ + RegisteredQuery: &query.NeutronInterchainQueriesRegisteredQueryOKBodyRegisteredQuery{ + ID: queryId, + Owner: "owner", + QueryType: "kv", + UpdatePeriod: "1", + LastSubmittedResultLocalHeight: "0", + LastSubmittedResultRemoteHeight: &query.NeutronInterchainQueriesRegisteredQueryOKBodyRegisteredQueryLastSubmittedResultRemoteHeight{ + RevisionHeight: "0", + RevisionNumber: "0", + }, + }, + }, + }, nil) + events := make(map[string][]string) + events[subscriber.QueryIdAttr] = []string{queryId} + events[subscriber.ConnectionIdAttr] = []string{"kek"} + events[subscriber.KvKeyAttr] = []string{"kek"} + events[subscriber.TransactionsFilterAttr] = []string{"kek"} + events[subscriber.TypeAttr] = []string{"kek"} + events[subscriber.OwnerAttr] = []string{"owner"} + + updateEvents <- ctypes.ResultEvent{Events: events} + + generateNewBlock() + queries := []neutrontypes.RegisteredQuery{ + <-queriesTasksQueue, + <-queriesTasksQueue, + <-queriesTasksQueue, + } + // reorder the queries by id, as Subscriber.activeQueries is a map, it does not guarantee the order of the iteration + sort.Slice(queries, func(i, j int) bool { + return queries[i].Id < queries[j].Id + }) + assert.Equal(t, uint64(1), queries[0].Id) + assert.Equal(t, uint64(2), queries[1].Id) + assert.Equal(t, uint64(3), queries[2].Id) + assert.Equal(t, 0, len(queriesTasksQueue)) + } + + // should terminate Subscribe() function + cancel() + }() + + err = s.Subscribe(ctx, queriesTasksQueue) + assert.Equal(t, err, nil) +} + +func TestSubscribeWithSpecifiedQueryIDs(t *testing.T) { + // Create a new controller + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + cfgLogger := zap.NewProductionConfig() + logger, err := cfgLogger.Build() + require.NoError(t, err) + + rpcClient := mock_subscriber.NewMockRpcHttpClient(ctrl) + restQuery := mock_subscriber.NewMockRestHttpQuery(ctrl) + + updateEvents := make(chan ctypes.ResultEvent) + blockEvents := make(chan ctypes.ResultEvent) + rpcClient.EXPECT().Start() + rpcClient.EXPECT().Subscribe(gomock.Any(), gomock.Any(), gomock.Any()).Return(updateEvents, nil) + rpcClient.EXPECT().Subscribe(gomock.Any(), gomock.Any(), gomock.Any()) + rpcClient.EXPECT().Subscribe(gomock.Any(), gomock.Any(), gomock.Any()).Return(blockEvents, nil) + + rpcClient.EXPECT().Unsubscribe(gomock.Any(), gomock.Any(), gomock.Any()) + rpcClient.EXPECT().Unsubscribe(gomock.Any(), gomock.Any(), gomock.Any()) + rpcClient.EXPECT().Unsubscribe(gomock.Any(), gomock.Any(), gomock.Any()) + + restQuery.EXPECT().NeutronInterchainQueriesRegisteredQueries(gomock.Any()).Return(&query.NeutronInterchainQueriesRegisteredQueriesOK{ + Payload: &query.NeutronInterchainQueriesRegisteredQueriesOKBody{ + Pagination: &query.NeutronInterchainQueriesRegisteredQueriesOKBodyPagination{ + NextKey: nil, + Total: "", + }, + RegisteredQueries: []*query.NeutronInterchainQueriesRegisteredQueriesOKBodyRegisteredQueriesItems0{ + { + ID: "1", + Owner: "owner", + QueryType: "kv", + UpdatePeriod: "1", + LastSubmittedResultLocalHeight: "0", + LastSubmittedResultRemoteHeight: &query.NeutronInterchainQueriesRegisteredQueriesOKBodyRegisteredQueriesItems0LastSubmittedResultRemoteHeight{ + RevisionHeight: "0", + RevisionNumber: "0", + }, + }, + { + ID: "2", + Owner: "owner", + QueryType: "kv", + UpdatePeriod: "1", + LastSubmittedResultLocalHeight: "0", + LastSubmittedResultRemoteHeight: &query.NeutronInterchainQueriesRegisteredQueriesOKBodyRegisteredQueriesItems0LastSubmittedResultRemoteHeight{ + RevisionHeight: "0", + RevisionNumber: "0", + }, + }, + { + ID: "3", + Owner: "owner", + QueryType: "tx", + UpdatePeriod: "1", + LastSubmittedResultLocalHeight: "0", + LastSubmittedResultRemoteHeight: &query.NeutronInterchainQueriesRegisteredQueriesOKBodyRegisteredQueriesItems0LastSubmittedResultRemoteHeight{ + RevisionHeight: "0", + RevisionNumber: "0", + }, + }, + }, + }, + }, nil) + + queriesTasksQueue := make(chan neutrontypes.RegisteredQuery, 100) + cfg := subscriber.Config{ + ConnectionID: "", + WatchedTypes: []neutrontypes.InterchainQueryType{ + "kv", + "tx", + }, + Registry: registry.New(®istry.RegistryConfig{ + Addresses: make([]string, 0), + QueryIDs: []uint64{1, 3, 5}, // We only handle queries which id equals 1, 3 or 5 + }), + } + s, err := subscriber.NewSubscriber(&cfg, rpcClient, restQuery, logger) + assert.NoError(t, err) + + generateNewBlock := func() func() { + height := int64(1) + return func() { + rpcClient.EXPECT().Status(gomock.Any()).Return(&ctypes.ResultStatus{ + SyncInfo: ctypes.SyncInfo{ + LatestBlockHeight: height, + }, + }, nil) + + blockEvents <- ctypes.ResultEvent{} + height++ + } + }() + emitUpdateEventForQueryID := func(queryID string) { + events := make(map[string][]string) + events[subscriber.QueryIdAttr] = []string{queryID} + events[subscriber.ConnectionIdAttr] = []string{"kek"} + events[subscriber.KvKeyAttr] = []string{"kek"} + events[subscriber.TransactionsFilterAttr] = []string{"kek"} + events[subscriber.TypeAttr] = []string{"kek"} + events[subscriber.OwnerAttr] = []string{"owner"} + updateEvents <- ctypes.ResultEvent{Events: events} + } + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + { + // in this block we are going to check queriesTasksQueue are exactly 1 and 3 + generateNewBlock() + queries := []neutrontypes.RegisteredQuery{ + <-queriesTasksQueue, + <-queriesTasksQueue, + } + // reorder the queries by id, as Subscriber.activeQueries is a map, it does not guarantee the order of the iteration + sort.Slice(queries, func(i, j int) bool { + return queries[i].Id < queries[j].Id + }) + assert.Equal(t, uint64(1), queries[0].Id) + assert.Equal(t, uint64(3), queries[1].Id) + assert.Equal(t, 0, len(queriesTasksQueue)) + } + + { + // in this block we are going to confirm that query id 4 is dropped cause we have no intention to hand it + emitUpdateEventForQueryID("4") + generateNewBlock() + queries := []neutrontypes.RegisteredQuery{ + <-queriesTasksQueue, + <-queriesTasksQueue, + } + // reorder the queries by id, as Subscriber.activeQueries is a map, it does not guarantee the order of the iteration + sort.Slice(queries, func(i, j int) bool { + return queries[i].Id < queries[j].Id + }) + assert.Equal(t, uint64(1), queries[0].Id) + assert.Equal(t, uint64(3), queries[1].Id) + assert.Equal(t, 0, len(queriesTasksQueue)) + } + + { + // in this block we are going to confirm that query id 5 is added to activeQueries + queryId := "5" + restQuery.EXPECT().NeutronInterchainQueriesRegisteredQuery(&query.NeutronInterchainQueriesRegisteredQueryParams{ + QueryID: &queryId, + Context: ctx, + }).Return(&query.NeutronInterchainQueriesRegisteredQueryOK{ + Payload: &query.NeutronInterchainQueriesRegisteredQueryOKBody{ + RegisteredQuery: &query.NeutronInterchainQueriesRegisteredQueryOKBodyRegisteredQuery{ + ID: queryId, + Owner: "owner", + QueryType: "kv", + UpdatePeriod: "1", + LastSubmittedResultLocalHeight: "0", + LastSubmittedResultRemoteHeight: &query.NeutronInterchainQueriesRegisteredQueryOKBodyRegisteredQueryLastSubmittedResultRemoteHeight{ + RevisionHeight: "0", + RevisionNumber: "0", + }, + }, + }, + }, nil) + emitUpdateEventForQueryID(queryId) + + generateNewBlock() + queries := []neutrontypes.RegisteredQuery{ + <-queriesTasksQueue, + <-queriesTasksQueue, + <-queriesTasksQueue, + } + // reorder the queries by id, as Subscriber.activeQueries is a map, it does not guarantee the order of the iteration + sort.Slice(queries, func(i, j int) bool { + return queries[i].Id < queries[j].Id + }) + assert.Equal(t, uint64(1), queries[0].Id) + assert.Equal(t, uint64(3), queries[1].Id) + assert.Equal(t, uint64(5), queries[2].Id) + assert.Equal(t, 0, len(queriesTasksQueue)) + } + + // should terminate Subscribe() function + cancel() + }() + + err = s.Subscribe(ctx, queriesTasksQueue) + assert.Equal(t, err, nil) +} diff --git a/internal/subscriber/utils.go b/internal/subscriber/utils.go index 859978db..a444f151 100644 --- a/internal/subscriber/utils.go +++ b/internal/subscriber/utils.go @@ -71,7 +71,7 @@ func (s *Subscriber) getNeutronRegisteredQuery(ctx context.Context, queryId stri return neutronQuery, nil } -// getNeutronRegisteredQueries retrieves the list of registered queries filtered by owner, connection, and query type. +// getNeutronRegisteredQueries retrieves the list of registered queries filtered by owner, connection, query type, and queryID. func (s *Subscriber) getNeutronRegisteredQueries(ctx context.Context) (map[string]*neutrontypes.RegisteredQuery, error) { var out = map[string]*neutrontypes.RegisteredQuery{} var pageKey *strfmt.Base64 @@ -99,6 +99,9 @@ func (s *Subscriber) getNeutronRegisteredQueries(ctx context.Context) (map[strin if !s.isWatchedMsgType(neutronQuery.QueryType) { continue } + if !s.isWatchedQueryID(neutronQuery.Id) { + continue + } out[restQuery.ID] = neutronQuery } if payload.Pagination != nil && payload.Pagination.NextKey.String() != "" { @@ -172,8 +175,14 @@ func (s *Subscriber) isWatchedMsgType(msgType string) bool { return ex } +// isWatchedQueryID returns true if the queryID is within the registry watched queryIDs or there +// are no registry watched queryIDs configured for the subscriber meaning all queryIDs are watched. +func (s *Subscriber) isWatchedQueryID(queryID uint64) bool { + return s.registry.IsQueryIDsEmpty() || s.registry.ContainsQueryID(queryID) +} + // isWatchedAddress returns true if the address is within the registry watched addresses or there // are no registry watched addresses configured for the subscriber meaning all addresses are watched. func (s *Subscriber) isWatchedAddress(address string) bool { - return s.registry.IsEmpty() || s.registry.Contains(address) + return s.registry.IsAddressesEmpty() || s.registry.ContainsAddress(address) }