Skip to content

Commit

Permalink
add gomock and mock subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
NeverHappened committed Mar 14, 2024
1 parent a7b7209 commit 97d6f45
Show file tree
Hide file tree
Showing 10 changed files with 284 additions and 74 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,7 @@ generate-openapi:

install:
go install -ldflags '$(ldflags)' -a ./cmd/neutron_query_relayer

mocks:
@echo "Regenerate mocks..."
go generate ./...
3 changes: 2 additions & 1 deletion cmd/neutron_query_relayer/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"context"
relaysubscriber "github.com/neutron-org/neutron-query-relayer/internal/subscriber"
"log"
"os"
"os/signal"
Expand Down Expand Up @@ -92,7 +93,7 @@ func startRelayer() {
submittedTxsTasksQueue = make(chan relay.PendingSubmittedTxInfo)
)

subscriber, err := app.NewDefaultSubscriber(cfg, logRegistry)
subscriber, err := relaysubscriber.NewDefaultSubscriber(cfg, logRegistry)
if err != nil {
logger.Fatal("Failed to get NewDefaultSubscriber", zap.Error(err))
}
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@ require (
github.com/go-openapi/strfmt v0.21.3
github.com/go-openapi/swag v0.22.3
github.com/go-openapi/validate v0.21.0
github.com/golang/mock v1.6.0
github.com/gorilla/mux v1.8.1
github.com/kelseyhightower/envconfig v1.4.0
github.com/neutron-org/neutron v1.0.5-0.20231128122544-e605ed3db438
github.com/neutron-org/neutron-logger v0.0.0-20221027125151-535167f2dd73
github.com/prometheus/client_golang v1.16.0
github.com/spf13/cobra v1.8.0
github.com/stretchr/testify v1.8.4
github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d
go.uber.org/mock v0.2.0
go.uber.org/zap v1.24.0
)

