diff --git a/CHANGELOG.md b/CHANGELOG.md index f9dde010d7..a252b1b7da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ Changelog for NeoFS Node ### Removed - Non-notary mode support for sidechain (#2321) +- Priority switching b/w RPC endpoints in the morph client (#2306) ### Updated - Update minimal supported Go version up to v1.18 (#2340) @@ -31,6 +32,9 @@ Changelog for NeoFS Node - Storage and Inner-ring nodes exposes their version via the `neofs_[node|ir]_version` metric now. - In the local consensus mode (IR) it is allowed to provide additional TLS setup addresses now, see `morph.consensus.rpc.tls` section. +- `morph.switch_interval` IR and SN config value is not used anymore. +- `morph.rpc_endpoint` SN config value and `morph.endpoint.client` IR config value has been deprecated and will be + removed with the next minor release. Use `morph.endpoints` for both instead (NOTE: it does not have priorities now). ## [0.36.1] - 2023-04-26 diff --git a/cmd/neofs-ir/defaults.go b/cmd/neofs-ir/defaults.go index 358f00b26c..da74f61af2 100644 --- a/cmd/neofs-ir/defaults.go +++ b/cmd/neofs-ir/defaults.go @@ -49,11 +49,8 @@ func defaultConfiguration(cfg *viper.Viper) { cfg.SetDefault("morph.dial_timeout", 15*time.Second) cfg.SetDefault("morph.validators", []string{}) - cfg.SetDefault("morph.switch_interval", 2*time.Minute) - cfg.SetDefault("mainnet.endpoint.client", []string{}) cfg.SetDefault("mainnet.dial_timeout", 15*time.Second) - cfg.SetDefault("mainnet.switch_interval", 2*time.Minute) cfg.SetDefault("wallet.path", "") // inner ring node NEP-6 wallet cfg.SetDefault("wallet.address", "") // account address diff --git a/cmd/neofs-node/config/morph/config.go b/cmd/neofs-node/config/morph/config.go index 782b941e2b..f81adbd735 100644 --- a/cmd/neofs-node/config/morph/config.go +++ b/cmd/neofs-node/config/morph/config.go @@ -6,7 +6,6 @@ import ( "time" "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config" - "github.com/nspcc-dev/neofs-node/pkg/morph/client" ) const ( @@ -16,23 +15,17 @@ const ( // DialTimeoutDefault is a default dial timeout of morph chain client connection. DialTimeoutDefault = 5 * time.Second - // PriorityDefault is a default endpoint priority for the morph client. - PriorityDefault = 1 - // CacheTTLDefault is a default value for cached values TTL. // It is 0, because actual default depends on block time. CacheTTLDefault = time.Duration(0) - - // SwitchIntervalDefault is a default Neo RPCs switch interval. - SwitchIntervalDefault = 2 * time.Minute ) -// RPCEndpoint returns list of the values of "rpc_endpoint" config parameter +// Endpoints returns list of the values of "endpoints" config parameter // from "morph" section. // // Throws panic if list is empty. -func RPCEndpoint(c *config.Config) []client.Endpoint { - var es []client.Endpoint +func Endpoints(c *config.Config) []string { + var endpointsDeprecated []string sub := c.Sub(subsection).Sub("rpc_endpoint") for i := 0; ; i++ { @@ -42,21 +35,16 @@ func RPCEndpoint(c *config.Config) []client.Endpoint { break } - priority := int(config.IntSafe(s, "priority")) - if priority <= 0 { - priority = PriorityDefault - } - - es = append(es, client.Endpoint{ - Address: addr, - Priority: priority, - }) + endpointsDeprecated = append(endpointsDeprecated, addr) } - if len(es) == 0 { + endpoints := config.StringSliceSafe(c.Sub(subsection), "endpoints") + endpoints = append(endpoints, endpointsDeprecated...) + + if len(endpoints) == 0 { panic(fmt.Errorf("no morph chain RPC endpoints, see `morph.rpc_endpoint` section")) } - return es + return endpoints } // DialTimeout returns the value of "dial_timeout" config parameter @@ -84,16 +72,3 @@ func CacheTTL(c *config.Config) time.Duration { return CacheTTLDefault } - -// SwitchInterval returns the value of "switch_interval" config parameter -// from "morph" section. -// -// Returns SwitchIntervalDefault if value is not positive duration. -func SwitchInterval(c *config.Config) time.Duration { - res := config.DurationSafe(c.Sub(subsection), "switch_interval") - if res != 0 { - return res - } - - return SwitchIntervalDefault -} diff --git a/cmd/neofs-node/config/morph/config_test.go b/cmd/neofs-node/config/morph/config_test.go index 78f528e806..855e65bcdb 100644 --- a/cmd/neofs-node/config/morph/config_test.go +++ b/cmd/neofs-node/config/morph/config_test.go @@ -7,7 +7,6 @@ import ( "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config" morphconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/morph" configtest "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/test" - "github.com/nspcc-dev/neofs-node/pkg/morph/client" "github.com/stretchr/testify/require" ) @@ -15,26 +14,19 @@ func TestMorphSection(t *testing.T) { t.Run("defaults", func(t *testing.T) { empty := configtest.EmptyConfig() - require.Panics(t, func() { morphconfig.RPCEndpoint(empty) }) + require.Panics(t, func() { morphconfig.Endpoints(empty) }) require.Equal(t, morphconfig.DialTimeoutDefault, morphconfig.DialTimeout(empty)) require.Equal(t, morphconfig.CacheTTLDefault, morphconfig.CacheTTL(empty)) - require.Equal(t, morphconfig.SwitchIntervalDefault, morphconfig.SwitchInterval(empty)) }) const path = "../../../../config/example/node" - var ( - rpcs = []client.Endpoint{ - {"wss://rpc1.morph.fs.neo.org:40341/ws", 1}, - {"wss://rpc2.morph.fs.neo.org:40341/ws", 2}, - } - ) + rpcs := []string{"wss://rpc1.morph.fs.neo.org:40341/ws", "wss://rpc2.morph.fs.neo.org:40341/ws"} var fileConfigTest = func(c *config.Config) { - require.Equal(t, rpcs, morphconfig.RPCEndpoint(c)) + require.Equal(t, rpcs, morphconfig.Endpoints(c)) require.Equal(t, 30*time.Second, morphconfig.DialTimeout(c)) require.Equal(t, 15*time.Second, morphconfig.CacheTTL(c)) - require.Equal(t, 3*time.Minute, morphconfig.SwitchInterval(c)) } configtest.ForEachFileType(path, fileConfigTest) diff --git a/cmd/neofs-node/morph.go b/cmd/neofs-node/morph.go index 787f0e4c7b..e5d2a0170e 100644 --- a/cmd/neofs-node/morph.go +++ b/cmd/neofs-node/morph.go @@ -16,7 +16,6 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/morph/event" netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" "github.com/nspcc-dev/neofs-node/pkg/morph/subscriber" - "github.com/nspcc-dev/neofs-node/pkg/util/rand" "go.uber.org/zap" ) @@ -30,22 +29,15 @@ const ( func initMorphComponents(c *cfg) { var err error - addresses := morphconfig.RPCEndpoint(c.appCfg) - - // Morph client stable-sorts endpoints by priority. Shuffle here to randomize - // order of endpoints with the same priority. - rand.Shuffle(len(addresses), func(i, j int) { - addresses[i], addresses[j] = addresses[j], addresses[i] - }) + addresses := morphconfig.Endpoints(c.appCfg) cli, err := client.New(c.key, client.WithDialTimeout(morphconfig.DialTimeout(c.appCfg)), client.WithLogger(c.log), - client.WithEndpoints(addresses...), + client.WithEndpoints(addresses), client.WithConnLostCallback(func() { c.internalErr <- errors.New("morph connection has been lost") }), - client.WithSwitchInterval(morphconfig.SwitchInterval(c.appCfg)), ) if err != nil { c.log.Info("failed to create neo RPC client", diff --git a/config/example/ir.env b/config/example/ir.env index 80ac3d0eb0..7d8c4af4af 100644 --- a/config/example/ir.env +++ b/config/example/ir.env @@ -7,15 +7,12 @@ NEOFS_IR_WALLET_PASSWORD=secret NEOFS_IR_WITHOUT_MAINNET=false NEOFS_IR_MORPH_DIAL_TIMEOUT=5s -NEOFS_IR_MORPH_ENDPOINT_CLIENT_0_ADDRESS="wss://sidechain1.fs.neo.org:30333/ws" -NEOFS_IR_MORPH_ENDPOINT_CLIENT_1_ADDRESS="wss://sidechain2.fs.neo.org:30333/ws" +NEOFS_IR_MORPH_ENDPOINTS="wss://sidechain1.fs.neo.org:30333/ws wss://sidechain2.fs.neo.org:30333/ws" NEOFS_IR_MORPH_VALIDATORS="0283120f4c8c1fc1d792af5063d2def9da5fddc90bc1384de7fcfdda33c3860170" -NEOFS_IR_MORPH_SWITCH_INTERVAL=2m NEOFS_IR_MAINNET_DIAL_TIMEOUT=5s NEOFS_IR_MAINNET_ENDPOINT_CLIENT_0_ADDRESS="wss://mainchain1.fs.neo.org:30333/ws" NEOFS_IR_MAINNET_ENDPOINT_CLIENT_1_ADDRESS="wss://mainchain2.fs.neo.org:30333/ws" -NEOFS_IR_MAINNET_SWITCH_INTERVAL=2m NEOFS_IR_CONTROL_AUTHORIZED_KEYS="035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6" NEOFS_IR_CONTROL_GRPC_ENDPOINT=localhost:8090 diff --git a/config/example/ir.yaml b/config/example/ir.yaml index 18cee8dad2..ba2938e04c 100644 --- a/config/example/ir.yaml +++ b/config/example/ir.yaml @@ -12,13 +12,11 @@ without_mainnet: false # Run application in single chain environment without mai morph: dial_timeout: 5s # Timeout for RPC client connection to sidechain - endpoint: - client: # List of websocket RPC endpoints in sidechain - - address: wss://sidechain1.fs.neo.org:30333/ws - - address: wss://sidechain2.fs.neo.org:30333/ws + endpoints: # List of websocket RPC endpoints in sidechain + - wss://sidechain1.fs.neo.org:30333/ws + - wss://sidechain2.fs.neo.org:30333/ws validators: # List of hex-encoded 33-byte public keys of sidechain validators to vote for at application startup - 0283120f4c8c1fc1d792af5063d2def9da5fddc90bc1384de7fcfdda33c3860170 - switch_interval: 2m # interval b/w RPC switch attempts if the node is not connected to the highest priority node consensus: # Local consensus launch mode activated only when 'endpoint.client' is unset. magic: 15405 # Network magic. Must be unsigned integer in range [1:4294967295] committee: # Initial committee diff --git a/config/example/node.env b/config/example/node.env index f2eace33e3..c41127abbf 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -64,11 +64,7 @@ NEOFS_CONTRACTS_PROXY=ad7c6b55b737b696e5c82c85445040964a03e97f # Morph chain section NEOFS_MORPH_DIAL_TIMEOUT=30s NEOFS_MORPH_CACHE_TTL=15s -NEOFS_MORPH_SWITCH_INTERVAL=3m -NEOFS_MORPH_RPC_ENDPOINT_0_ADDRESS="wss://rpc1.morph.fs.neo.org:40341/ws" -NEOFS_MORPH_RPC_ENDPOINT_0_PRIORITY=0 -NEOFS_MORPH_RPC_ENDPOINT_1_ADDRESS="wss://rpc2.morph.fs.neo.org:40341/ws" -NEOFS_MORPH_RPC_ENDPOINT_1_PRIORITY=2 +NEOFS_MORPH_ENDPOINTS="wss://rpc1.morph.fs.neo.org:40341/ws wss://rpc2.morph.fs.neo.org:40341/ws" # API Client section NEOFS_APICLIENT_DIAL_TIMEOUT=15s diff --git a/config/example/node.json b/config/example/node.json index 87022ca064..04d7547f6f 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -102,16 +102,9 @@ "morph": { "dial_timeout": "30s", "cache_ttl": "15s", - "switch_interval": "3m", - "rpc_endpoint": [ - { - "address": "wss://rpc1.morph.fs.neo.org:40341/ws", - "priority": 0 - }, - { - "address": "wss://rpc2.morph.fs.neo.org:40341/ws", - "priority": 2 - } + "endpoints": [ + "wss://rpc1.morph.fs.neo.org:40341/ws", + "wss://rpc2.morph.fs.neo.org:40341/ws" ] }, "apiclient": { diff --git a/config/example/node.yaml b/config/example/node.yaml index 291bf1c171..56efbc78d4 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -86,12 +86,9 @@ morph: cache_ttl: 15s # Sidechain cache TTL value (min interval between similar calls). Negative value disables caching. # Default value: block time. It is recommended to have this value less or equal to block time. # Cached entities: containers, container lists, eACL tables. - switch_interval: 3m # interval b/w RPC switch attempts if the node is connected not to the highest priority node - rpc_endpoint: # side chain NEO RPC endpoints; are shuffled and used one by one until the first success - - address: wss://rpc1.morph.fs.neo.org:40341/ws - priority: 0 - - address: wss://rpc2.morph.fs.neo.org:40341/ws - priority: 2 + endpoints: # side chain NEO RPC endpoints; are shuffled and used one by one until the first success + - wss://rpc1.morph.fs.neo.org:40341/ws + - wss://rpc2.morph.fs.neo.org:40341/ws apiclient: dial_timeout: 15s # timeout for NEOFS API client connection diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index 4d068f9d83..36e3e2eba2 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -134,24 +134,14 @@ morph: cache_ttl: 15s rpc_endpoint: - address: wss://rpc1.morph.fs.neo.org:40341/ws - priority: 1 - address: wss://rpc2.morph.fs.neo.org:40341/ws - priority: 2 - switch_interval: 2m ``` -| Parameter | Type | Default value | Description | -|-------------------|-----------------------------------------------------------|------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `dial_timeout` | `duration` | `5s` | Timeout for dialing connections to N3 RPCs. | -| `cache_ttl` | `duration` | Morph block time | Sidechain cache TTL value (min interval between similar calls).
Negative value disables caching.
Cached entities: containers, container lists, eACL tables. | -| `rpc_endpoint` | list of [endpoint descriptions](#rpc_endpoint-subsection) | | Array of endpoint descriptions. | -| `switch_interval` | `duration` | `2m` | Time interval between the attempts to connect to the highest priority RPC node if the connection is not established yet. | - -## `rpc_endpoint` subsection -| Parameter | Type | Default value | Description | -|------------|----------|---------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `address` | `string` | | _WebSocket_ N3 endpoint. | -| `priority` | `int` | `1` | Priority of an endpoint. Endpoint with a higher priority (lower configuration value) has more chance of being used. Endpoints with equal priority are iterated over randomly; a negative priority is interpreted as `1`. | +| Parameter | Type | Default value | Description | +|----------------|------------|------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `dial_timeout` | `duration` | `5s` | Timeout for dialing connections to N3 RPCs. | +| `cache_ttl` | `duration` | Morph block time | Sidechain cache TTL value (min interval between similar calls).
Negative value disables caching.
Cached entities: containers, container lists, eACL tables. | +| `endpoints` | `[]string` | | Ordered array of _webSocket_ N3 endpoint. Only one is connected at a time, the others are for a fallback if any network error appears. | # `storage` section diff --git a/pkg/innerring/config.go b/pkg/innerring/config.go index 21a7851b1c..0315216ab3 100644 --- a/pkg/innerring/config.go +++ b/pkg/innerring/config.go @@ -22,9 +22,12 @@ import ( // checks if Inner Ring app is configured to be launched in local consensus // mode. func isLocalConsensusMode(cfg *viper.Viper) bool { - const morphRPCSection = "morph.endpoint.client" + const morphRPCSectionDeprecated = "morph.endpoint.client" // first expression required for ENVs in which nesting breaks - return !cfg.IsSet(morphRPCSection+".0.address") && !cfg.IsSet(morphRPCSection) + deprecatedNotSet := !cfg.IsSet(morphRPCSectionDeprecated+".0.address") && !cfg.IsSet(morphRPCSectionDeprecated) + actualNotSet := !cfg.IsSet("morph.endpoints") + + return deprecatedNotSet && actualNotSet } func parseBlockchainConfig(v *viper.Viper, _logger *logger.Logger) (c blockchain.Config, err error) { diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 93d7596049..5b1bb32ef4 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -1010,12 +1010,9 @@ func createListener(ctx context.Context, cli *client.Client, p *chainParams) (ev } func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*client.Client, error) { - // config name left unchanged for compatibility, may be its better to rename it to "endpoints" or "clients" - var endpoints []client.Endpoint - - // defaultPriority is a default endpoint priority - const defaultPriority = 1 + endpoints := p.cfg.GetStringSlice(p.name + ".endpoints") + // deprecated endpoints with priorities section := p.name + ".endpoint.client" for i := 0; ; i++ { addr := p.cfg.GetString(fmt.Sprintf("%s.%d.%s", section, i, "address")) @@ -1023,15 +1020,7 @@ func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*c break } - priority := p.cfg.GetInt(section + ".priority") - if priority <= 0 { - priority = defaultPriority - } - - endpoints = append(endpoints, client.Endpoint{ - Address: addr, - Priority: priority, - }) + endpoints = append(endpoints, addr) } if len(endpoints) == 0 { @@ -1044,11 +1033,10 @@ func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*c client.WithLogger(p.log), client.WithDialTimeout(p.cfg.GetDuration(p.name+".dial_timeout")), client.WithSigner(p.sgn), - client.WithEndpoints(endpoints...), + client.WithEndpoints(endpoints), client.WithConnLostCallback(func() { errChan <- fmt.Errorf("%s chain connection has been lost", p.name) }), - client.WithSwitchInterval(p.cfg.GetDuration(p.name+".switch_interval")), ) } diff --git a/pkg/morph/client/audit/result_test.go b/pkg/morph/client/audit/result_test.go index 006bfba774..5a41217dd6 100644 --- a/pkg/morph/client/audit/result_test.go +++ b/pkg/morph/client/audit/result_test.go @@ -26,7 +26,7 @@ func TestAuditResults(t *testing.T) { auditHash, err := util.Uint160DecodeStringLE(sAuditHash) require.NoError(t, err) - morphClient, err := client.New(key, client.WithEndpoints(client.Endpoint{Address: endpoint})) + morphClient, err := client.New(key, client.WithEndpoints([]string{endpoint})) require.NoError(t, err) auditClientWrapper, err := NewFromMorph(morphClient, auditHash, 0) diff --git a/pkg/morph/client/client.go b/pkg/morph/client/client.go index 7fb4db8f82..00a3485f11 100644 --- a/pkg/morph/client/client.go +++ b/pkg/morph/client/client.go @@ -24,7 +24,6 @@ import ( "github.com/nspcc-dev/neo-go/pkg/vm/vmstate" "github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/nspcc-dev/neofs-node/pkg/util/logger" - "go.uber.org/atomic" "go.uber.org/zap" ) @@ -59,7 +58,7 @@ type Client struct { cfg cfg - endpoints endpoints + endpoints []string // switchLock protects endpoints, inactive, and subscription-related fields. // It is taken exclusively during endpoint switch and locked in shared mode @@ -73,11 +72,6 @@ type Client struct { // establish connection to any of the // provided RPC endpoints inactive bool - - // indicates that Client has already started - // goroutine that tries to switch to the higher - // priority RPC node - switchIsActive atomic.Bool } type cache struct { diff --git a/pkg/morph/client/constructor.go b/pkg/morph/client/constructor.go index a77af89a40..14c56fa795 100644 --- a/pkg/morph/client/constructor.go +++ b/pkg/morph/client/constructor.go @@ -37,13 +37,11 @@ type cfg struct { signer *transaction.Signer - endpoints []Endpoint + endpoints []string singleCli *rpcclient.WSClient // neo-go client for single client mode inactiveModeCb Callback - - switchInterval time.Duration } const ( @@ -127,9 +125,7 @@ func New(key *keys.PrivateKey, opts ...Option) (*Client, error) { return nil, errors.New("no endpoints were provided") } - cli.endpoints.init(cfg.endpoints) - - cli.client, act, err = cli.newCli(cli.endpoints.list[0].Address) + cli.client, act, err = cli.newCli(cli.endpoints[0]) if err != nil { return nil, fmt.Errorf("could not create RPC client: %w", err) } @@ -247,10 +243,10 @@ func WithSigner(signer *transaction.Signer) Option { } // WithEndpoints returns a client constructor option -// that specifies additional Neo rpc endpoints. +// that specifies Neo rpc endpoints. // // Has no effect if WithSingleClient is provided. -func WithEndpoints(endpoints ...Endpoint) Option { +func WithEndpoints(endpoints []string) Option { return func(c *cfg) { c.endpoints = append(c.endpoints, endpoints...) } @@ -276,12 +272,3 @@ func WithConnLostCallback(cb Callback) Option { c.inactiveModeCb = cb } } - -// WithSwitchInterval returns a client constructor option -// that specifies a wait interval b/w attempts to reconnect -// to an RPC node with the highest priority. -func WithSwitchInterval(i time.Duration) Option { - return func(c *cfg) { - c.switchInterval = i - } -} diff --git a/pkg/morph/client/multi.go b/pkg/morph/client/multi.go index 6b997c3f52..1c5b1e2860 100644 --- a/pkg/morph/client/multi.go +++ b/pkg/morph/client/multi.go @@ -1,9 +1,6 @@ package client import ( - "sort" - "time" - "go.uber.org/zap" ) @@ -13,20 +10,6 @@ type Endpoint struct { Priority int } -type endpoints struct { - curr int - list []Endpoint -} - -func (e *endpoints) init(ee []Endpoint) { - sort.SliceStable(ee, func(i, j int) bool { - return ee[i].Priority < ee[j].Priority - }) - - e.curr = 0 - e.list = ee -} - // SwitchRPC performs reconnection and returns true if it was successful. func (c *Client) SwitchRPC() bool { c.switchLock.Lock() @@ -35,12 +18,11 @@ func (c *Client) SwitchRPC() bool { c.client.Close() // Iterate endpoints in the order of decreasing priority. - for c.endpoints.curr = range c.endpoints.list { - newEndpoint := c.endpoints.list[c.endpoints.curr].Address - cli, act, err := c.newCli(newEndpoint) + for _, e := range c.endpoints { + cli, act, err := c.newCli(e) if err != nil { c.logger.Warn("could not establish connection to the switched RPC node", - zap.String("endpoint", newEndpoint), + zap.String("endpoint", e), zap.Error(err), ) @@ -50,17 +32,11 @@ func (c *Client) SwitchRPC() bool { c.cache.invalidate() c.logger.Info("connection to the new RPC node has been established", - zap.String("endpoint", newEndpoint)) + zap.String("endpoint", e)) c.client = cli c.setActor(act) - if c.cfg.switchInterval != 0 && !c.switchIsActive.Load() && - c.endpoints.list[c.endpoints.curr].Priority != c.endpoints.list[0].Priority { - c.switchIsActive.Store(true) - go c.switchToMostPrioritized() - } - return true } @@ -81,75 +57,6 @@ func (c *Client) closeWaiter() { c.close() } -func (c *Client) switchToMostPrioritized() { - t := time.NewTicker(c.cfg.switchInterval) - defer t.Stop() - defer c.switchIsActive.Store(false) - -mainLoop: - for { - select { - case <-c.cfg.ctx.Done(): - return - case <-t.C: - c.switchLock.RLock() - endpointsCopy := make([]Endpoint, len(c.endpoints.list)) - copy(endpointsCopy, c.endpoints.list) - - currPriority := c.endpoints.list[c.endpoints.curr].Priority - highestPriority := c.endpoints.list[0].Priority - c.switchLock.RUnlock() - - if currPriority == highestPriority { - // already connected to - // the most prioritized - return - } - - for i, e := range endpointsCopy { - if currPriority == e.Priority { - // a switch will not increase the priority - continue mainLoop - } - - tryE := e.Address - - cli, act, err := c.newCli(tryE) - if err != nil { - c.logger.Warn("could not create client to the higher priority node", - zap.String("endpoint", tryE), - zap.Error(err), - ) - continue - } - - c.switchLock.Lock() - - // higher priority node could have been - // connected in the other goroutine - if e.Priority >= c.endpoints.list[c.endpoints.curr].Priority { - cli.Close() - c.switchLock.Unlock() - return - } - - c.client.Close() - c.cache.invalidate() - c.client = cli - c.setActor(act) - c.endpoints.curr = i - - c.switchLock.Unlock() - - c.logger.Info("switched to the higher priority RPC", - zap.String("endpoint", tryE)) - - return - } - } - } -} - // close closes notification channel and wrapped WS client. func (c *Client) close() { c.client.Close() diff --git a/pkg/morph/client/multy_test.go b/pkg/morph/client/multy_test.go deleted file mode 100644 index 4bc38c70ca..0000000000 --- a/pkg/morph/client/multy_test.go +++ /dev/null @@ -1,29 +0,0 @@ -package client - -import ( - "math/rand" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestInitEndpoints(t *testing.T) { - rand.Seed(time.Now().UnixNano()) - - ee := make([]Endpoint, 100) - for i := range ee { - ee[i].Priority = rand.Int() - } - - var eeInternal endpoints - eeInternal.init(ee) - - prevValue := eeInternal.list[0].Priority - - for _, e := range eeInternal.list { - require.True(t, prevValue <= e.Priority) - - prevValue = e.Priority - } -}