Skip to content

Commit

Permalink
unit test to check that we do not end subscribe prematurely
Browse files Browse the repository at this point in the history
  • Loading branch information
NeverHappened committed Mar 15, 2024
1 parent 86adab6 commit 7197268
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 36 deletions.
24 changes: 12 additions & 12 deletions internal/subscriber/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,22 @@ const (
// eventAttr is the key of the tendermint's event attribute that contains the kind of event.
eventAttr = "tm.event"

// connectionIdAttr is the key of the Neutron's custom message event's attribute that contains the
// ConnectionIdAttr is the key of the Neutron's custom message event's attribute that contains the
// connectionID of the event's ActiveQuery.
connectionIdAttr = eventTypePrefix + "." + types.AttributeKeyConnectionID
// queryIdAttr is the key of the Neutron's custom message event's attribute that contains the
ConnectionIdAttr = eventTypePrefix + "." + types.AttributeKeyConnectionID
// QueryIdAttr is the key of the Neutron's custom message event's attribute that contains the
// incoming ICQ ID.
queryIdAttr = eventTypePrefix + "." + types.AttributeKeyQueryID
// kvKeyAttr is the key of the Neutron's custom message event's attribute that contains the KV
QueryIdAttr = eventTypePrefix + "." + types.AttributeKeyQueryID
// KvKeyAttr is the key of the Neutron's custom message event's attribute that contains the KV
// values for the incoming KV ICQ.
kvKeyAttr = eventTypePrefix + "." + types.AttributeKeyKVQuery
// transactionsFilterAttr is the key of the Neutron's custom message event's attribute that
KvKeyAttr = eventTypePrefix + "." + types.AttributeKeyKVQuery
// TransactionsFilterAttr is the key of the Neutron's custom message event's attribute that
// contains the transaction filter value for the incoming TX ICQ.
transactionsFilterAttr = eventTypePrefix + "." + types.AttributeTransactionsFilterQuery
// typeAttr is the key of the Neutron's custom message event's attribute that contains the type
TransactionsFilterAttr = eventTypePrefix + "." + types.AttributeTransactionsFilterQuery
// TypeAttr is the key of the Neutron's custom message event's attribute that contains the type
// of the incoming ICQ.
typeAttr = eventTypePrefix + "." + types.AttributeKeyQueryType
// ownerAttr is the key of the Neutron's custom message event's attribute that contains the
TypeAttr = eventTypePrefix + "." + types.AttributeKeyQueryType
// OwnerAttr is the key of the Neutron's custom message event's attribute that contains the
// address of the ICQ owner.
ownerAttr = eventTypePrefix + "." + types.AttributeKeyOwner
OwnerAttr = eventTypePrefix + "." + types.AttributeKeyOwner
)
19 changes: 5 additions & 14 deletions internal/subscriber/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@ var (

// Config contains configurable fields for the Subscriber.
type Config struct {
//// RPCAddress represents the address for RPC calls to the chain.
//RPCAddress string
//// RESTAddress represents the address for REST calls to the chain.
//RESTAddress string
//// Timeout defines time limit for requests executed by the Subscriber.
//Timeout time.Duration
// ConnectionID is the Neutron's side connection ID used to filter out queries.
ConnectionID string
// WatchedTypes is the list of query types to be observed and handled.
Expand Down Expand Up @@ -62,9 +56,6 @@ func NewDefaultSubscriber(cfg config.NeutronQueryRelayerConfig, logRegistry *nlo

sub, err := NewSubscriber(
&Config{
//RPCAddress: cfg.NeutronChain.RPCAddr,
//RESTAddress: cfg.NeutronChain.RESTAddr,
//Timeout: cfg.NeutronChain.Timeout,
ConnectionID: cfg.NeutronChain.ConnectionID,
WatchedTypes: watchedMsgTypes,
Registry: registry.New(cfg.Registry),
Expand Down Expand Up @@ -215,10 +206,10 @@ func (s *Subscriber) processUpdateEvent(ctx context.Context, event tmtypes.Resul
// There can be multiple events of the same type associated with our connection id in a
// single tmtypes.ResultEvent value. We need to process all of them.
var events = event.Events
for idx := range events[connectionIdAttr] {
for idx := range events[ConnectionIdAttr] {
var (
owner = events[ownerAttr][idx]
queryID = events[queryIdAttr][idx]
owner = events[OwnerAttr][idx]
queryID = events[QueryIdAttr][idx]
)
if !s.isWatchedAddress(owner) {
s.logger.Debug("Skipping query (wrong owner)", zap.String("owner", owner),
Expand Down Expand Up @@ -261,9 +252,9 @@ func (s *Subscriber) processRemoveEvent(event tmtypes.ResultEvent) error {
// There can be multiple events of the same type associated with our connection id in a
// single tmtypes.ResultEvent value. We need to process all of them.
var events = event.Events
for idx := range events[connectionIdAttr] {
for idx := range events[ConnectionIdAttr] {
var (
queryID = events[queryIdAttr][idx]
queryID = events[QueryIdAttr][idx]
)

// Delete the query from the active queries list.
Expand Down
84 changes: 81 additions & 3 deletions internal/subscriber/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package subscriber_test

import (
"context"
"fmt"
ctypes "github.com/cometbft/cometbft/rpc/core/types"
"github.com/neutron-org/neutron-query-relayer/internal/registry"
"github.com/neutron-org/neutron-query-relayer/internal/subscriber"
"github.com/neutron-org/neutron-query-relayer/internal/subscriber/querier/client/query"
Expand Down Expand Up @@ -36,7 +38,6 @@ func TestDoneShouldEndSubscribe(t *testing.T) {
rpcClient.EXPECT().Unsubscribe(gomock.Any(), gomock.Any(), gomock.Any())
rpcClient.EXPECT().Unsubscribe(gomock.Any(), gomock.Any(), gomock.Any())

//registeredQueries := make(map[string]*neutrontypes.RegisteredQuery)
restQuery.EXPECT().NeutronInterchainQueriesRegisteredQueries(gomock.Any()).Return(&query.NeutronInterchainQueriesRegisteredQueriesOK{
Payload: &query.NeutronInterchainQueriesRegisteredQueriesOKBody{
Pagination: &query.NeutronInterchainQueriesRegisteredQueriesOKBodyPagination{
Expand Down Expand Up @@ -67,5 +68,82 @@ func TestDoneShouldEndSubscribe(t *testing.T) {
assert.Equal(t, err, nil)
}

// Test that subscribe does not end on
// processUpdateEvent -> #s.getNeutronRegisteredQuery(ctx, queryID) returning error
func TestSubscribeContinuesOnMissingQuery(t *testing.T) {
// Create a new controller
ctrl := gomock.NewController(t)
defer ctrl.Finish()

cfgLogger := zap.NewProductionConfig()
logger, err := cfgLogger.Build()
require.NoError(t, err)

rpcClient := mock_subscriber.NewMockRpcHttpClient(ctrl)
restQuery := mock_subscriber.NewMockRestHttpQuery(ctrl)

updateEvents := make(chan ctypes.ResultEvent)

rpcClient.EXPECT().Start()
rpcClient.EXPECT().Subscribe(gomock.Any(), gomock.Any(), gomock.Any()).Return(updateEvents, nil)
rpcClient.EXPECT().Subscribe(gomock.Any(), gomock.Any(), gomock.Any())
rpcClient.EXPECT().Subscribe(gomock.Any(), gomock.Any(), gomock.Any())

rpcClient.EXPECT().Unsubscribe(gomock.Any(), gomock.Any(), gomock.Any())
rpcClient.EXPECT().Unsubscribe(gomock.Any(), gomock.Any(), gomock.Any())
rpcClient.EXPECT().Unsubscribe(gomock.Any(), gomock.Any(), gomock.Any())

ctx, cancel := context.WithCancel(context.Background())

// expect to return not found error on queryId = "1"
queryId := "1"
restQuery.EXPECT().NeutronInterchainQueriesRegisteredQuery(&query.NeutronInterchainQueriesRegisteredQueryParams{
QueryID: &queryId,
Context: ctx,
}).Return(nil, fmt.Errorf("not found"))

restQuery.EXPECT().NeutronInterchainQueriesRegisteredQueries(gomock.Any()).Return(&query.NeutronInterchainQueriesRegisteredQueriesOK{
Payload: &query.NeutronInterchainQueriesRegisteredQueriesOKBody{
Pagination: &query.NeutronInterchainQueriesRegisteredQueriesOKBodyPagination{
NextKey: nil,
Total: "",
},
RegisteredQueries: []*query.NeutronInterchainQueriesRegisteredQueriesOKBodyRegisteredQueriesItems0{},
},
}, nil)

queriesTasksQueue := make(chan neutrontypes.RegisteredQuery, 100)
cfg := subscriber.Config{
ConnectionID: "",
WatchedTypes: nil,
Registry: registry.New(&registry.RegistryConfig{Addresses: []string{"owner"}}),
}
s, err := subscriber.NewSubscriber(&cfg, rpcClient, restQuery, logger)
assert.NoError(t, err)

go func() {
// send the update event that we'll return non-existent query
events := make(map[string][]string)
// to pass the checkEvents func
events[subscriber.ConnectionIdAttr] = []string{"kek"}
events[subscriber.KvKeyAttr] = []string{"kek"}
events[subscriber.TransactionsFilterAttr] = []string{"kek"}
events[subscriber.QueryIdAttr] = []string{"1"}
events[subscriber.TypeAttr] = []string{"kek"}

// to pass owner check
events[subscriber.OwnerAttr] = []string{"owner"}

updateEvents <- ctypes.ResultEvent{
Query: "",
Data: nil,
Events: events,
}

// should terminate Subscribe() function without error
cancel()
}()

// as we have `expect` for `NeutronInterchainQueriesRegisteredQueries`,
// we are sure that processUpdateEvent was executed before the context `cancel()` call
err = s.Subscribe(ctx, queriesTasksQueue)
assert.Equal(t, err, nil)
}
14 changes: 7 additions & 7 deletions internal/subscriber/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,16 @@ func (s *Subscriber) getNeutronRegisteredQueries(ctx context.Context) (map[strin
func (s *Subscriber) checkEvents(event tmtypes.ResultEvent) (bool, error) {
events := event.Events

icqEventsCount := len(events[connectionIdAttr])
icqEventsCount := len(events[ConnectionIdAttr])
if icqEventsCount == 0 {
s.logger.Debug("no connection id attributes received", zap.Any("events", events))
return false, nil
}

if len(events[kvKeyAttr]) != icqEventsCount ||
len(events[transactionsFilterAttr]) != icqEventsCount ||
len(events[queryIdAttr]) != icqEventsCount ||
len(events[typeAttr]) != icqEventsCount {
if len(events[KvKeyAttr]) != icqEventsCount ||
len(events[TransactionsFilterAttr]) != icqEventsCount ||
len(events[QueryIdAttr]) != icqEventsCount ||
len(events[TypeAttr]) != icqEventsCount {
return false, fmt.Errorf("events attributes length does not match for events=%v", events)
}

Expand All @@ -140,7 +140,7 @@ func (s *Subscriber) subscriberName() string {
// getQueryUpdatedSubscription returns a Query to filter out interchain "query_updated" events.
func (s *Subscriber) getQueryUpdatedSubscription() string {
return fmt.Sprintf("%s='%s' AND %s='%s' AND %s='%s'",
connectionIdAttr, s.connectionID,
ConnectionIdAttr, s.connectionID,
moduleAttr, neutrontypes.ModuleName,
actionAttr, neutrontypes.AttributeValueQueryUpdated,
)
Expand All @@ -149,7 +149,7 @@ func (s *Subscriber) getQueryUpdatedSubscription() string {
// getQueryRemovedSubscription returns a Query to filter out interchain "query_removed" events.
func (s *Subscriber) getQueryRemovedSubscription() string {
return fmt.Sprintf("%s='%s' AND %s='%s' AND %s='%s'",
connectionIdAttr, s.connectionID,
ConnectionIdAttr, s.connectionID,
moduleAttr, neutrontypes.ModuleName,
actionAttr, neutrontypes.AttributeValueQueryRemoved,
)
Expand Down

0 comments on commit 7197268

Please sign in to comment.