Expand Down Expand Up @@ -112,7 +115,6 @@ require (
github.com/gogo/protobuf v1.3.3 // indirect
github.com/golang/glog v1.1.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/btree v1.1.2 // indirect
Expand Down Expand Up @@ -186,7 +188,6 @@ require (
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.16.0 // indirect
github.com/stretchr/testify v1.8.4 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
github.com/tendermint/go-amino v0.16.0 // indirect
github.com/tidwall/btree v1.6.0 // indirect
Expand Down
28 changes: 0 additions & 28 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,9 @@ import (
nlogger "github.com/neutron-org/neutron-logger"
"github.com/neutron-org/neutron-query-relayer/internal/config"
"github.com/neutron-org/neutron-query-relayer/internal/raw"
"github.com/neutron-org/neutron-query-relayer/internal/registry"
"github.com/neutron-org/neutron-query-relayer/internal/relay"
"github.com/neutron-org/neutron-query-relayer/internal/subscriber"
relaysubscriber "github.com/neutron-org/neutron-query-relayer/internal/subscriber"
"github.com/neutron-org/neutron-query-relayer/internal/subscriber/querier/client/query"
"github.com/neutron-org/neutron-query-relayer/internal/txsubmitchecker"
neutrontypes "github.com/neutron-org/neutron/x/interchainqueries/types"
)

var (
Expand Down Expand Up @@ -56,30 +52,6 @@ var (
rtyErr = retry.LastErrorOnly(true)
)

func NewDefaultSubscriber(cfg config.NeutronQueryRelayerConfig, logRegistry *nlogger.Registry) (relay.Subscriber, error) {
watchedMsgTypes := []neutrontypes.InterchainQueryType{neutrontypes.InterchainQueryTypeKV}
if cfg.AllowTxQueries {
watchedMsgTypes = append(watchedMsgTypes, neutrontypes.InterchainQueryTypeTX)
}

sub, err := relaysubscriber.NewSubscriber(
&subscriber.SubscriberConfig{
RPCAddress: cfg.NeutronChain.RPCAddr,
RESTAddress: cfg.NeutronChain.RESTAddr,
Timeout: cfg.NeutronChain.Timeout,
ConnectionID: cfg.NeutronChain.ConnectionID,
WatchedTypes: watchedMsgTypes,
Registry: registry.New(cfg.Registry),
},
logRegistry.Get(SubscriberContext),
)
if err != nil {
return nil, fmt.Errorf("failed to create a NewSubscriber: %s", err)
}

return sub, nil
}

func NewDefaultTxSubmitChecker(cfg config.NeutronQueryRelayerConfig, logRegistry *nlogger.Registry,
storage relay.Storage) (relay.TxSubmitChecker, error) {
neutronClient, err := raw.NewRPCClient(cfg.NeutronChain.RPCAddr, cfg.NeutronChain.Timeout)
Expand Down
19 changes: 19 additions & 0 deletions internal/subscriber/clients.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package subscriber

import (
"context"
ctypes "github.com/cometbft/cometbft/rpc/core/types"
"github.com/neutron-org/neutron-query-relayer/internal/subscriber/querier/client/query"
)

type RpcHttpClient interface {
Start() error
Subscribe(ctx context.Context, subscriber string, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error)
Status(ctx context.Context) (*ctypes.ResultStatus, error)
Unsubscribe(ctx context.Context, subscriber, query string) error
}

type RestHttpQuery interface {
NeutronInterchainQueriesRegisteredQuery(params *query.NeutronInterchainQueriesRegisteredQueryParams, opts ...query.ClientOption) (*query.NeutronInterchainQueriesRegisteredQueryOK, error)
NeutronInterchainQueriesRegisteredQueries(params *query.NeutronInterchainQueriesRegisteredQueriesParams, opts ...query.ClientOption) (*query.NeutronInterchainQueriesRegisteredQueriesOK, error)
}
94 changes: 63 additions & 31 deletions internal/subscriber/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,36 @@ package subscriber
import (
"context"
"fmt"
nlogger "github.com/neutron-org/neutron-logger"
"github.com/neutron-org/neutron-query-relayer/internal/app"
"github.com/neutron-org/neutron-query-relayer/internal/config"
"github.com/neutron-org/neutron-query-relayer/internal/relay"
"sync"
"time"

"github.com/neutron-org/neutron-query-relayer/internal/registry"

instrumenters "github.com/neutron-org/neutron-query-relayer/internal/metrics"

"github.com/cometbft/cometbft/rpc/client/http"
tmtypes "github.com/cometbft/cometbft/rpc/core/types"
"go.uber.org/zap"

rg "github.com/neutron-org/neutron-query-relayer/internal/registry"
restclient "github.com/neutron-org/neutron-query-relayer/internal/subscriber/querier/client"
neutrontypes "github.com/neutron-org/neutron/x/interchainqueries/types"
)

var (
unsubscribeTimeout = time.Second * 5
)

// SubscriberConfig contains configurable fields for the Subscriber.
type SubscriberConfig struct {
// RPCAddress represents the address for RPC calls to the chain.
RPCAddress string
// RESTAddress represents the address for REST calls to the chain.
RESTAddress string
// Timeout defines time limit for requests executed by the Subscriber.
Timeout time.Duration
// Config contains configurable fields for the Subscriber.
type Config struct {
//// RPCAddress represents the address for RPC calls to the chain.
//RPCAddress string
//// RESTAddress represents the address for REST calls to the chain.
//RESTAddress string
//// Timeout defines time limit for requests executed by the Subscriber.
//Timeout time.Duration
// ConnectionID is the Neutron's side connection ID used to filter out queries.
ConnectionID string
// WatchedTypes is the list of query types to be observed and handled.
Expand All @@ -38,24 +42,53 @@ type SubscriberConfig struct {
Registry *rg.Registry
}

// NewSubscriber creates a new Subscriber instance ready to subscribe to Neutron events.
func NewSubscriber(
cfg *SubscriberConfig,
logger *zap.Logger,
) (*Subscriber, error) {
func NewDefaultSubscriber(cfg config.NeutronQueryRelayerConfig, logRegistry *nlogger.Registry) (relay.Subscriber, error) {
watchedMsgTypes := []neutrontypes.InterchainQueryType{neutrontypes.InterchainQueryTypeKV}
if cfg.AllowTxQueries {
watchedMsgTypes = append(watchedMsgTypes, neutrontypes.InterchainQueryTypeTX)
}

// rpcClient is used to subscribe to Neutron events.
rpcClient, err := newRPCClient(cfg.RPCAddress, cfg.Timeout)
rpcClient, err := NewRPCClient(cfg.NeutronChain.RPCAddr, cfg.NeutronChain.Timeout)
if err != nil {
return nil, fmt.Errorf("could not create new tendermint rpcClient: %w", err)
}
if err = rpcClient.Start(); err != nil {
return nil, fmt.Errorf("could not start tendermint rpcClient: %w", err)
return nil, fmt.Errorf("could not create new tendermint rpcClient for Subscriber: %w", err)
}

// restClient is used to retrieve registered queries from Neutron.
restClient, err := newRESTClient(cfg.RESTAddress, cfg.Timeout)
restClient, err := NewRESTClient(cfg.NeutronChain.RESTAddr, cfg.NeutronChain.Timeout)
if err != nil {
return nil, fmt.Errorf("failed to get newRESTClient: %w", err)
return nil, fmt.Errorf("failed to get NewRESTClient for Subscriber: %w", err)
}

sub, err := NewSubscriber(
&Config{
//RPCAddress: cfg.NeutronChain.RPCAddr,
//RESTAddress: cfg.NeutronChain.RESTAddr,
//Timeout: cfg.NeutronChain.Timeout,
ConnectionID: cfg.NeutronChain.ConnectionID,
WatchedTypes: watchedMsgTypes,
Registry: registry.New(cfg.Registry),
},
rpcClient,
restClient.Query,
logRegistry.Get(app.SubscriberContext),
)
if err != nil {
return nil, fmt.Errorf("failed to create a NewSubscriber: %s", err)
}

return sub, nil
}

// NewSubscriber creates a new Subscriber instance ready to subscribe to Neutron events.
func NewSubscriber(
cfg *Config,
rpcClient RpcHttpClient,
restClient RestHttpQuery,
logger *zap.Logger,
) (*Subscriber, error) {
if err := rpcClient.Start(); err != nil {
return nil, fmt.Errorf("could not start tendermint rpcClient: %w", err)
}

// Contains the types of queries that we are ready to serve (KV / TX).
Expand All @@ -65,8 +98,8 @@ func NewSubscriber(
}

return &Subscriber{
rpcClient: rpcClient,
restClient: restClient,
rpcClient: rpcClient,
restClientQuery: restClient,

connectionID: cfg.ConnectionID,
registry: cfg.Registry,
Expand All @@ -81,13 +114,12 @@ func NewSubscriber(
// filters them in accordance with the Registry configuration and watchedTypes, and provides a
// stream of split to KV and TX messages.
type Subscriber struct {
rpcClient *http.HTTP // Used to subscribe to events
restClient *restclient.HTTPAPIConsole // Used to run Neutron-specific queries using the REST

connectionID string
registry *rg.Registry
logger *zap.Logger
watchedTypes map[neutrontypes.InterchainQueryType]struct{}
rpcClient RpcHttpClient // Used to subscribe to events
restClientQuery RestHttpQuery // Used to run Neutron-specific queries using the REST
connectionID string
registry *rg.Registry
logger *zap.Logger
watchedTypes map[neutrontypes.InterchainQueryType]struct{}

activeQueries map[string]*neutrontypes.RegisteredQuery
}
Expand Down
19 changes: 14 additions & 5 deletions internal/subscriber/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,38 @@ package subscriber_test
import (
"context"
"github.com/neutron-org/neutron-query-relayer/internal/subscriber"
mock_subscriber "github.com/neutron-org/neutron-query-relayer/testutil/mocks/subscriber"
neutrontypes "github.com/neutron-org/neutron/x/interchainqueries/types"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"
"testing"
)

func TestDoneShouldEndSubscribe(t *testing.T) {
// Create a new controller
ctrl := gomock.NewController(t)
defer ctrl.Finish()

cfgLogger := zap.NewProductionConfig()
logger, err := cfgLogger.Build()
require.NoError(t, err)

rpcClient := mock_subscriber.NewMockRpcHttpClient(ctrl)
restQuery := mock_subscriber.NewMockRestHttpQuery(ctrl)

queriesTasksQueue := make(chan neutrontypes.RegisteredQuery, 100)
cfg := subscriber.SubscriberConfig{
RPCAddress: "",
RESTAddress: "",
Timeout: 0,
cfg := subscriber.Config{
//RPCAddress: "",
//RESTAddress: "",
//Timeout: 0,
ConnectionID: "",
WatchedTypes: nil,
Registry: nil,
}
s, err := subscriber.NewSubscriber(&cfg, logger)
s, err := subscriber.NewSubscriber(&cfg, rpcClient, restQuery, logger)
assert.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
Expand Down
15 changes: 8 additions & 7 deletions internal/subscriber/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,19 @@ var (
rpcWSEndpoint = "/websocket"
)

// newRPCClient creates a new tendermint RPC client with timeout.
func newRPCClient(rpcAddr string, timeout time.Duration) (*tmhttp.HTTP, error) {
// NewRPCClient creates a new tendermint RPC client with timeout.
func NewRPCClient(rpcAddr string, timeout time.Duration) (RpcHttpClient, error) {
httpClient, err := jsonrpcclient.DefaultHTTPClient(rpcAddr)
if err != nil {
return nil, err
}
httpClient.Timeout = timeout
return tmhttp.NewWithClient(rpcAddr, rpcWSEndpoint, httpClient)
client, err := tmhttp.NewWithClient(rpcAddr, rpcWSEndpoint, httpClient)
return *client, err
}

// newRESTClient makes sure that the restAddr is formed correctly and returns a REST query.
func newRESTClient(restAddr string, timeout time.Duration) (*restclient.HTTPAPIConsole, error) {
// NewRESTClient makes sure that the restAddr is formed correctly and returns a REST query.
func NewRESTClient(restAddr string, timeout time.Duration) (*restclient.HTTPAPIConsole, error) {
url, err := url.Parse(restAddr)
if err != nil {
return nil, fmt.Errorf("failed to parse restAddr: %w", err)
Expand All @@ -51,7 +52,7 @@ func newRESTClient(restAddr string, timeout time.Duration) (*restclient.HTTPAPIC

// getNeutronRegisteredQuery retrieves a registered query from Neutron.
func (s *Subscriber) getNeutronRegisteredQuery(ctx context.Context, queryId string) (*neutrontypes.RegisteredQuery, error) {
res, err := s.restClient.Query.NeutronInterchainQueriesRegisteredQuery(
res, err := s.restClientQuery.NeutronInterchainQueriesRegisteredQuery(
&query.NeutronInterchainQueriesRegisteredQueryParams{
QueryID: &queryId,
Context: ctx,
Expand All @@ -72,7 +73,7 @@ func (s *Subscriber) getNeutronRegisteredQueries(ctx context.Context) (map[strin
var out = map[string]*neutrontypes.RegisteredQuery{}
var pageKey *strfmt.Base64
for {
res, err := s.restClient.Query.NeutronInterchainQueriesRegisteredQueries(
res, err := s.restClientQuery.NeutronInterchainQueriesRegisteredQueries(
&query.NeutronInterchainQueriesRegisteredQueriesParams{
Owners: s.registry.GetAddresses(),
ConnectionID: &s.connectionID,
Expand Down
3 changes: 3 additions & 0 deletions testutil/mocks/gomock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package mocks

//go:generate mockgen -source=./../../internal/subscriber/clients.go -destination ./subscriber/expected_clients.go
Loading

0 comments on commit 97d6f45

Please sign in to comment.