diff --git a/.golangci.yml b/.golangci.yml index 9d935da1c..4b7617b0d 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -197,6 +197,10 @@ issues: linters: - funlen text: "Function 'TestConnectServerShouldNotPanicOnRequest' is too long" + - path: pkg/networkservice/common/connect/server_test.go + linters: + - funlen + text: "Function 'TestConnectServer_RequestParallel' is too long" - path: pkg/networkservice/utils/checks/checkerror/server_test.go linters: - dupl @@ -288,3 +292,7 @@ issues: - path: pkg/networkservice/common/switchcase/.*_test.go linters: - dupl + - path: pkg/networkservice/chains/endpoint/combine_monitor_server.go + linters: + - interfacer + text: "`rawSrv` can be `github.com/networkservicemesh/sdk/pkg/networkservice/common/monitor.EventConsumer`" diff --git a/pkg/networkservice/chains/client/README.md b/pkg/networkservice/chains/client/README.md deleted file mode 100644 index 5f53327ba..000000000 --- a/pkg/networkservice/chains/client/README.md +++ /dev/null @@ -1,51 +0,0 @@ -# Functional requirements - -There are some common chain elements that we expect to have in every client chain to make NSM working. Instead of that, -there are few different scenarios when we need to create a client chain to initiate NSM request: -1. Client to NSMgr - simple case when there is an application requesting some L2/L3 connection from the NSMgr. - * no incoming L2/L3 request - client itself is a request generator - * complete chain - ``` - Client --Request--> NSMgr - | | - |---L2/L3 connection---| - | | - ``` -2. Server to endpoint client - we already have application running as a NSM endpoint receiving request to L2/L3 -connection, but it also needs to request some other L2/L3 connection from some other endpoint. - * there is an incoming L2/L3 request - we need to generate an outgoing L2/L3 request, but the connection we return - is an incoming connection - * part of some server chain - we need to add `clientConnection` and request next elements - ``` - ... 
Endpoint --Request--> Endpoint - | | | - |---L2/L3 connection---|---L2/L3 connection---| - | | | - ``` -3. Proxy to endpoint client - we already have application running as a NSM server, but it doesn't provide L2/L3 -connection, it simply passes the request to some other endpoint. - * there is an incoming L2/L3 request but we simply forward it - * part of some server chain - we need to add `clientConnection` and request next elements - ``` - ... Proxy --Request--> Endpoint - | | | - |---------------L2/L3 connection--------------| - | | | - ``` - -# Implementation - -## client.NewClient(..., grpcCC, ...additionalFunctionality) - -It is a solution for the (1.) case. Client appends `additionalFunctionality` to the default client chain and passes -incoming request to the NSMgr over the `grpcCC`. - -## client.NewCrossConnectClientFactory(..., ...additionalFunctionality) - -It is a solution for the (2.) case. We create a new GRPC client on each new client URL received from the incoming request. -It can be used in `connect.NewServer` so `clientConnection` will be processed correctly. - -## client.NewClientFactory(..., ...additionalFunctionality) - -It is a solution for the (3.) case. We create a new GRPC client on each new client URL received, but process like (1.). -It can be used in `connect.NewServer` so `clientConnection` will be processed correctly. \ No newline at end of file diff --git a/pkg/networkservice/chains/client/client.go b/pkg/networkservice/chains/client/client.go index 65f31f6f6..a5b85649b 100644 --- a/pkg/networkservice/chains/client/client.go +++ b/pkg/networkservice/chains/client/client.go @@ -1,6 +1,4 @@ -// Copyright (c) 2020-2021 Cisco Systems, Inc. -// -// Copyright (c) 2021 Doc.ai and/or its affiliates. +// Copyright (c) 2021 Cisco and/or its affiliates. 
// // SPDX-License-Identifier: Apache-2.0 // @@ -21,146 +19,53 @@ package client import ( "context" - "net/url" - "time" "github.com/google/uuid" - "google.golang.org/grpc" - - "github.com/networkservicemesh/sdk/pkg/networkservice/common/begin" - "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/begin" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/clientconn" "github.com/networkservicemesh/sdk/pkg/networkservice/common/clienturl" "github.com/networkservicemesh/sdk/pkg/networkservice/common/connect" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/heal" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/dial" "github.com/networkservicemesh/sdk/pkg/networkservice/common/null" "github.com/networkservicemesh/sdk/pkg/networkservice/common/refresh" "github.com/networkservicemesh/sdk/pkg/networkservice/common/updatepath" - "github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters" "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" ) -type clientOptions struct { - name string - additionalFunctionality []networkservice.NetworkServiceClient - authorizeClient networkservice.NetworkServiceClient - dialOptions []grpc.DialOption - dialTimeout time.Duration -} - -// Option modifies default client chain values. -type Option func(c *clientOptions) - -// WithName sets name for the client. -func WithName(name string) Option { - return Option(func(c *clientOptions) { - c.name = name - }) -} - -// WithAdditionalFunctionality sets additionalFunctionality for the client. Note: this adds into tail of the client chain. 
-func WithAdditionalFunctionality(additionalFunctionality ...networkservice.NetworkServiceClient) Option { - return Option(func(c *clientOptions) { - c.additionalFunctionality = additionalFunctionality - }) -} - -// WithAuthorizeClient sets authorizeClient for the client chain. -func WithAuthorizeClient(authorizeClient networkservice.NetworkServiceClient) Option { - if authorizeClient == nil { - panic("authorizeClient cannot be nil") - } - return Option(func(c *clientOptions) { - c.authorizeClient = authorizeClient - }) -} - -// WithDialOptions sets dial options -func WithDialOptions(dialOptions ...grpc.DialOption) Option { - return Option(func(c *clientOptions) { - c.dialOptions = dialOptions - }) -} - -// WithDialTimeout sets dial timeout -func WithDialTimeout(dialTimeout time.Duration) Option { - return func(c *clientOptions) { - c.dialTimeout = dialTimeout - } -} - -// NewClient - returns a (1.) case NSM client. +// NewClient - returns an NSM client. // - ctx - context for the lifecycle of the *Client* itself. Cancel when discarding the client. 
-// - cc - grpc.ClientConnInterface for the endpoint to which this client should connect -func NewClient(ctx context.Context, connectTo *url.URL, clientOpts ...Option) networkservice.NetworkServiceClient { - rv := new(networkservice.NetworkServiceClient) +func NewClient(ctx context.Context, clientOpts ...Option) networkservice.NetworkServiceClient { var opts = &clientOptions{ name: "client-" + uuid.New().String(), authorizeClient: null.NewClient(), - dialTimeout: 100 * time.Millisecond, + refreshClient: refresh.NewClient(ctx), } for _, opt := range clientOpts { opt(opts) } - *rv = chain.NewNetworkServiceClient( - updatepath.NewClient(opts.name), - begin.NewClient(), - metadata.NewClient(), - refresh.NewClient(ctx), - adapters.NewServerToClient( - chain.NewNetworkServiceServer( - heal.NewServer(ctx, - heal.WithOnHeal(rv), - heal.WithOnRestore(heal.OnRestoreRestore), - heal.WithRestoreTimeout(time.Minute)), - clienturl.NewServer(connectTo), - connect.NewServer(ctx, func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient { - return chain.NewNetworkServiceClient( - append( - opts.additionalFunctionality, - // TODO: move back to the end of the chain when `begin` chain element will be ready - heal.NewClient(ctx, networkservice.NewMonitorConnectionClient(cc), - heal.WithEndpointChange()), - opts.authorizeClient, - networkservice.NewNetworkServiceClient(cc), - )..., - ) - }, - connect.WithDialOptions(opts.dialOptions...), - connect.WithDialTimeout(opts.dialTimeout)), - ), - ), - ) - return *rv -} - -// NewClientFactory - returns a (3.) case func(cc grpc.ClientConnInterface) NSM client factory. 
-func NewClientFactory(clientOpts ...Option) connect.ClientFactory { - return func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient { - var rv networkservice.NetworkServiceClient - var opts = &clientOptions{ - name: "client-" + uuid.New().String(), - authorizeClient: null.NewClient(), - } - for _, opt := range clientOpts { - opt(opts) - } - rv = chain.NewNetworkServiceClient( + return chain.NewNetworkServiceClient( + append( + []networkservice.NetworkServiceClient{ + updatepath.NewClient(opts.name), + begin.NewClient(), + metadata.NewClient(), + opts.refreshClient, + clienturl.NewClient(opts.clientURL), + clientconn.NewClient(opts.cc), + }, append( - append([]networkservice.NetworkServiceClient{ - updatepath.NewClient(opts.name), - begin.NewClient(), - metadata.NewClient(), - refresh.NewClient(ctx), - // TODO: move back to the end of the chain when `begin` chain element will be ready - heal.NewClient(ctx, networkservice.NewMonitorConnectionClient(cc)), - }, opts.additionalFunctionality...), + opts.additionalFunctionality, + dial.NewClient(ctx, + dial.WithDialOptions(opts.dialOptions...), + dial.WithDialTimeout(opts.dialTimeout), + ), opts.authorizeClient, - networkservice.NewNetworkServiceClient(cc), - )...) - return rv - } + connect.NewClient(), + )..., + )..., + ) } diff --git a/pkg/networkservice/chains/client/client_heal_test.go b/pkg/networkservice/chains/client/client_heal_test.go deleted file mode 100644 index d117cfa52..000000000 --- a/pkg/networkservice/chains/client/client_heal_test.go +++ /dev/null @@ -1,247 +0,0 @@ -// Copyright (c) 2021 Doc.ai and/or its affiliates. -// -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package client_test - -import ( - "context" - "net/url" - "sync/atomic" - "testing" - "time" - - "github.com/networkservicemesh/api/pkg/api/networkservice" - "github.com/pkg/errors" - "github.com/stretchr/testify/require" - "go.uber.org/goleak" - "google.golang.org/grpc" - "google.golang.org/protobuf/types/known/emptypb" - - "github.com/networkservicemesh/sdk/pkg/networkservice/chains/client" - "github.com/networkservicemesh/sdk/pkg/networkservice/chains/endpoint" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/clienturl" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/connect" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/heal" - "github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters" - "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" - "github.com/networkservicemesh/sdk/pkg/networkservice/utils/count" - "github.com/networkservicemesh/sdk/pkg/tools/addressof" - "github.com/networkservicemesh/sdk/pkg/tools/sandbox" -) - -func startServer(ctx context.Context, t *testing.T, serverURL *url.URL, opts ...endpoint.Option) networkservice.NetworkServiceServer { - nse := endpoint.NewServer(ctx, sandbox.GenerateTestToken, opts...) 
- - select { - case err := <-endpoint.Serve(ctx, serverURL, nse): - require.NoError(t, err) - default: - } - - return nse -} - -func TestClient_Heal(t *testing.T) { - t.Cleanup(func() { goleak.VerifyNone(t) }) - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - defer cancel() - - serverCtx, serverCancel := context.WithCancel(ctx) - defer serverCancel() - - serverURL := &url.URL{Scheme: "tcp", Host: "127.0.0.1:0"} - startServer(serverCtx, t, serverURL) - - nsc := client.NewClient(ctx, - serverURL, - client.WithDialOptions(sandbox.DialOptions()...), - client.WithDialTimeout(time.Second), - ) - - _, err := nsc.Request(ctx, &networkservice.NetworkServiceRequest{}) - require.NoError(t, err) - - serverCancel() - require.Eventually(t, func() bool { - return sandbox.CheckURLFree(serverURL) - }, time.Second, time.Millisecond*10) - require.NoError(t, ctx.Err()) - - startServer(ctx, t, serverURL) - - require.Eventually(t, func() bool { - _, err = nsc.Request(ctx, &networkservice.NetworkServiceRequest{}) - return err == nil - }, time.Second*2, time.Millisecond*50) -} - -func TestClient_StopHealingOnFailure(t *testing.T) { - t.Skip("need `begin` chain element to fix heal client") - var samples = []struct { - name string - optsSupplier func(counter networkservice.NetworkServiceClient) []client.Option - }{ - { - name: "Authorize failure", - optsSupplier: func(counter networkservice.NetworkServiceClient) []client.Option { - return []client.Option{ - client.WithAuthorizeClient(new(refreshFailureClient)), - client.WithAdditionalFunctionality(counter), - } - }, - }, - { - name: "Additional functionality failure", - optsSupplier: func(counter networkservice.NetworkServiceClient) []client.Option { - return []client.Option{ - client.WithAdditionalFunctionality( - new(refreshFailureClient), - counter, - ), - } - }, - }, - } - - for _, sample := range samples { - // nolint:scopelint - t.Run(sample.name, func(t *testing.T) { - testStopHealingOnFailure(t, func(ctx 
context.Context, serverURL *url.URL, counter networkservice.NetworkServiceClient) networkservice.NetworkServiceClient { - return client.NewClient(ctx, - serverURL, - append([]client.Option{ - client.WithDialOptions(sandbox.DialOptions()...), - client.WithDialTimeout(time.Second), - }, sample.optsSupplier(counter)...)..., - ) - }) - }) - } -} - -func TestClientFactory_StopHealingOnFailure(t *testing.T) { - t.Skip("need `begin` chain element to fix heal client") - var samples = []struct { - name string - opts []client.Option - }{ - { - name: "Authorize failure", - opts: []client.Option{ - client.WithAuthorizeClient(new(refreshFailureClient)), - }, - }, - { - name: "Additional functionality failure", - opts: []client.Option{ - client.WithAdditionalFunctionality(new(refreshFailureClient)), - }, - }, - } - - for _, sample := range samples { - // nolint:scopelint - t.Run(sample.name, func(t *testing.T) { - testStopHealingOnFailure(t, func(ctx context.Context, serverURL *url.URL, counter networkservice.NetworkServiceClient) networkservice.NetworkServiceClient { - clientServerURL := &url.URL{Scheme: "tcp", Host: "127.0.0.1:0"} - - clientServer := new(struct { - networkservice.NetworkServiceServer - }) - clientServer.NetworkServiceServer = startServer(ctx, t, clientServerURL, - endpoint.WithName("name"), - endpoint.WithAdditionalFunctionality( - heal.NewServer(ctx, - heal.WithOnHeal(addressof.NetworkServiceClient(adapters.NewServerToClient(clientServer)))), - adapters.NewClientToServer(counter), - clienturl.NewServer(serverURL), - connect.NewServer(ctx, client.NewClientFactory( - append([]client.Option{ - client.WithName("name"), - }, sample.opts...)...), - connect.WithDialOptions(sandbox.DialOptions()...), - connect.WithDialTimeout(time.Second), - ), - ), - ) - - return client.NewClient(ctx, - clientServerURL, - client.WithDialOptions(sandbox.DialOptions()...), - client.WithDialTimeout(time.Second), - ) - }) - }) - } -} - -func testStopHealingOnFailure( - t *testing.T, - 
clientSupplier func(ctx context.Context, serverURL *url.URL, counter networkservice.NetworkServiceClient) networkservice.NetworkServiceClient, -) { - t.Cleanup(func() { goleak.VerifyNone(t) }) - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - defer cancel() - - serverURL := &url.URL{Scheme: "tcp", Host: "127.0.0.1:0"} - startServer(ctx, t, serverURL) - - counter := new(count.Client) - nsc := clientSupplier(ctx, serverURL, counter) - - conn, err := nsc.Request(ctx, new(networkservice.NetworkServiceRequest)) - require.NoError(t, err) - - _, err = nsc.Request(ctx, &networkservice.NetworkServiceRequest{ - Connection: conn.Clone(), - }) - require.Errorf(t, err, "refresh error") - - require.Never(t, func() bool { - // 1. Request - // 2. Failed refresh Request - // 3+. Heal Requests - return counter.Requests() > 2 - }, time.Second*2, time.Millisecond*50) -} - -type refreshFailureClient struct { - flag int32 -} - -func (c *refreshFailureClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { - conn, err := next.Client(ctx).Request(ctx, request, opts...) - if err != nil { - return nil, err - } - - if err = ctx.Err(); err == nil && atomic.CompareAndSwapInt32(&c.flag, 0, 1) { - return conn, nil - } - - _, _ = next.Client(ctx).Close(ctx, conn, opts...) - - if err == nil { - err = errors.New("refresh error") - } - return nil, err -} - -func (c *refreshFailureClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*emptypb.Empty, error) { - return next.Client(ctx).Close(ctx, conn, opts...) -} diff --git a/pkg/networkservice/chains/client/options.go b/pkg/networkservice/chains/client/options.go new file mode 100644 index 000000000..7a3c8f2c3 --- /dev/null +++ b/pkg/networkservice/chains/client/options.go @@ -0,0 +1,100 @@ +// Copyright (c) 2021 Cisco and/or its affiliates. 
+// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "net/url" + "time" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + "google.golang.org/grpc" + + "github.com/networkservicemesh/sdk/pkg/networkservice/common/null" +) + +type clientOptions struct { + name string + clientURL *url.URL + cc grpc.ClientConnInterface + additionalFunctionality []networkservice.NetworkServiceClient + authorizeClient networkservice.NetworkServiceClient + refreshClient networkservice.NetworkServiceClient + dialOptions []grpc.DialOption + dialTimeout time.Duration +} + +// Option modifies default client chain values. +type Option func(c *clientOptions) + +// WithName sets name for the client. +func WithName(name string) Option { + return Option(func(c *clientOptions) { + c.name = name + }) +} + +// WithClientURL sets clientURL for the client. +func WithClientURL(clientURL *url.URL) Option { + return Option(func(c *clientOptions) { + c.clientURL = clientURL + }) +} + +// WithClientConn sets grpc.ClientConnInterface for the client. +func WithClientConn(cc grpc.ClientConnInterface) Option { + return Option(func(c *clientOptions) { + c.cc = cc + }) +} + +// WithAdditionalFunctionality sets additionalFunctionality for the client. Note: this adds into tail of the client chain. 
+func WithAdditionalFunctionality(additionalFunctionality ...networkservice.NetworkServiceClient) Option { + return Option(func(c *clientOptions) { + c.additionalFunctionality = additionalFunctionality + }) +} + +// WithAuthorizeClient sets authorizeClient for the client chain. +func WithAuthorizeClient(authorizeClient networkservice.NetworkServiceClient) Option { + if authorizeClient == nil { + panic("authorizeClient cannot be nil") + } + return Option(func(c *clientOptions) { + c.authorizeClient = authorizeClient + }) +} + +// WithDialOptions sets dial options +func WithDialOptions(dialOptions ...grpc.DialOption) Option { + return Option(func(c *clientOptions) { + c.dialOptions = dialOptions + }) +} + +// WithDialTimeout sets dial timeout +func WithDialTimeout(dialTimeout time.Duration) Option { + return func(c *clientOptions) { + c.dialTimeout = dialTimeout + } +} + +// WithoutRefresh disables refresh +func WithoutRefresh() Option { + return func(c *clientOptions) { + c.refreshClient = null.NewClient() + } +} diff --git a/pkg/networkservice/chains/endpoint/combine_test.go b/pkg/networkservice/chains/endpoint/combine_test.go index 969d5c556..c4e55f7d7 100644 --- a/pkg/networkservice/chains/endpoint/combine_test.go +++ b/pkg/networkservice/chains/endpoint/combine_test.go @@ -27,6 +27,8 @@ import ( "go.uber.org/goleak" "google.golang.org/grpc" + "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" + "github.com/networkservicemesh/api/pkg/api/networkservice" "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel" "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/memif" @@ -312,6 +314,7 @@ func newTestEndpoint(ctx context.Context, name string) *testEndpoint { e.NetworkServiceServer = next.NewNetworkServiceServer( updatepath.NewServer(name), begin.NewServer(), + metadata.NewServer(), monitor.NewServer(ctx, &e.MonitorConnectionServer), ) return e diff --git a/pkg/networkservice/chains/nsmgr/heal_test.go 
b/pkg/networkservice/chains/nsmgr/heal_test.go deleted file mode 100644 index 2258456bb..000000000 --- a/pkg/networkservice/chains/nsmgr/heal_test.go +++ /dev/null @@ -1,428 +0,0 @@ -// Copyright (c) 2020-2021 Doc.ai and/or its affiliates. -// -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package nsmgr_test - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/require" - "go.uber.org/goleak" - - "github.com/networkservicemesh/api/pkg/api/registry" - - "github.com/networkservicemesh/sdk/pkg/networkservice/utils/count" - "github.com/networkservicemesh/sdk/pkg/tools/sandbox" -) - -const ( - tick = 10 * time.Millisecond - timeout = 10 * time.Second -) - -func TestNSMGR_HealEndpoint(t *testing.T) { - var samples = []struct { - name string - nodeNum int - }{ - { - name: "Local New", - nodeNum: 0, - }, - { - name: "Remote New", - nodeNum: 1, - }, - } - - for _, sample := range samples { - t.Run(sample.name, func(t *testing.T) { - // nolint:scopelint - testNSMGRHealEndpoint(t, sample.nodeNum) - }) - } -} - -func testNSMGRHealEndpoint(t *testing.T, nodeNum int) { - t.Cleanup(func() { goleak.VerifyNone(t) }) - ctx, cancel := context.WithTimeout(context.Background(), timeout) - - defer cancel() - domain := sandbox.NewBuilder(ctx, t). - SetNodesCount(nodeNum + 1). - SetNSMgrProxySupplier(nil). - SetRegistryProxySupplier(nil). 
- Build() - - nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken) - - nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService()) - require.NoError(t, err) - - nseReg := defaultRegistryEndpoint(nsReg.Name) - - counter := new(count.Server) - nse := domain.Nodes[nodeNum].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter) - - request := defaultRequest(nsReg.Name) - - nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken) - - conn, err := nsc.Request(ctx, request.Clone()) - require.NoError(t, err) - require.Equal(t, 1, counter.UniqueRequests()) - - nse.Cancel() - - nseReg2 := defaultRegistryEndpoint(nsReg.Name) - nseReg2.Name += "-2" - domain.Nodes[nodeNum].NewEndpoint(ctx, nseReg2, sandbox.GenerateTestToken, counter) - - // Wait reconnecting to the new NSE - require.Eventually(t, checkSecondRequestsReceived(counter.UniqueRequests), timeout, tick) - require.Equal(t, 2, counter.UniqueRequests()) - closes := counter.UniqueCloses() - - // Check refresh - request.Connection = conn - _, err = nsc.Request(ctx, request.Clone()) - require.NoError(t, err) - - // Close with old connection - _, err = nsc.Close(ctx, conn) - require.NoError(t, err) - - require.Equal(t, 2, counter.UniqueRequests()) - require.Equal(t, closes+1, counter.UniqueCloses()) -} - -func TestNSMGR_HealForwarder(t *testing.T) { - var samples = []struct { - name string - nodeNum int - }{ - { - name: "Local New", - nodeNum: 0, - }, - { - name: "Remote New", - nodeNum: 1, - }, - } - - for _, sample := range samples { - t.Run(sample.name, func(t *testing.T) { - // nolint:scopelint - testNSMGRHealForwarder(t, sample.nodeNum) - }) - } -} - -func testNSMGRHealForwarder(t *testing.T, nodeNum int) { - t.Cleanup(func() { goleak.VerifyNone(t) }) - - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - - domain := sandbox.NewBuilder(ctx, t). - SetNodesCount(2). - SetNSMgrProxySupplier(nil). - SetRegistryProxySupplier(nil). 
- Build() - - nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken) - - nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService()) - require.NoError(t, err) - - counter := new(count.Server) - domain.Nodes[1].NewEndpoint(ctx, defaultRegistryEndpoint(nsReg.Name), sandbox.GenerateTestToken, counter) - - request := defaultRequest(nsReg.Name) - - nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken) - - conn, err := nsc.Request(ctx, request.Clone()) - require.NoError(t, err) - require.Equal(t, 1, counter.UniqueRequests()) - - for _, forwarder := range domain.Nodes[nodeNum].Forwarders { - forwarder.Cancel() - break - } - - forwarderReg := ®istry.NetworkServiceEndpoint{ - Name: sandbox.UniqueName("forwarder-2"), - } - domain.Nodes[nodeNum].NewForwarder(ctx, forwarderReg, sandbox.GenerateTestToken) - - // Wait reconnecting through the new Forwarder - require.Eventually(t, checkSecondRequestsReceived(counter.UniqueRequests), timeout, tick) - require.Equal(t, 2, counter.UniqueRequests()) - closes := counter.UniqueCloses() - - // Check refresh - request.Connection = conn - _, err = nsc.Request(ctx, request.Clone()) - require.NoError(t, err) - - // Close with old connection - _, err = nsc.Close(ctx, conn) - require.NoError(t, err) - - require.Equal(t, 2, counter.UniqueRequests()) - require.Equal(t, closes+1, counter.UniqueCloses()) -} - -func TestNSMGR_HealNSMgr(t *testing.T) { - var samples = []struct { - name string - nodeNum int - restored bool - }{ - { - name: "Local Restored", - nodeNum: 0, - restored: true, - }, - { - name: "Remote New", - nodeNum: 1, - }, - } - - for _, sample := range samples { - t.Run(sample.name, func(t *testing.T) { - // nolint:scopelint - testNSMGRHealNSMgr(t, sample.nodeNum, sample.restored) - }) - } -} - -func testNSMGRHealNSMgr(t *testing.T, nodeNum int, restored bool) { - t.Cleanup(func() { goleak.VerifyNone(t) }) - - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer 
cancel() - - domain := sandbox.NewBuilder(ctx, t). - SetNodesCount(3). - SetNSMgrProxySupplier(nil). - SetRegistryProxySupplier(nil). - Build() - - nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken) - - nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService()) - require.NoError(t, err) - - nseReg := defaultRegistryEndpoint(nsReg.Name) - - counter := new(count.Server) - domain.Nodes[1].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter) - - request := defaultRequest(nsReg.Name) - - nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken) - - conn, err := nsc.Request(ctx, request.Clone()) - require.NoError(t, err) - - if !restored { - nseReg2 := defaultRegistryEndpoint(nsReg.Name) - nseReg2.Name += "-2" - - domain.Nodes[2].NewEndpoint(ctx, nseReg2, sandbox.GenerateTestToken, counter) - - domain.Nodes[nodeNum].NSMgr.Cancel() - } else { - domain.Nodes[nodeNum].NSMgr.Restart() - } - - var closes int - if restored { - // Wait reconnecting through the restored NSMgr - require.Eventually(t, checkSecondRequestsReceived(counter.Requests), timeout, tick) - require.Equal(t, 2, counter.Requests()) - } else { - // Wait reconnecting through the new NSMgr - require.Eventually(t, checkSecondRequestsReceived(counter.UniqueRequests), timeout, tick) - require.Equal(t, 2, counter.UniqueRequests()) - closes = counter.UniqueCloses() - } - - // Check refresh - request.Connection = conn - _, err = nsc.Request(ctx, request.Clone()) - require.NoError(t, err) - - // Close with old connection - _, err = nsc.Close(ctx, conn) - require.NoError(t, err) - - if restored { - require.Equal(t, 3, counter.Requests()) - require.Equal(t, 1, counter.Closes()) - } else { - require.Equal(t, 2, counter.UniqueRequests()) - require.Equal(t, closes+1, counter.UniqueCloses()) - } -} - -func TestNSMGR_HealRegistry(t *testing.T) { - t.Cleanup(func() { goleak.VerifyNone(t) }) - - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer 
cancel() - - domain := sandbox.NewBuilder(ctx, t). - SetNodesCount(1). - SetNSMgrProxySupplier(nil). - SetRegistryProxySupplier(nil). - Build() - - nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken) - - nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService()) - require.NoError(t, err) - - nseReg := defaultRegistryEndpoint(nsReg.Name) - - counter := new(count.Server) - domain.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter) - - request := defaultRequest(nsReg.Name) - - nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken) - - conn, err := nsc.Request(ctx, request.Clone()) - require.NoError(t, err) - - // 1. Restart Registry - domain.Registry.Restart() - - // 2. Check refresh - request.Connection = conn - _, err = nsc.Request(ctx, request.Clone()) - require.NoError(t, err) - - // 3. Check new client request - nsc = domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken) - - _, err = nsc.Request(ctx, request.Clone()) - require.NoError(t, err) - - require.Equal(t, 3, counter.Requests()) -} - -func TestNSMGR_CloseHeal(t *testing.T) { - var samples = []struct { - name string - withNSEExpiration bool - }{ - { - name: "Without NSE expiration", - }, - { - name: "With NSE expiration", - withNSEExpiration: true, - }, - } - - for _, sample := range samples { - t.Run(sample.name, func(t *testing.T) { - // nolint:scopelint - testNSMGRCloseHeal(t, sample.withNSEExpiration) - }) - } -} - -func testNSMGRCloseHeal(t *testing.T, withNSEExpiration bool) { - t.Cleanup(func() { goleak.VerifyNone(t) }) - - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - - builder := sandbox.NewBuilder(ctx, t). - SetNodesCount(1). - SetNSMgrProxySupplier(nil). 
- SetRegistryProxySupplier(nil) - - if withNSEExpiration { - builder = builder.SetRegistryExpiryDuration(sandbox.RegistryExpiryDuration) - } - - domain := builder.Build() - - nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken) - - nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService()) - require.NoError(t, err) - - nseCtx, nseCtxCancel := context.WithCancel(ctx) - - domain.Nodes[0].NewEndpoint(nseCtx, defaultRegistryEndpoint(nsReg.Name), sandbox.GenerateTestToken) - - request := defaultRequest(nsReg.Name) - - nscCtx, nscCtxCancel := context.WithCancel(ctx) - - nsc := domain.Nodes[0].NewClient(nscCtx, sandbox.GenerateTestToken) - - reqCtx, reqCancel := context.WithTimeout(ctx, time.Second) - defer reqCancel() - - // 1. Request - conn, err := nsc.Request(reqCtx, request.Clone()) - require.NoError(t, err) - - ignoreCurrent := goleak.IgnoreCurrent() - - // 2. Refresh - request.Connection = conn - - conn, err = nsc.Request(reqCtx, request.Clone()) - require.NoError(t, err) - - // 3. Stop endpoint and wait for the heal to start - nseCtxCancel() - time.Sleep(100 * time.Millisecond) - - if withNSEExpiration { - // 3.1 Wait for the endpoint expiration - time.Sleep(sandbox.RegistryExpiryDuration) - } - - // 4. 
Close connection - _, _ = nsc.Close(nscCtx, conn.Clone()) - - nscCtxCancel() - - require.Eventually(t, func() bool { - return goleak.Find(ignoreCurrent) == nil - }, timeout, tick) - - require.NoError(t, ctx.Err()) -} - -func checkSecondRequestsReceived(requestsDone func() int) func() bool { - return func() bool { - return requestsDone() >= 2 - } -} diff --git a/pkg/networkservice/chains/nsmgr/server.go b/pkg/networkservice/chains/nsmgr/server.go index b85d47004..c5d51ada4 100644 --- a/pkg/networkservice/chains/nsmgr/server.go +++ b/pkg/networkservice/chains/nsmgr/server.go @@ -26,20 +26,20 @@ import ( "github.com/google/uuid" + "github.com/networkservicemesh/sdk/pkg/networkservice/chains/client" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/connect" + "google.golang.org/grpc" "github.com/networkservicemesh/api/pkg/api/networkservice" registryapi "github.com/networkservicemesh/api/pkg/api/registry" - "github.com/networkservicemesh/sdk/pkg/networkservice/chains/client" "github.com/networkservicemesh/sdk/pkg/networkservice/chains/endpoint" "github.com/networkservicemesh/sdk/pkg/networkservice/common/authorize" "github.com/networkservicemesh/sdk/pkg/networkservice/common/clientinfo" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/connect" "github.com/networkservicemesh/sdk/pkg/networkservice/common/discover" "github.com/networkservicemesh/sdk/pkg/networkservice/common/excludedprefixes" "github.com/networkservicemesh/sdk/pkg/networkservice/common/filtermechanisms" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/heal" "github.com/networkservicemesh/sdk/pkg/networkservice/common/interpose" "github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/recvfd" "github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/sendfd" @@ -59,7 +59,6 @@ import ( registryadapter "github.com/networkservicemesh/sdk/pkg/registry/core/adapters" registrychain "github.com/networkservicemesh/sdk/pkg/registry/core/chain" 
"github.com/networkservicemesh/sdk/pkg/registry/core/next" - "github.com/networkservicemesh/sdk/pkg/tools/addressof" "github.com/networkservicemesh/sdk/pkg/tools/grpcutils" "github.com/networkservicemesh/sdk/pkg/tools/token" ) @@ -78,7 +77,8 @@ type nsmgrServer struct { type serverOptions struct { authorizeServer networkservice.NetworkServiceServer - connectOptions []connect.Option + dialOptions []grpc.DialOption + dialTimeout time.Duration regURL *url.URL regDialOptions []grpc.DialOption name string @@ -88,10 +88,17 @@ type serverOptions struct { // Option modifies server option value type Option func(o *serverOptions) -// WithConnectOptions sets connect Options for the server -func WithConnectOptions(connectOptions ...connect.Option) Option { +// WithDialOptions sets grpc.DialOptions for the client +func WithDialOptions(dialOptions ...grpc.DialOption) Option { + return func(o *serverOptions) { + o.dialOptions = dialOptions + } +} + +// WithDialTimeout sets dial timeout for the client +func WithDialTimeout(dialTimeout time.Duration) Option { return func(o *serverOptions) { - o.connectOptions = connectOptions + o.dialTimeout = dialTimeout } } @@ -193,16 +200,19 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options recvfd.NewServer(), // Receive any files passed interpose.NewServer(&interposeRegistryServer), filtermechanisms.NewServer(&urlsRegistryServer), - heal.NewServer(ctx, - heal.WithOnHeal(addressof.NetworkServiceClient(adapters.NewServerToClient(rv)))), - connect.NewServer(ctx, - client.NewClientFactory( + connect.NewServer( + client.NewClient( + ctx, client.WithName(opts.name), client.WithAdditionalFunctionality( recvfd.NewClient(), sendfd.NewClient(), ), - ), opts.connectOptions...), + client.WithDialOptions(opts.dialOptions...), + client.WithDialTimeout(opts.dialTimeout), + client.WithoutRefresh(), + ), + ), sendfd.NewServer()), ) diff --git a/pkg/networkservice/chains/nsmgr/suite_test.go 
b/pkg/networkservice/chains/nsmgr/suite_test.go index 7dfb4e175..d261accdc 100644 --- a/pkg/networkservice/chains/nsmgr/suite_test.go +++ b/pkg/networkservice/chains/nsmgr/suite_test.go @@ -340,12 +340,16 @@ func (s *nsmgrSuite) Test_ConnectToDeadNSEUsecase() { refreshRequest := request.Clone() refreshRequest.Connection = conn.Clone() - _, err = nsc.Request(ctx, refreshRequest) + refreshCtx, refreshCancel := context.WithTimeout(ctx, time.Second) + defer refreshCancel() + _, err = nsc.Request(refreshCtx, refreshRequest) require.Error(t, err) require.NoError(t, ctx.Err()) // Close - _, _ = nsc.Close(ctx, conn) + closeCtx, closeCancel := context.WithTimeout(ctx, time.Second) + defer closeCancel() + _, _ = nsc.Close(closeCtx, conn) // Endpoint unregister _, err = s.domain.Nodes[0].NSMgr.NetworkServiceEndpointRegistryServer().Unregister(ctx, nseReg) @@ -671,6 +675,7 @@ func (s *nsmgrSuite) Test_PassThroughLocalUsecaseMultiLabel() { func (s *nsmgrSuite) Test_ShouldCleanAllClientAndEndpointGoroutines() { t := s.T() + t.Cleanup(func() { goleak.VerifyNone(t, goleak.IgnoreCurrent()) }) ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() @@ -678,8 +683,6 @@ func (s *nsmgrSuite) Test_ShouldCleanAllClientAndEndpointGoroutines() { nsReg, err := s.nsRegistryClient.Register(ctx, defaultRegistryService()) require.NoError(t, err) - defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) - // At this moment all possible endless NSMgr goroutines have been started. So we expect all newly created goroutines // to be canceled no later than some of these events: // 1. 
GRPC request context cancel @@ -784,17 +787,19 @@ func additionalFunctionalityChain(ctx context.Context, clientURL *url.URL, clien return []networkservice.NetworkServiceServer{ chain.NewNetworkServiceServer( clienturl.NewServer(clientURL), - connect.NewServer(ctx, - client.NewClientFactory( + connect.NewServer( + client.NewClient( + ctx, client.WithName(fmt.Sprintf("endpoint-client-%v", clientName)), client.WithAdditionalFunctionality( mechanismtranslation.NewClient(), replacelabels.NewClient(labels), kernel.NewClient(), ), + client.WithDialOptions(sandbox.DialOptions()...), + client.WithDialTimeout(sandbox.DialTimeout), + client.WithoutRefresh(), ), - connect.WithDialTimeout(sandbox.DialTimeout), - connect.WithDialOptions(sandbox.DialOptions()...), ), ), } diff --git a/pkg/networkservice/chains/nsmgrproxy/server.go b/pkg/networkservice/chains/nsmgrproxy/server.go index 18401efbf..a30681e52 100644 --- a/pkg/networkservice/chains/nsmgrproxy/server.go +++ b/pkg/networkservice/chains/nsmgrproxy/server.go @@ -20,6 +20,7 @@ package nsmgrproxy import ( "context" "net/url" + "time" "github.com/google/uuid" "github.com/networkservicemesh/api/pkg/api/networkservice" @@ -34,10 +35,8 @@ import ( "github.com/networkservicemesh/sdk/pkg/networkservice/common/authorize" "github.com/networkservicemesh/sdk/pkg/networkservice/common/connect" "github.com/networkservicemesh/sdk/pkg/networkservice/common/discover" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/heal" "github.com/networkservicemesh/sdk/pkg/networkservice/common/interdomainurl" "github.com/networkservicemesh/sdk/pkg/networkservice/common/swapip" - "github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters" "github.com/networkservicemesh/sdk/pkg/registry" "github.com/networkservicemesh/sdk/pkg/registry/common/clienturl" registryconnect "github.com/networkservicemesh/sdk/pkg/registry/common/connect" @@ -46,7 +45,6 @@ import ( registryswapip 
"github.com/networkservicemesh/sdk/pkg/registry/common/swapip" registryadapter "github.com/networkservicemesh/sdk/pkg/registry/core/adapters" "github.com/networkservicemesh/sdk/pkg/registry/core/chain" - "github.com/networkservicemesh/sdk/pkg/tools/addressof" "github.com/networkservicemesh/sdk/pkg/tools/fs" "github.com/networkservicemesh/sdk/pkg/tools/grpcutils" "github.com/networkservicemesh/sdk/pkg/tools/log" @@ -71,7 +69,8 @@ type serverOptions struct { mapipFilePath string listenOn *url.URL authorizeServer networkservice.NetworkServiceServer - connectOptions []connect.Option + dialOptions []grpc.DialOption + dialTimeout time.Duration registryConnectOptions []registryconnect.Option } @@ -138,10 +137,17 @@ func WithMapIPFilePath(p string) Option { } } -// WithConnectOptions sets connect Options for the server -func WithConnectOptions(connectOptions ...connect.Option) Option { +// WithDialOptions sets grpc.DialOptions for the server +func WithDialOptions(dialOptions ...grpc.DialOption) Option { return func(o *serverOptions) { - o.connectOptions = connectOptions + o.dialOptions = dialOptions + } +} + +// WithDialTimeout sets dial timeout for the server +func WithDialTimeout(dialTimeout time.Duration) Option { + return func(o *serverOptions) { + o.dialTimeout = dialTimeout } } @@ -182,12 +188,14 @@ func NewServer(ctx context.Context, regURL, proxyURL *url.URL, tokenGenerator to interdomainurl.NewServer(&nseStockServer), discover.NewServer(nsClient, nseClient), swapip.NewServer(opts.openMapIPChannel(ctx)), - heal.NewServer(ctx, - heal.WithOnHeal(addressof.NetworkServiceClient(adapters.NewServerToClient(rv)))), - connect.NewServer(ctx, - client.NewClientFactory( + connect.NewServer( + client.NewClient( + ctx, client.WithName(opts.name), - ), opts.connectOptions..., + client.WithDialOptions(opts.dialOptions...), + client.WithDialTimeout(opts.dialTimeout), + client.WithoutRefresh(), + ), ), ), ) diff --git a/pkg/networkservice/chains/nsmgrproxy/server_test.go
b/pkg/networkservice/chains/nsmgrproxy/server_test.go index 8e0b50e7b..997673f1c 100644 --- a/pkg/networkservice/chains/nsmgrproxy/server_test.go +++ b/pkg/networkservice/chains/nsmgrproxy/server_test.go @@ -572,13 +572,17 @@ func Test_Interdomain_PassThroughUsecase(t *testing.T) { additionalFunctionality = []networkservice.NetworkServiceServer{ chain.NewNetworkServiceServer( clienturl.NewServer(clusters[i].Nodes[0].NSMgr.URL), - connect.NewServer(ctx, - client.NewClientFactory(client.WithAdditionalFunctionality( - newPassTroughClient(fmt.Sprintf("my-service-remote-%v@cluster%v", i-1, i-1)), - kernelmech.NewClient(), - )), - connect.WithDialTimeout(sandbox.DialTimeout), - connect.WithDialOptions(sandbox.DialOptions()...), + connect.NewServer( + client.NewClient( + ctx, + client.WithAdditionalFunctionality( + newPassTroughClient(fmt.Sprintf("my-service-remote-%v@cluster%v", i-1, i-1)), + kernelmech.NewClient(), + ), + client.WithDialTimeout(sandbox.DialTimeout), + client.WithDialOptions(sandbox.DialOptions()...), + client.WithoutRefresh(), + ), ), ), } diff --git a/pkg/networkservice/common/begin/server.go b/pkg/networkservice/common/begin/server.go index 4109ec2bf..ea7257c2a 100644 --- a/pkg/networkservice/common/begin/server.go +++ b/pkg/networkservice/common/begin/server.go @@ -85,7 +85,7 @@ func (b *beginServer) Close(ctx context.Context, conn *networkservice.Connection eventFactoryServer, ok := b.Load(conn.GetId()) if !ok { // If we don't have a connection to Close, just let it be - return + return &emptypb.Empty{}, nil } <-eventFactoryServer.executor.AsyncExec(func() { if eventFactoryServer.state != established || eventFactoryServer.request == nil { @@ -101,5 +101,5 @@ func (b *beginServer) Close(ctx context.Context, conn *networkservice.Connection emp, err = next.Server(ctx).Close(ctx, conn) eventFactoryServer.afterCloseFunc() }) - return emp, err + return &emptypb.Empty{}, err } diff --git a/pkg/networkservice/common/clientconn/client.go 
b/pkg/networkservice/common/clientconn/client.go new file mode 100644 index 000000000..de4d9ef6c --- /dev/null +++ b/pkg/networkservice/common/clientconn/client.go @@ -0,0 +1,57 @@ +// Copyright (c) 2021 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package clientconn - chain element for injecting a grpc.ClientConnInterface into the client chain +package clientconn + +import ( + "context" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" +) + +// NewClient - returns a clientconn chain element +func NewClient(cc grpc.ClientConnInterface) networkservice.NetworkServiceClient { + return &clientConnClient{ + cc: cc, + } +} + +type clientConnClient struct { + cc grpc.ClientConnInterface +} + +func (c *clientConnClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { + if c.cc != nil { + _, _ = LoadOrStore(ctx, c.cc) + } + return next.Client(ctx).Request(ctx, request) +} + +func (c *clientConnClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*emptypb.Empty, error) { + _, err := next.Client(ctx).Close(ctx, conn) + if c.cc != nil { + cc, loaded := LoadAndDelete(ctx) + if loaded && cc != c.cc { + 
Store(ctx, cc) + } + } + return &emptypb.Empty{}, err +} diff --git a/pkg/networkservice/common/clientconn/metadata.go b/pkg/networkservice/common/clientconn/metadata.go new file mode 100644 index 000000000..71605003b --- /dev/null +++ b/pkg/networkservice/common/clientconn/metadata.go @@ -0,0 +1,75 @@ +// Copyright (c) 2021 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package clientconn allows storing grpc.ClientConnInterface in per Connection.Id metadata +package clientconn + +import ( + "context" + + "google.golang.org/grpc" + + "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" +) + +type key struct{} + +// Store sets the grpc.ClientConnInterface stored in per Connection.Id metadata. +func Store(ctx context.Context, cc grpc.ClientConnInterface) { + metadata.Map(ctx, true).Store(key{}, cc) +} + +// Delete deletes the grpc.ClientConnInterface stored in per Connection.Id metadata +func Delete(ctx context.Context) { + metadata.Map(ctx, true).Delete(key{}) +} + +// Load returns the grpc.ClientConnInterface stored in per Connection.Id metadata, or nil if no +// value is present. +// The ok result indicates whether value was found in the per Connection.Id metadata.
+func Load(ctx context.Context) (value grpc.ClientConnInterface, ok bool) { + m := metadata.Map(ctx, true) + rawValue, ok := m.Load(key{}) + if !ok { + return + } + value, ok = rawValue.(grpc.ClientConnInterface) + return value, ok +} + +// LoadOrStore returns the existing grpc.ClientConnInterface stored in per Connection.Id metadata if present. +// Otherwise, it stores and returns the given grpc.ClientConnInterface. +// The loaded result is true if the value was loaded, false if stored. +func LoadOrStore(ctx context.Context, cc grpc.ClientConnInterface) (value grpc.ClientConnInterface, ok bool) { + rawValue, ok := metadata.Map(ctx, true).LoadOrStore(key{}, cc) + if !ok { + return cc, ok + } + value, ok = rawValue.(grpc.ClientConnInterface) + return value, ok +} + +// LoadAndDelete deletes the grpc.ClientConnInterface stored in per Connection.Id metadata, +// returning the previous value if any. The loaded result reports whether the key was present. +func LoadAndDelete(ctx context.Context) (value grpc.ClientConnInterface, ok bool) { + m := metadata.Map(ctx, true) + rawValue, ok := m.LoadAndDelete(key{}) + if !ok { + return + } + value, ok = rawValue.(grpc.ClientConnInterface) + return value, ok +} diff --git a/pkg/networkservice/common/clienturl/client.go b/pkg/networkservice/common/clienturl/client.go new file mode 100644 index 000000000..c8cf3a9d3 --- /dev/null +++ b/pkg/networkservice/common/clienturl/client.go @@ -0,0 +1,56 @@ +// Copyright (c) 2020-2021 Cisco Systems, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package clienturl provides server chain element that sets client URL in context +package clienturl + +import ( + "context" + "net/url" + + "google.golang.org/grpc" + + "github.com/networkservicemesh/sdk/pkg/tools/clienturlctx" + + "github.com/golang/protobuf/ptypes/empty" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + + "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" +) + +type clientURLClient struct { + u *url.URL +} + +// NewClient - returns a new client chain element that sets client URL in context +func NewClient(u *url.URL) networkservice.NetworkServiceClient { + return &clientURLClient{u: u} +} + +func (c *clientURLClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { + if c.u != nil { + ctx = clienturlctx.WithClientURL(ctx, c.u) + } + return next.Client(ctx).Request(ctx, request, opts...) +} + +func (c *clientURLClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*empty.Empty, error) { + if c.u != nil { + ctx = clienturlctx.WithClientURL(ctx, c.u) + } + return next.Client(ctx).Close(ctx, conn, opts...) 
+} diff --git a/pkg/networkservice/common/connect/README.md b/pkg/networkservice/common/connect/README.md deleted file mode 100644 index 2966f4fa5..000000000 --- a/pkg/networkservice/common/connect/README.md +++ /dev/null @@ -1,49 +0,0 @@ -# Functional requirements - -Upon receipt of a Request, the connect Server chain element must send, as a client, a corresponding request to -another Server. For clarity, we will refer to the incoming Request to the server as the 'server Request'. - -If the server request.GetConnection.GetId() is not associated to an existing client Connection, a new Connection -must be established by sending a Request to the Server indicated by the clienturl.ClientURL(ctx). - -If the server request.GetConnection.GetId() is associated to an existing client Connection, that client Connection needs -to be sent as part of the client request to the server the client connection was received from. - -If the server is asked to Close an existing server connection, it should also Close the corresponding client Connection -with the server that client Connection was received from. Even if the attempt to Close the client connection fails, it should -continue to call down the next.Server(ctx) chain. - -# Implementation - -## connectServer - -`connectServer` keeps `connInfos` [connectionInfoMap](./gen.go) mapping incoming server `Connection.ID` to the remote -server URL and to the client chain assigned to this URL mapping remote server URL to a `connectClient`. Notably, on -every Close it is deleted from the `clients` map, so eventually its context will be canceled and the corresponding -`grpc.ClientConn` will be closed. - -Care is taken to make sure that each client chain results in one increment of the refcount on its creation, and one -decrement when it receives a Close. In this way, we can be sure that `clienturl.NewClient(...)` context is closed after -the last client chain using it has received its Close. 
- -The overall result is that usually, `connectServer` will have no more than one `clienturl.NewClient(...)` per `clientURL`. -It may occasionally have more than one in a transient fashion for the lifetime of one or more Connections. -In all events it will have zero `clienturl.NewClient(...)` for a `clientURL` if it has no server Connections for that -`clientURL`. - -## connectClient - -`connectClient` handles the instantiation and management of the client connection from the `clienturl.ClientURL(ctx)` of -the server Request. - -## Monitoring - -Both `connectServer` and `connectClient` by themselves doesn't monitor gRPC connection for liveness. Every error received -from Request, Close is simply returned to the previous chain elements. But there are some cases when connection should -be closed and reopened on some event happens. For this purpose `connectServer` injects into the client chain context a -cancel function for this context ([cancelctx](../../../tools/cancelctx/context.go)). When this context becomes canceled, -`connectClient` closes corresponding gRPC connection and `connectServer` force decreases its refcount to 0. - -The most common way for the NSM chains is using [heal client](../heal/client.go) for monitoring and canceling the -connection, but it also can be implemented in some other way ([example using gRPC health check API](./monitor_client_test.go)) -or not be implemented at all. diff --git a/pkg/networkservice/common/connect/client.go b/pkg/networkservice/common/connect/client.go index b4e44b275..cb632eca1 100644 --- a/pkg/networkservice/common/connect/client.go +++ b/pkg/networkservice/common/connect/client.go @@ -1,6 +1,4 @@ -// Copyright (c) 2020-2021 Doc.ai and/or its affiliates. -// -// Copyright (c) 2020-2021 Cisco and/or its affiliates. +// Copyright (c) 2021 Cisco and/or its affiliates. 
// // SPDX-License-Identifier: Apache-2.0 // @@ -20,73 +18,35 @@ package connect import ( "context" - "sync" - "time" - "github.com/golang/protobuf/ptypes/empty" "github.com/networkservicemesh/api/pkg/api/networkservice" "github.com/pkg/errors" "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/emptypb" - "github.com/networkservicemesh/sdk/pkg/tools/clienturlctx" - "github.com/networkservicemesh/sdk/pkg/tools/clock" - "github.com/networkservicemesh/sdk/pkg/tools/grpcutils" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/clientconn" ) -type connectClient struct { - ctx context.Context - dialTimeout time.Duration - dialOptions []grpc.DialOption - dialErr error - - clientFactory ClientFactory - client networkservice.NetworkServiceClient - - initOnce sync.Once -} - -func (u *connectClient) init() error { - u.initOnce.Do(func() { - clockTime := clock.FromContext(u.ctx) - - clientURL := clienturlctx.ClientURL(u.ctx) - if clientURL == nil { - u.dialErr = errors.New("cannot dial nil clienturl.ClientURL(ctx)") - return - } - - dialCtx, dialCancel := clockTime.WithTimeout(u.ctx, u.dialTimeout) - defer dialCancel() - - dialOptions := append(append([]grpc.DialOption{}, u.dialOptions...), grpc.WithReturnConnectionError()) - - var cc *grpc.ClientConn - cc, u.dialErr = grpc.DialContext(dialCtx, grpcutils.URLToTarget(clientURL), dialOptions...) 
- if u.dialErr != nil { - return - } - - u.client = u.clientFactory(u.ctx, cc) - - go func() { - <-u.ctx.Done() - _ = cc.Close() - }() - }) +type connectClient struct{} - return u.dialErr +// NewClient - returns a connect chain element +func NewClient() networkservice.NetworkServiceClient { + return &connectClient{} } -func (u *connectClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { - if err := u.init(); err != nil { - return nil, err +func (c *connectClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { + cc, loaded := clientconn.Load(ctx) + if !loaded { + return nil, errors.New("no grpc.ClientConnInterface provided") } - return u.client.Request(ctx, request, opts...) + conn, err := networkservice.NewNetworkServiceClient(cc).Request(ctx, request, opts...) + return conn, err } -func (u *connectClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*empty.Empty, error) { - if err := u.init(); err != nil { - return nil, err +func (c *connectClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*emptypb.Empty, error) { + cc, loaded := clientconn.Load(ctx) + if !loaded { + return nil, errors.New("no grpc.ClientConnInterface provided") } - return u.client.Close(ctx, conn, opts...) + return networkservice.NewNetworkServiceClient(cc).Close(ctx, conn, opts...) } diff --git a/pkg/networkservice/common/heal/gen.go b/pkg/networkservice/common/connect/doc.go similarity index 59% rename from pkg/networkservice/common/heal/gen.go rename to pkg/networkservice/common/connect/doc.go index 0650adb68..5156158e5 100644 --- a/pkg/networkservice/common/heal/gen.go +++ b/pkg/networkservice/common/connect/doc.go @@ -1,6 +1,4 @@ -// Copyright (c) 2020 Cisco and/or its affiliates. 
-// -// Copyright (c) 2021 Doc.ai and/or its affiliates. +// Copyright (c) 2021 Cisco and/or its affiliates. // // SPDX-License-Identifier: Apache-2.0 // @@ -16,16 +14,5 @@ // See the License for the specific language governing permissions and // limitations under the License. -package heal - -import ( - "sync" -) - -//go:generate go-syncmap -output connection_info_map.gen.go -type connectionInfoMap - -type connectionInfoMap sync.Map - -//go:generate go-syncmap -output context_wrapper_map.gen.go -type ctxWrapperMap - -type ctxWrapperMap sync.Map +// Package connect provides chain elements to 'connect' clients +package connect diff --git a/pkg/networkservice/common/connect/example_test.go b/pkg/networkservice/common/connect/example_test.go new file mode 100644 index 000000000..ad95808af --- /dev/null +++ b/pkg/networkservice/common/connect/example_test.go @@ -0,0 +1,64 @@ +// Copyright (c) 2021 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package connect_test + +import ( + "context" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + "google.golang.org/grpc" + + "github.com/networkservicemesh/sdk/pkg/networkservice/chains/client" + "github.com/networkservicemesh/sdk/pkg/networkservice/chains/endpoint" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/connect" + "github.com/networkservicemesh/sdk/pkg/tools/token" +) + +// ExampleNewServer - example of how to use the connect chain element in a forwarder +func ExampleNewServer() { + var dialOptions []grpc.DialOption + var callOptions []grpc.CallOption + var additonalClientFunctionality []networkservice.NetworkServiceClient + var beforeConnectServer1, beforeConnectServer2 networkservice.NetworkServiceServer + var afterConnectServer1, afterConnectServer2 networkservice.NetworkServiceServer + var chainCtx context.Context + var tokenGenerator token.GeneratorFunc + var name string + forwarder := endpoint.NewServer( + chainCtx, + tokenGenerator, + endpoint.WithName(name), + endpoint.WithAdditionalFunctionality( + beforeConnectServer1, + beforeConnectServer2, + connect.NewServer( + client.NewClient( + chainCtx, + client.WithAdditionalFunctionality(additonalClientFunctionality...), + client.WithDialOptions(dialOptions...), + client.WithoutRefresh(), + client.WithName(name), + ), + callOptions..., + ), + afterConnectServer1, + afterConnectServer2, + ), + ) + if forwarder != nil { + } +} diff --git a/pkg/networkservice/common/connect/server.go b/pkg/networkservice/common/connect/server.go index a6c15df61..6f634b2c8 100644 --- a/pkg/networkservice/common/connect/server.go +++ b/pkg/networkservice/common/connect/server.go @@ -1,6 +1,4 @@ -// Copyright (c) 2020 Cisco and/or its affiliates. -// -// Copyright (c) 2020-2021 Doc.ai and/or its affiliates. +// Copyright (c) 2021 Cisco and/or its affiliates.
// // SPDX-License-Identifier: Apache-2.0 // @@ -16,168 +14,60 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package connect is intended to allow passthrough style Endpoints to have a server that also connects to a client package connect import ( "context" - "net/url" - "time" "github.com/golang/protobuf/ptypes/empty" "github.com/pkg/errors" "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/networkservicemesh/sdk/pkg/tools/postpone" "github.com/networkservicemesh/api/pkg/api/networkservice" "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" - "github.com/networkservicemesh/sdk/pkg/tools/clienturlctx" - "github.com/networkservicemesh/sdk/pkg/tools/log" - "github.com/networkservicemesh/sdk/pkg/tools/postpone" ) -// ClientFactory is used to created new clients when new connection is created. -type ClientFactory = func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient - type connectServer struct { - ctx context.Context - clientFactory ClientFactory - clientDialTimeout time.Duration - clientDialOptions []grpc.DialOption + client networkservice.NetworkServiceClient + callOptions []grpc.CallOption } -type connectionInfo struct { - clientURL *url.URL - cancel context.CancelFunc -} - -// NewServer - server chain element that creates client subchains and requests them selecting by -// clienturlctx.ClientURL(ctx) -func NewServer( - ctx context.Context, - clientFactory ClientFactory, - options ...Option, -) networkservice.NetworkServiceServer { - s := &connectServer{ - ctx: ctx, - clientFactory: clientFactory, - clientDialTimeout: time.Second, - } - - for _, opt := range options { - opt(s) +// NewServer - returns a connect chain element +func NewServer(client networkservice.NetworkServiceClient, callOptions ...grpc.CallOption) networkservice.NetworkServiceServer { + return &connectServer{ + client: client, + callOptions: 
callOptions, } - - return s } -func (s *connectServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (conn *networkservice.Connection, err error) { - clientURL := clienturlctx.ClientURL(ctx) - if clientURL == nil { - return nil, errors.Errorf("clientURL not found for incoming connection: %+v", request.GetConnection()) - } - - var connInfo *connectionInfo - connInfo, err = s.connInfo(ctx, request.GetConnection()) - if err != nil { - return nil, err - } - - var client *connectClient - client, connInfo.cancel = s.client(clientURL) - defer func() { - if err != nil { - connInfo.cancel() - } - }() - - postponeCtxFunc := postpone.ContextWithValues(ctx) - - conn, err = client.Request(ctx, request.Clone()) - if err != nil { - return nil, err +func (c *connectServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { + closeCtxFunc := postpone.ContextWithValues(ctx) + clientConn, clientErr := c.client.Request(ctx, request, c.callOptions...) + if clientErr != nil { + return nil, clientErr } - - request.Connection = conn - - if conn, err = next.Server(ctx).Request(ctx, request); err != nil { - closeCtx, cancelClose := postponeCtxFunc() - defer cancelClose() - - _, closeErr := client.Close(closeCtx, request.Connection.Clone()) - if closeErr != nil { - err = errors.Wrapf(err, "connection closed with error: %s", closeErr.Error()) - } - - connInfo.cancel() + request.Connection = clientConn + serverConn, serverErr := next.Server(ctx).Request(ctx, request) + if serverErr != nil { + closeCtx, closeCancel := closeCtxFunc() + defer closeCancel() + _, _ = c.client.Close(closeCtx, clientConn, c.callOptions...) 
} - - return conn, err + return serverConn, serverErr } -func (s *connectServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) { - var clientErr error - if connInfo, ok := load(ctx); ok { - connInfo.cancel() - - client, cancel := s.client(connInfo.clientURL) - defer cancel() - - _, clientErr = client.Close(ctx, conn) - } else { - clientErr = errors.Errorf("no client found for the connection: %s", conn.GetId()) - } - - _, err := next.Server(ctx).Close(ctx, conn) - - if clientErr != nil && err != nil { - return nil, errors.Wrapf(err, "errors during client close: %s", clientErr.Error()) +func (c *connectServer) Close(ctx context.Context, conn *networkservice.Connection) (*emptypb.Empty, error) { + _, clientErr := c.client.Close(ctx, conn, c.callOptions...) + _, serverErr := next.Server(ctx).Close(ctx, conn) + if clientErr != nil && serverErr != nil { + return nil, errors.Wrapf(serverErr, "errors during client close: %v", clientErr) } if clientErr != nil { return nil, errors.Wrap(clientErr, "errors during client close") } - return &empty.Empty{}, err -} - -func (s *connectServer) connInfo(ctx context.Context, conn *networkservice.Connection) (*connectionInfo, error) { - logger := log.FromContext(ctx).WithField("connectServer", "connInfo") - - clientURL := clienturlctx.ClientURL(ctx) - - connInfo, loaded := loadOrStore(ctx, &connectionInfo{ - clientURL: clientURL, - cancel: func() {}, - }) - if !loaded { - return connInfo, nil - } - - connInfo.cancel() - - if connInfo.clientURL.String() != clientURL.String() { - client, cancel := s.client(connInfo.clientURL) - defer cancel() - - if _, clientErr := client.Close(ctx, conn.Clone()); clientErr != nil { - logger.Warnf("failed to close client: %s", clientErr.Error()) - } - - if err := ctx.Err(); err != nil { - return nil, err - } - - connInfo.clientURL = clientURL - } - - return connInfo, nil -} - -func (s *connectServer) client(clientURL *url.URL) (*connectClient, context.CancelFunc) { - 
ctx, cancel := context.WithCancel(s.ctx) - return &connectClient{ - ctx: clienturlctx.WithClientURL(ctx, clientURL), - dialTimeout: s.clientDialTimeout, - clientFactory: s.clientFactory, - dialOptions: s.clientDialOptions, - }, cancel + return &empty.Empty{}, serverErr } diff --git a/pkg/networkservice/common/connect/server_test.go b/pkg/networkservice/common/connect/server_test.go index 630c36fae..8688a6d1e 100644 --- a/pkg/networkservice/common/connect/server_test.go +++ b/pkg/networkservice/common/connect/server_test.go @@ -31,6 +31,8 @@ import ( "go.uber.org/goleak" "google.golang.org/grpc" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/dial" + "github.com/networkservicemesh/api/pkg/api/networkservice" "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/cls" "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel" @@ -61,15 +63,15 @@ func TestConnectServer_Request(t *testing.T) { s := next.NewNetworkServiceServer( metadata.NewServer(), - connect.NewServer(context.Background(), - func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient { - return next.NewNetworkServiceClient( - adapters.NewServerToClient(serverClient), - networkservice.NewNetworkServiceClient(cc), - ) - }, - connect.WithDialTimeout(time.Second), - connect.WithDialOptions(grpc.WithInsecure()), + connect.NewServer( + next.NewNetworkServiceClient( + adapters.NewServerToClient(serverClient), + dial.NewClient(context.Background(), + dial.WithDialTimeout(time.Second), + dial.WithDialOptions(grpc.WithInsecure()), + ), + connect.NewClient(), + ), ), serverNext, ) @@ -173,7 +175,7 @@ func TestConnectServer_Request(t *testing.T) { // 8. 
Close B - _, err = s.Close(ctx, conn) + _, err = s.Close(clienturlctx.WithClientURL(ctx, urlB), conn) require.NoError(t, err) require.Nil(t, serverClient.capturedRequest) @@ -191,15 +193,15 @@ func TestConnectServer_RequestParallel(t *testing.T) { s := next.NewNetworkServiceServer( metadata.NewServer(), - connect.NewServer(context.Background(), - func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient { - return next.NewNetworkServiceClient( - serverClient, - networkservice.NewNetworkServiceClient(cc), - ) - }, - connect.WithDialTimeout(time.Second), - connect.WithDialOptions(grpc.WithInsecure()), + connect.NewServer( + next.NewNetworkServiceClient( + dial.NewClient(context.Background(), + dial.WithDialTimeout(time.Second), + dial.WithDialOptions(grpc.WithInsecure()), + ), + serverClient, + connect.NewClient(), + ), ), serverNext, ) @@ -256,7 +258,7 @@ func TestConnectServer_RequestParallel(t *testing.T) { requestCancel() // 4.4. Close A - _, err = s.Close(ctx, conn) + _, err = s.Close(clienturlctx.WithClientURL(ctx, urlA), conn) assert.NoError(t, err) wg.Done() }(i) @@ -291,12 +293,15 @@ func TestConnectServer_RequestFail(t *testing.T) { s := next.NewNetworkServiceServer( metadata.NewServer(), - connect.NewServer(context.Background(), - func(_ context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient { - return injecterror.NewClient() - }, - connect.WithDialTimeout(time.Second), - connect.WithDialOptions(grpc.WithInsecure()), + connect.NewServer( + next.NewNetworkServiceClient( + dial.NewClient(context.Background(), + dial.WithDialTimeout(time.Second), + dial.WithDialOptions(grpc.WithInsecure()), + ), + injecterror.NewClient(), + connect.NewClient(), + ), ), ) @@ -338,15 +343,15 @@ func TestConnectServer_RequestNextServerError(t *testing.T) { s := next.NewNetworkServiceServer( metadata.NewServer(), - connect.NewServer(context.Background(), - func(ctx context.Context, cc grpc.ClientConnInterface) 
networkservice.NetworkServiceClient { - return next.NewNetworkServiceClient( - adapters.NewServerToClient(serverClient), - networkservice.NewNetworkServiceClient(cc), - ) - }, - connect.WithDialTimeout(time.Second), - connect.WithDialOptions(grpc.WithInsecure()), + connect.NewServer( + next.NewNetworkServiceClient( + dial.NewClient(context.Background(), + dial.WithDialTimeout(time.Second), + dial.WithDialOptions(grpc.WithInsecure()), + ), + adapters.NewServerToClient(serverClient), + connect.NewClient(), + ), ), injecterror.NewServer(), ) @@ -405,12 +410,14 @@ func TestConnectServer_RemoteRestarted(t *testing.T) { s := next.NewNetworkServiceServer( metadata.NewServer(), - connect.NewServer(context.Background(), - func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient { - return networkservice.NewNetworkServiceClient(cc) - }, - connect.WithDialTimeout(time.Second), - connect.WithDialOptions(grpc.WithInsecure()), + connect.NewServer( + next.NewNetworkServiceClient( + dial.NewClient(context.Background(), + dial.WithDialTimeout(time.Second), + dial.WithDialOptions(grpc.WithInsecure()), + ), + connect.NewClient(), + ), ), ) @@ -479,7 +486,7 @@ func TestConnectServer_RemoteRestarted(t *testing.T) { // 6. 
Close A - _, err = s.Close(ctx, conn) + _, err = s.Close(clienturlctx.WithClientURL(ctx, urlA), conn) require.NoError(t, err) } @@ -490,12 +497,14 @@ func TestConnectServer_DialTimeout(t *testing.T) { s := next.NewNetworkServiceServer( metadata.NewServer(), - connect.NewServer(context.Background(), - func(_ context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient { - return networkservice.NewNetworkServiceClient(cc) - }, - connect.WithDialTimeout(100*time.Millisecond), - connect.WithDialOptions(grpc.WithInsecure()), + connect.NewServer( + next.NewNetworkServiceClient( + dial.NewClient(context.Background(), + dial.WithDialTimeout(100*time.Millisecond), + dial.WithDialOptions(grpc.WithInsecure(), grpc.WithBlock()), + ), + connect.NewClient(), + ), ), ) @@ -520,15 +529,12 @@ func TestConnectServer_DialTimeout(t *testing.T) { // 3. Request A - timer := time.AfterFunc(time.Second/2, t.FailNow) - requestCtx, requestCancel := context.WithTimeout(context.Background(), time.Second) defer requestCancel() _, err = s.Request(clienturlctx.WithClientURL(requestCtx, urlA), request.Clone()) require.Error(t, err) - - timer.Stop() + require.NoError(t, requestCtx.Err()) } func TestConnectServer_ChangeURLWithExpiredContext(t *testing.T) { @@ -538,12 +544,14 @@ func TestConnectServer_ChangeURLWithExpiredContext(t *testing.T) { s := next.NewNetworkServiceServer( metadata.NewServer(), - connect.NewServer(context.Background(), - func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient { - return networkservice.NewNetworkServiceClient(cc) - }, - connect.WithDialTimeout(time.Second), - connect.WithDialOptions(grpc.WithInsecure()), + connect.NewServer( + next.NewNetworkServiceClient( + dial.NewClient(context.Background(), + dial.WithDialTimeout(time.Hour), + dial.WithDialOptions(grpc.WithInsecure()), + ), + connect.NewClient(), + ), ), ) diff --git a/pkg/networkservice/common/dial/client.go b/pkg/networkservice/common/dial/client.go 
new file mode 100644 index 000000000..844bb7bf5 --- /dev/null +++ b/pkg/networkservice/common/dial/client.go @@ -0,0 +1,115 @@ +// Copyright (c) 2021 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package dial will dial up a grpc.ClientConnInterface if a client *url.URL is provided in the ctx, retrievable by +// clienturlctx.ClientURL(ctx) and put the resulting grpc.ClientConnInterface into the ctx using clientconn.Store(..) +// where it can be retrieved by other chain elements using clientconn.Load(...) 
+package dial + +import ( + "context" + "time" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/networkservicemesh/sdk/pkg/tools/postpone" + + "github.com/networkservicemesh/sdk/pkg/networkservice/common/clientconn" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" + "github.com/networkservicemesh/sdk/pkg/tools/clienturlctx" +) + +type dialClient struct { + chainCtx context.Context + dialOptions []grpc.DialOption + dialTimeout time.Duration +} + +// NewClient - returns new dial chain element +func NewClient(chainCtx context.Context, opts ...Option) networkservice.NetworkServiceClient { + o := &option{} + for _, opt := range opts { + opt(o) + } + return &dialClient{ + chainCtx: chainCtx, + dialOptions: o.dialOptions, + dialTimeout: o.dialTimeout, + } +} + +func (d *dialClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { + closeContextFunc := postpone.ContextWithValues(ctx) + // If no clientURL, we have no work to do + // call the next in the chain + clientURL := clienturlctx.ClientURL(ctx) + if clientURL == nil { + return next.Client(ctx).Request(ctx, request, opts...) + } + + cc, _ := clientconn.LoadOrStore(ctx, newDialer(d.chainCtx, d.dialTimeout, d.dialOptions...)) + + // If there's an existing grpc.ClientConnInterface and it's not ours, call the next in the chain + di, ok := cc.(*dialer) + if !ok { + return next.Client(ctx).Request(ctx, request, opts...) + } + + // If our existing dialer has a different URL close down the chain + if di.clientURL != nil && di.clientURL.String() != clientURL.String() { + closeCtx, closeCancel := closeContextFunc() + defer closeCancel() + _ = di.Dial(closeCtx, di.clientURL) + _, _ = next.Client(ctx).Close(clienturlctx.WithClientURL(closeCtx, di.clientURL), request.GetConnection(), opts...) 
+ } + + err := di.Dial(ctx, clientURL) + if err != nil { + return nil, err + } + + conn, err := next.Client(ctx).Request(ctx, request, opts...) + if err != nil { + _ = di.Close() + return nil, err + } + return conn, nil +} + +func (d *dialClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*emptypb.Empty, error) { + // If no clientURL, we have no work to do + // call the next in the chain + clientURL := clienturlctx.ClientURL(ctx) + if clientURL == nil { + return next.Client(ctx).Close(ctx, conn, opts...) + } + + cc, _ := clientconn.Load(ctx) + + di, ok := cc.(*dialer) + if !ok { + return next.Client(ctx).Close(ctx, conn, opts...) + } + defer func() { + _ = di.Close() + clientconn.Delete(ctx) + }() + _ = di.Dial(ctx, clientURL) + + return next.Client(ctx).Close(ctx, conn, opts...) +} diff --git a/pkg/networkservice/common/dial/dialer.go b/pkg/networkservice/common/dial/dialer.go new file mode 100644 index 000000000..58c27bb05 --- /dev/null +++ b/pkg/networkservice/common/dial/dialer.go @@ -0,0 +1,94 @@ +// Copyright (c) 2021 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package dial + +import ( + "context" + "net/url" + "runtime" + "time" + + "github.com/pkg/errors" + "google.golang.org/grpc" + + "github.com/networkservicemesh/sdk/pkg/tools/clock" + "github.com/networkservicemesh/sdk/pkg/tools/grpcutils" +) + +type dialer struct { + ctx context.Context + cleanupContext context.Context + clientURL *url.URL + cleanupCancel context.CancelFunc + *grpc.ClientConn + dialOptions []grpc.DialOption + dialTimeout time.Duration +} + +func newDialer(ctx context.Context, dialTimeout time.Duration, dialOptions ...grpc.DialOption) *dialer { + return &dialer{ + ctx: ctx, + dialOptions: dialOptions, + dialTimeout: dialTimeout, + } +} + +func (di *dialer) Dial(ctx context.Context, clientURL *url.URL) error { + if di == nil { + return errors.New("cannot call dialer.Dial on nil dialer") + } + // Cleanup any previous grpc.ClientConn + if di.cleanupCancel != nil { + di.cleanupCancel() + } + + // Set the clientURL + di.clientURL = clientURL + + // Setup dialTimeout if needed + dialCtx := ctx + if di.dialTimeout != 0 { + dialCtx, _ = clock.FromContext(di.ctx).WithTimeout(dialCtx, di.dialTimeout) + } + + // Dial + target := grpcutils.URLToTarget(di.clientURL) + cc, err := grpc.DialContext(dialCtx, target, di.dialOptions...) 
+ if err != nil { + if cc != nil { + _ = cc.Close() + } + return errors.Wrapf(err, "failed to dial %s", target) + } + di.ClientConn = cc + + di.cleanupContext, di.cleanupCancel = context.WithCancel(di.ctx) + + go func(cleanupContext context.Context, cc *grpc.ClientConn) { + <-cleanupContext.Done() + _ = cc.Close() + }(di.cleanupContext, cc) + return nil +} + +func (di *dialer) Close() error { + if di != nil && di.cleanupCancel != nil { + di.cleanupCancel() + runtime.Gosched() + } + return nil +} diff --git a/pkg/networkservice/common/connect/option.go b/pkg/networkservice/common/dial/options.go similarity index 57% rename from pkg/networkservice/common/connect/option.go rename to pkg/networkservice/common/dial/options.go index 9cd04cd19..423a018cb 100644 --- a/pkg/networkservice/common/connect/option.go +++ b/pkg/networkservice/common/dial/options.go @@ -1,4 +1,4 @@ -// Copyright (c) 2021 Doc.ai and/or its affiliates. +// Copyright (c) 2021 Cisco and/or its affiliates. // // SPDX-License-Identifier: Apache-2.0 // @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package connect +package dial import ( "time" @@ -22,19 +22,24 @@ import ( "google.golang.org/grpc" ) -// Option is an option for the connect server -type Option func(s *connectServer) +type option struct { + dialOptions []grpc.DialOption + dialTimeout time.Duration +} -// WithDialTimeout sets connect server client dial timeout -func WithDialTimeout(dialTimeout time.Duration) Option { - return func(s *connectServer) { - s.clientDialTimeout = dialTimeout +// Option - options for the dial chain element +type Option func(*option) + +// WithDialOptions - grpc.DialOptions for use by the dial chain element +func WithDialOptions(dialOptions ...grpc.DialOption) Option { + return func(o *option) { + o.dialOptions = dialOptions } } -// WithDialOptions sets connect server client dial options -func WithDialOptions(opts ...grpc.DialOption) Option { - return func(s *connectServer) { - s.clientDialOptions = opts +// WithDialTimeout - dialTimeout for use by dial chain element. +func WithDialTimeout(dialTimeout time.Duration) Option { + return func(o *option) { + o.dialTimeout = dialTimeout } } diff --git a/pkg/networkservice/common/heal/client.go b/pkg/networkservice/common/heal/client.go deleted file mode 100644 index e3b1b4251..000000000 --- a/pkg/networkservice/common/heal/client.go +++ /dev/null @@ -1,234 +0,0 @@ -// Copyright (c) 2021 Doc.ai and/or its affiliates. -// -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -// Package heal provides a chain element that carries out proper nsm healing from client to endpoint -package heal - -import ( - "context" - "sync" - - "github.com/golang/protobuf/ptypes/empty" - "github.com/pkg/errors" - "google.golang.org/grpc" - - "github.com/networkservicemesh/api/pkg/api/networkservice" - - "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" - "github.com/networkservicemesh/sdk/pkg/tools/postpone" -) - -type healClient struct { - baseCtx, ctx context.Context - cancel context.CancelFunc - cc networkservice.MonitorConnectionClient - changeEndpoint bool -} - -type connectionInfo struct { - conn *networkservice.Connection - cond *sync.Cond - active bool -} - -// NewClient - creates a new networkservice.NetworkServiceClient chain element that monitors its connections' state -// and calls heal server in case connection breaks if heal server is present in the chain -// - ctx - context for the lifecycle of the *Client* itself. Cancel when discarding the client. -// - cc - MonitorConnectionClient that will be used to watch for connection confirmations and breakages. 
-func NewClient(ctx context.Context, cc networkservice.MonitorConnectionClient, opts ...Option) networkservice.NetworkServiceClient { - healOpts := new(healOptions) - for _, opt := range opts { - opt(healOpts) - } - u := &healClient{ - baseCtx: ctx, - cc: cc, - changeEndpoint: healOpts.changeEndpoint, - } - - u.ctx, u.cancel = context.WithCancel(ctx) - - return u -} - -func (u *healClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { - connInfo, _ := loadOrStore(ctx, &connectionInfo{ - conn: request.GetConnection().Clone(), - cond: sync.NewCond(new(sync.Mutex)), - }) - u.replaceConnectionPath(request.GetConnection(), connInfo) - - if err := u.listenToConnectionChanges(ctx, connInfo); err != nil { - return nil, err - } - - postponeCtxFunc := postpone.ContextWithValues(ctx) - - conn, err := next.Client(ctx).Request(ctx, request, opts...) - if err != nil { - return nil, err - } - - condCh, cancel := u.condCh(connInfo) - defer cancel() - - // Problem: - // gRPC requests and streaming requests have different requirements for the underlying gRPC connection. There can - // be a case on the closing gRPC connection, when it is still sending requests but fails to keep or start - // a streaming request. In such case we can get a situation when we have a Connection with no monitor stream, so - // it would never be healed. - // Solution: - // We should check that monitor stream is ready for the Connection, before returning it back. This check should - // wait for some time after the next.Request(), because for the [NSMgr -> Endpoint] case response may come faster - // than monitor stream update. 
- select { - case <-ctx.Done(): - err = ctx.Err() - case <-condCh: - case <-u.ctx.Done(): - err = errors.Errorf("timeout waiting for connection monitor: %s", conn.GetId()) - } - if err != nil { - closeCtx, cancelClose := postponeCtxFunc() - defer cancelClose() - - if _, closeErr := next.Client(ctx).Close(closeCtx, conn); closeErr != nil { - err = errors.Wrapf(err, "connection closed with error: %s", closeErr.Error()) - } - - return nil, err - } - - connInfo.cond.L.Lock() - defer connInfo.cond.L.Unlock() - - connInfo.conn = conn.Clone() - - return conn, nil -} - -func (u *healClient) condCh(connInfo *connectionInfo) (ch chan struct{}, cancel func()) { - ch, condFlag := make(chan struct{}), false - go func() { - defer close(ch) - - connInfo.cond.L.Lock() - defer connInfo.cond.L.Unlock() - - if !condFlag && !connInfo.active { - connInfo.cond.Wait() - } - }() - - return ch, func() { - connInfo.cond.L.Lock() - defer connInfo.cond.L.Unlock() - - condFlag = true - connInfo.cond.Broadcast() - } -} - -func (u *healClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*empty.Empty, error) { - u.cancel() - - connInfo, loaded := load(ctx) - if loaded { - u.replaceConnectionPath(conn, connInfo) - } - - return next.Client(ctx).Close(ctx, conn, opts...) -} - -// listenToConnectionChanges - loops on events from MonitorConnectionClient while the monitor client is alive. -// Updates connection cache and broadcasts events of successful connections. -// Calls heal when something breaks. 
-func (u *healClient) listenToConnectionChanges(ctx context.Context, connInfo *connectionInfo) error { - monitorClient, err := u.cc.MonitorConnections(u.ctx, &networkservice.MonitorScopeSelector{ - PathSegments: []*networkservice.PathSegment{{Name: connInfo.conn.GetCurrentPathSegment().Name}, {Name: ""}}, - }) - if err != nil { - return errors.Wrap(err, "MonitorConnections failed") - } - - var healConnection requestHealFuncType - if healConnection = requestHealConnectionFunc(ctx); healConnection == nil { - healConnection = func(*networkservice.Connection) {} - } - - var restoreConnection requestHealFuncType - if restoreConnection = requestRestoreConnectionFunc(ctx); restoreConnection == nil { - restoreConnection = func(*networkservice.Connection) {} - } - - connID := connInfo.conn.GetId() - go func() { - for event, err := monitorClient.Recv(); err == nil; event, err = monitorClient.Recv() { - for _, eventConn := range event.GetConnections() { - if eventConn.GetPrevPathSegment().GetId() != connID { - continue - } - - connInfo.cond.L.Lock() - - switch event.GetType() { - // Why both INITIAL_STATE_TRANSFER and UPDATE: - // Sometimes we start polling events too late, and when we wait for confirmation of success of some connection, - // this connection is in the INITIAL_STATE_TRANSFER event, so we must treat these events the same as UPDATE. 
- case networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER, networkservice.ConnectionEventType_UPDATE: - connInfo.active = true - connInfo.conn.Path.PathSegments = eventConn.Clone().Path.PathSegments - if u.changeEndpoint { - connInfo.conn.NetworkServiceEndpointName = eventConn.NetworkServiceEndpointName - } - connInfo.cond.Broadcast() - case networkservice.ConnectionEventType_DELETE: - if connInfo.active { - healConnection(connInfo.conn) - } - connInfo.active = false - } - - connInfo.cond.L.Unlock() - } - } - - if u.baseCtx.Err() != nil || u.ctx.Err() != nil { - return - } - - connInfo.cond.L.Lock() - defer connInfo.cond.L.Unlock() - - if connInfo.active { - restoreConnection(connInfo.conn) - } - }() - - return nil -} - -func (u *healClient) replaceConnectionPath(conn *networkservice.Connection, connInfo *connectionInfo) { - path := conn.GetPath() - if path != nil && int(path.Index) < len(path.PathSegments)-1 { - connInfo.cond.L.Lock() - defer connInfo.cond.L.Unlock() - - path.PathSegments = path.PathSegments[:path.Index+1] - path.PathSegments = append(path.PathSegments, connInfo.conn.Path.PathSegments[path.Index+1:]...) - conn.NetworkServiceEndpointName = connInfo.conn.NetworkServiceEndpointName - } -} diff --git a/pkg/networkservice/common/heal/connection_info_map.gen.go b/pkg/networkservice/common/heal/connection_info_map.gen.go deleted file mode 100644 index 242591f5a..000000000 --- a/pkg/networkservice/common/heal/connection_info_map.gen.go +++ /dev/null @@ -1,73 +0,0 @@ -// Code generated by "-output connection_info_map.gen.go -type connectionInfoMap -output connection_info_map.gen.go -type connectionInfoMap"; DO NOT EDIT. -package heal - -import ( - "sync" // Used by sync.Map. -) - -// Generate code that will fail if the constants change value. -func _() { - // An "cannot convert connectionInfoMap literal (type connectionInfoMap) to type sync.Map" compiler error signifies that the base type have changed. 
- // Re-run the go-syncmap command to generate them again. - _ = (sync.Map)(connectionInfoMap{}) -} - -var _nil_connectionInfoMap_connectionInfo_value = func() (val *connectionInfo) { return }() - -// Load returns the value stored in the map for a key, or nil if no -// value is present. -// The ok result indicates whether value was found in the map. -func (m *connectionInfoMap) Load(key string) (*connectionInfo, bool) { - value, ok := (*sync.Map)(m).Load(key) - if value == nil { - return _nil_connectionInfoMap_connectionInfo_value, ok - } - return value.(*connectionInfo), ok -} - -// Store sets the value for a key. -func (m *connectionInfoMap) Store(key string, value *connectionInfo) { - (*sync.Map)(m).Store(key, value) -} - -// LoadOrStore returns the existing value for the key if present. -// Otherwise, it stores and returns the given value. -// The loaded result is true if the value was loaded, false if stored. -func (m *connectionInfoMap) LoadOrStore(key string, value *connectionInfo) (*connectionInfo, bool) { - actual, loaded := (*sync.Map)(m).LoadOrStore(key, value) - if actual == nil { - return _nil_connectionInfoMap_connectionInfo_value, loaded - } - return actual.(*connectionInfo), loaded -} - -// LoadAndDelete deletes the value for a key, returning the previous value if any. -// The loaded result reports whether the key was present. -func (m *connectionInfoMap) LoadAndDelete(key string) (value *connectionInfo, loaded bool) { - actual, loaded := (*sync.Map)(m).LoadAndDelete(key) - if actual == nil { - return _nil_connectionInfoMap_connectionInfo_value, loaded - } - return actual.(*connectionInfo), loaded -} - -// Delete deletes the value for a key. -func (m *connectionInfoMap) Delete(key string) { - (*sync.Map)(m).Delete(key) -} - -// Range calls f sequentially for each key and value present in the map. -// If f returns false, range stops the iteration. 
-// -// Range does not necessarily correspond to any consistent snapshot of the Map's -// contents: no key will be visited more than once, but if the value for any key -// is stored or deleted concurrently, Range may reflect any mapping for that key -// from any point during the Range call. -// -// Range may be O(N) with the number of elements in the map even if f returns -// false after a constant number of calls. -func (m *connectionInfoMap) Range(f func(key string, value *connectionInfo) bool) { - (*sync.Map)(m).Range(func(key, value interface{}) bool { - return f(key.(string), value.(*connectionInfo)) - }) -} diff --git a/pkg/networkservice/common/heal/context.go b/pkg/networkservice/common/heal/context.go deleted file mode 100644 index b82d51650..000000000 --- a/pkg/networkservice/common/heal/context.go +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright (c) 2021 Doc.ai and/or its affiliates. -// -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package heal - -import ( - "context" - - "github.com/networkservicemesh/api/pkg/api/networkservice" -) - -const ( - requestHealConnectionFuncKey contextKeyType = "requestHealConnectionFuncKey" - requestRestoreConnectionFuncKey contextKeyType = "requestRestoreConnectionFuncKey" -) - -type contextKeyType string - -type requestHealFuncType func(conn *networkservice.Connection) - -func withRequestHealConnectionFunc(parent context.Context, fun requestHealFuncType) context.Context { - if parent == nil { - panic("cannot create context from nil parent") - } - return context.WithValue(parent, requestHealConnectionFuncKey, fun) -} - -func requestHealConnectionFunc(ctx context.Context) requestHealFuncType { - if rv, ok := ctx.Value(requestHealConnectionFuncKey).(requestHealFuncType); ok { - return rv - } - return nil -} - -func withRequestRestoreConnectionFunc(parent context.Context, fun requestHealFuncType) context.Context { - if parent == nil { - panic("cannot create context from nil parent") - } - return context.WithValue(parent, requestRestoreConnectionFuncKey, fun) -} - -func requestRestoreConnectionFunc(ctx context.Context) requestHealFuncType { - if rv, ok := ctx.Value(requestRestoreConnectionFuncKey).(requestHealFuncType); ok { - return rv - } - return nil -} diff --git a/pkg/networkservice/common/heal/context_wrapper_map.gen.go b/pkg/networkservice/common/heal/context_wrapper_map.gen.go deleted file mode 100644 index 6200dd9ef..000000000 --- a/pkg/networkservice/common/heal/context_wrapper_map.gen.go +++ /dev/null @@ -1,73 +0,0 @@ -// Code generated by "-output context_wrapper_map.gen.go -type ctxWrapperMap -output context_wrapper_map.gen.go -type ctxWrapperMap"; DO NOT EDIT. -package heal - -import ( - "sync" // Used by sync.Map. -) - -// Generate code that will fail if the constants change value. -func _() { - // An "cannot convert ctxWrapperMap literal (type ctxWrapperMap) to type sync.Map" compiler error signifies that the base type have changed. 
- // Re-run the go-syncmap command to generate them again. - _ = (sync.Map)(ctxWrapperMap{}) -} - -var _nil_ctxWrapperMap_ctxWrapper_value = func() (val *ctxWrapper) { return }() - -// Load returns the value stored in the map for a key, or nil if no -// value is present. -// The ok result indicates whether value was found in the map. -func (m *ctxWrapperMap) Load(key string) (*ctxWrapper, bool) { - value, ok := (*sync.Map)(m).Load(key) - if value == nil { - return _nil_ctxWrapperMap_ctxWrapper_value, ok - } - return value.(*ctxWrapper), ok -} - -// Store sets the value for a key. -func (m *ctxWrapperMap) Store(key string, value *ctxWrapper) { - (*sync.Map)(m).Store(key, value) -} - -// LoadOrStore returns the existing value for the key if present. -// Otherwise, it stores and returns the given value. -// The loaded result is true if the value was loaded, false if stored. -func (m *ctxWrapperMap) LoadOrStore(key string, value *ctxWrapper) (*ctxWrapper, bool) { - actual, loaded := (*sync.Map)(m).LoadOrStore(key, value) - if actual == nil { - return _nil_ctxWrapperMap_ctxWrapper_value, loaded - } - return actual.(*ctxWrapper), loaded -} - -// LoadAndDelete deletes the value for a key, returning the previous value if any. -// The loaded result reports whether the key was present. -func (m *ctxWrapperMap) LoadAndDelete(key string) (value *ctxWrapper, loaded bool) { - actual, loaded := (*sync.Map)(m).LoadAndDelete(key) - if actual == nil { - return _nil_ctxWrapperMap_ctxWrapper_value, loaded - } - return actual.(*ctxWrapper), loaded -} - -// Delete deletes the value for a key. -func (m *ctxWrapperMap) Delete(key string) { - (*sync.Map)(m).Delete(key) -} - -// Range calls f sequentially for each key and value present in the map. -// If f returns false, range stops the iteration. 
-// -// Range does not necessarily correspond to any consistent snapshot of the Map's -// contents: no key will be visited more than once, but if the value for any key -// is stored or deleted concurrently, Range may reflect any mapping for that key -// from any point during the Range call. -// -// Range may be O(N) with the number of elements in the map even if f returns -// false after a constant number of calls. -func (m *ctxWrapperMap) Range(f func(key string, value *ctxWrapper) bool) { - (*sync.Map)(m).Range(func(key, value interface{}) bool { - return f(key.(string), value.(*ctxWrapper)) - }) -} diff --git a/pkg/networkservice/common/heal/heal_test.go b/pkg/networkservice/common/heal/heal_test.go deleted file mode 100644 index f71ce137e..000000000 --- a/pkg/networkservice/common/heal/heal_test.go +++ /dev/null @@ -1,169 +0,0 @@ -// Copyright (c) 2021 Doc.ai and/or its affiliates. -// -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package heal_test - -import ( - "context" - "net/url" - "testing" - "time" - - "github.com/stretchr/testify/require" - "go.uber.org/goleak" - "google.golang.org/grpc" - - "github.com/networkservicemesh/api/pkg/api/networkservice" - - "github.com/networkservicemesh/sdk/pkg/networkservice/chains/endpoint" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/clienturl" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/connect" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/heal" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/updatepath" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/updatetoken" - "github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters" - "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" - "github.com/networkservicemesh/sdk/pkg/networkservice/utils/count" - "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" - "github.com/networkservicemesh/sdk/pkg/tools/grpcutils" - "github.com/networkservicemesh/sdk/pkg/tools/sandbox" -) - -func startRemoteServer( - ctx context.Context, - t *testing.T, - expireDuration time.Duration, - additionalFunctionality ...networkservice.NetworkServiceServer, -) (*url.URL, context.CancelFunc) { - ctx, cancel := context.WithCancel(ctx) - - server := endpoint.NewServer(ctx, sandbox.GenerateExpiringToken(expireDuration), - endpoint.WithName("remote"), - endpoint.WithAdditionalFunctionality(additionalFunctionality...)) - - grpcServer := grpc.NewServer() - server.Register(grpcServer) - - u := &url.URL{Scheme: "tcp", Host: "127.0.0.1:0"} - select { - case err := <-grpcutils.ListenAndServe(ctx, u, grpcServer): - require.NoError(t, err) - default: - } - - return u, cancel -} - -func TestHeal_CloseChain(t *testing.T) { - t.Cleanup(func() { goleak.VerifyNone(t) }) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - remoteURL, remoteCancel := startRemoteServer(ctx, t, 0) - defer 
remoteCancel() - - counter := new(count.Server) - - serverChain := new(networkservice.NetworkServiceClient) - *serverChain = adapters.NewServerToClient( - next.NewNetworkServiceServer( - updatepath.NewServer("server"), - metadata.NewServer(), - updatetoken.NewServer(sandbox.GenerateTestToken), - heal.NewServer(ctx, - heal.WithOnHeal(serverChain)), - clienturl.NewServer(remoteURL), - connect.NewServer(ctx, func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient { - return next.NewNetworkServiceClient( - heal.NewClient(ctx, networkservice.NewMonitorConnectionClient(cc)), - networkservice.NewNetworkServiceClient(cc), - ) - }, connect.WithDialOptions(grpc.WithInsecure())), - counter, - ), - ) - - client := next.NewNetworkServiceClient( - updatepath.NewClient("client"), - *serverChain, - ) - - _, err := client.Request(ctx, &networkservice.NetworkServiceRequest{ - Connection: new(networkservice.Connection), - }) - require.NoError(t, err) - - remoteCancel() - - require.Eventually(t, func() bool { - return counter.Closes() == 1 - }, 3*time.Second, 10*time.Millisecond) -} - -func TestHeal_CloseChainOnNoMonitorUpdate(t *testing.T) { - t.Cleanup(func() { goleak.VerifyNone(t) }) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - counter := new(count.Server) - remoteURL, remoteCancel := startRemoteServer(ctx, t, 10*time.Minute, counter) - defer remoteCancel() - - // Create fake monitor connection - monitorURL, monitorCancel := startRemoteServer(ctx, t, 0) - monitorCC, err := grpc.DialContext(ctx, grpcutils.URLToTarget(monitorURL), grpc.WithBlock(), grpc.WithInsecure()) - require.NoError(t, err) - defer func() { - monitorCancel() - _ = monitorCC.Close() - }() - - serverChain := new(networkservice.NetworkServiceClient) - *serverChain = adapters.NewServerToClient( - next.NewNetworkServiceServer( - updatepath.NewServer("server"), - metadata.NewServer(), - updatetoken.NewServer(sandbox.GenerateTestToken), - 
heal.NewServer(ctx, - heal.WithOnHeal(serverChain)), - clienturl.NewServer(remoteURL), - connect.NewServer(ctx, func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient { - return next.NewNetworkServiceClient( - heal.NewClient(ctx, networkservice.NewMonitorConnectionClient(monitorCC)), - networkservice.NewNetworkServiceClient(cc), - ) - }, connect.WithDialOptions(grpc.WithInsecure())), - ), - ) - - client := next.NewNetworkServiceClient( - updatepath.NewClient("client"), - *serverChain, - ) - - requestCtx, cancelRequest := context.WithTimeout(ctx, time.Second) - defer cancelRequest() - - _, err = client.Request(requestCtx, &networkservice.NetworkServiceRequest{ - Connection: new(networkservice.Connection), - }) - require.Error(t, err) - - require.Equal(t, 1, counter.Requests()) - require.Equal(t, 1, counter.Closes()) -} diff --git a/pkg/networkservice/common/heal/metadata.go b/pkg/networkservice/common/heal/metadata.go deleted file mode 100644 index e5069dd5c..000000000 --- a/pkg/networkservice/common/heal/metadata.go +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright (c) 2021 Doc.ai and/or its affiliates. -// -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package heal - -import ( - "context" - - "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" -) - -type keyType struct{} - -func loadOrStore(ctx context.Context, connInfo *connectionInfo) (*connectionInfo, bool) { - v, ok := metadata.Map(ctx, true).LoadOrStore(keyType{}, connInfo) - return v.(*connectionInfo), ok -} - -func load(ctx context.Context) (*connectionInfo, bool) { - v, ok := metadata.Map(ctx, true).Load(keyType{}) - if !ok { - return nil, false - } - return v.(*connectionInfo), true -} diff --git a/pkg/networkservice/common/heal/option.go b/pkg/networkservice/common/heal/option.go deleted file mode 100644 index 2c07f94bf..000000000 --- a/pkg/networkservice/common/heal/option.go +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright (c) 2021 Doc.ai and/or its affiliates. -// -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package heal - -import ( - "time" - - "github.com/networkservicemesh/api/pkg/api/networkservice" -) - -// OnRestore is an action that should be performed on restore request -type OnRestore int - -const ( - // OnRestoreRestore - restore should be performed - OnRestoreRestore OnRestore = iota - // OnRestoreHeal - heal should be performed - OnRestoreHeal - // OnRestoreIgnore - restore request should be ignored - OnRestoreIgnore -) - -// Option is an option pattern for heal server -type Option func(healOpts *healOptions) - -// WithOnHeal - sets client used 'onHeal'. 
-// * If we detect we need to heal, onHeal.Request is used to heal. -// * If we can't heal connection, onHeal.Close will be called. -// * If onHeal is nil, then we simply set onHeal to this client chain element. -// Since networkservice.NetworkServiceClient is an interface (and thus a pointer) *networkservice.NetworkServiceClient -// is a double pointer. Meaning it points to a place that points to a place that implements -// networkservice.NetworkServiceClient. This is done because when we use heal.NewClient as part of a chain, we may not -// *have* a pointer to this chain. -func WithOnHeal(onHeal *networkservice.NetworkServiceClient) Option { - return func(healOpts *healOptions) { - healOpts.onHeal = onHeal - } -} - -// WithOnRestore sets on restore action. Default `OnRestoreHeal`. -// IMPORTANT: should be set `OnRestoreIgnore` for the Forwarder, because it results in NSMgr doesn't understanding that -// Request is coming from Forwarder (https://github.com/networkservicemesh/sdk/issues/970). -func WithOnRestore(onRestore OnRestore) Option { - return func(healOpts *healOptions) { - healOpts.onRestore = onRestore - } -} - -// WithRestoreTimeout sets restore timeout. Default `1m`. -func WithRestoreTimeout(restoreTimeout time.Duration) Option { - return func(healOpts *healOptions) { - healOpts.restoreTimeout = restoreTimeout - } -} - -// WithEndpointChange sets if Connection.EndpointName can be changed with monitor updates -func WithEndpointChange() Option { - return func(healOpts *healOptions) { - healOpts.changeEndpoint = true - } -} - -type healOptions struct { - onHeal *networkservice.NetworkServiceClient - onRestore OnRestore - restoreTimeout time.Duration - changeEndpoint bool -} diff --git a/pkg/networkservice/common/heal/server.go b/pkg/networkservice/common/heal/server.go deleted file mode 100644 index 7ad860100..000000000 --- a/pkg/networkservice/common/heal/server.go +++ /dev/null @@ -1,307 +0,0 @@ -// Copyright (c) 2020 Cisco Systems, Inc. 
-// -// Copyright (c) 2021 Doc.ai and/or its affiliates. -// -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package heal provides a chain element that carries out proper nsm healing from client to endpoint -package heal - -import ( - "context" - "sync" - "time" - - "github.com/golang/protobuf/ptypes" - "github.com/golang/protobuf/ptypes/empty" - - "github.com/networkservicemesh/api/pkg/api/networkservice" - - "github.com/networkservicemesh/sdk/pkg/networkservice/common/discover" - "github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters" - "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" - "github.com/networkservicemesh/sdk/pkg/tools/addressof" - "github.com/networkservicemesh/sdk/pkg/tools/clock" - "github.com/networkservicemesh/sdk/pkg/tools/log" -) - -type ctxWrapper struct { - mut sync.Mutex - request *networkservice.NetworkServiceRequest - requestTimeout time.Duration - ctx context.Context - cancel func() -} - -type healServer struct { - ctx context.Context - onHeal *networkservice.NetworkServiceClient - onRestore OnRestore - restoreTimeout time.Duration - healContextMap ctxWrapperMap -} - -// NewServer - creates a new networkservice.NetworkServiceServer chain element that implements the healing algorithm -func NewServer(ctx context.Context, opts ...Option) networkservice.NetworkServiceServer { - healOpts := &healOptions{ - onRestore: OnRestoreHeal, - restoreTimeout: 
time.Minute, - } - for _, opt := range opts { - opt(healOpts) - } - - rv := &healServer{ - ctx: ctx, - onHeal: healOpts.onHeal, - onRestore: healOpts.onRestore, - restoreTimeout: healOpts.restoreTimeout, - } - - if rv.onHeal == nil { - rv.onHeal = addressof.NetworkServiceClient(adapters.NewServerToClient(rv)) - } - - return rv -} - -func (f *healServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { - clockTime := clock.FromContext(ctx) - ctx = f.withHandlers(ctx) - - requestTimeout := time.Duration(0) - if deadline, ok := ctx.Deadline(); ok { - requestTimeout = clockTime.Until(deadline) - } - - requestStart := clockTime.Now() - - conn, err := next.Server(ctx).Request(ctx, request) - if err != nil { - return nil, err - } - - // There can possible be a case when we are trying to heal from the local case to the remote case. Maximum captured - // difference between these times was 3x on packet cluster (0.5s local vs 1.5s remote). So taking 5x value would be - // enough to cover such local to remote case and not too much in terms of blocking subsequent Request/Close events - // (7.5s for the worst remote case). 
- requestDuration := clockTime.Since(requestStart) * 5 - if requestDuration > requestTimeout { - requestTimeout = requestDuration - } - - cw, loaded := f.healContextMap.LoadOrStore(request.GetConnection().GetId(), &ctxWrapper{ - request: request.Clone(), - requestTimeout: requestTimeout, - ctx: f.createHealContext(ctx, nil), - }) - if loaded { - cw.mut.Lock() - defer cw.mut.Unlock() - - if cw.cancel != nil { - cw.cancel() - cw.cancel = nil - } - cw.request = request.Clone() - if requestTimeout > cw.requestTimeout { - cw.requestTimeout = requestTimeout - } - cw.ctx = f.createHealContext(ctx, cw.ctx) - } - - return conn, nil -} - -func (f *healServer) withHandlers(ctx context.Context) context.Context { - ctx = withRequestHealConnectionFunc(ctx, f.handleHealConnectionRequest) - - var restoreConnectionHandler requestHealFuncType - switch f.onRestore { - case OnRestoreRestore: - restoreConnectionHandler = f.handleRestoreConnectionRequest - case OnRestoreHeal: - restoreConnectionHandler = f.handleHealConnectionRequest - case OnRestoreIgnore: - restoreConnectionHandler = func(*networkservice.Connection) {} - } - ctx = withRequestRestoreConnectionFunc(ctx, restoreConnectionHandler) - - return ctx -} - -func (f *healServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) { - rv, err := next.Server(ctx).Close(ctx, conn) - - f.stopHeal(conn) - - return rv, err -} - -func (f *healServer) getHealContext( - conn *networkservice.Connection, -) (context.Context, *networkservice.NetworkServiceRequest, time.Duration) { - cw, ok := f.healContextMap.Load(conn.GetId()) - if !ok { - return nil, nil, 0 - } - - cw.mut.Lock() - defer cw.mut.Unlock() - - if cw.cancel != nil { - cw.cancel() - } - ctx, cancel := context.WithCancel(cw.ctx) - cw.cancel = cancel - - return ctx, cw.request.Clone(), cw.requestTimeout -} - -// handleHealConnectionRequest - heals requested connection. Returns immediately, heal is asynchronous. 
-func (f *healServer) handleHealConnectionRequest(conn *networkservice.Connection) { - ctx, request, requestTimeout := f.getHealContext(conn) - if request == nil { - return - } - - request.SetRequestConnection(conn.Clone()) - - go f.processHeal(ctx, request, requestTimeout) -} - -// handleRestoreConnectionRequest - recreates connection. Returns immediately, heal is asynchronous. -func (f *healServer) handleRestoreConnectionRequest(conn *networkservice.Connection) { - ctx, request, requestTimeout := f.getHealContext(conn) - if request == nil { - return - } - - request.SetRequestConnection(conn.Clone()) - - go f.restoreConnection(ctx, request, requestTimeout) -} - -func (f *healServer) stopHeal(conn *networkservice.Connection) { - cw, loaded := f.healContextMap.LoadAndDelete(conn.GetId()) - if !loaded { - return - } - - cw.mut.Lock() - defer cw.mut.Unlock() - - if cw.cancel != nil { - cw.cancel() - } -} - -func (f *healServer) restoreConnection( - ctx context.Context, - request *networkservice.NetworkServiceRequest, - requestTimeout time.Duration, -) { - clockTime := clock.FromContext(ctx) - - if ctx.Err() != nil { - return - } - - // Make sure we have a valid expireTime to work with - expires := request.GetConnection().GetNextPathSegment().GetExpires() - expireTime, err := ptypes.Timestamp(expires) - if err != nil { - return - } - - deadline := clockTime.Now().Add(f.restoreTimeout) - if deadline.After(expireTime) { - deadline = expireTime - } - restoreCtx, restoreCancel := clockTime.WithDeadline(ctx, deadline) - defer restoreCancel() - - for restoreCtx.Err() == nil { - requestCtx, requestCancel := clockTime.WithTimeout(restoreCtx, requestTimeout) - _, err := (*f.onHeal).Request(requestCtx, request.Clone()) - requestCancel() - - if err == nil { - return - } - } - - f.processHeal(ctx, request, requestTimeout) -} - -func (f *healServer) processHeal( - ctx context.Context, - request *networkservice.NetworkServiceRequest, - requestTimeout time.Duration, -) { - clockTime 
:= clock.FromContext(ctx) - logger := log.FromContext(ctx).WithField("healServer", "processHeal") - - if ctx.Err() != nil { - return - } - - candidates := discover.Candidates(ctx) - conn := request.GetConnection() - if candidates != nil || conn.GetPath().GetIndex() == 0 { - logger.Infof("Starting heal process for %s", conn.GetId()) - - conn.NetworkServiceEndpointName = "" - conn.Path.PathSegments = conn.Path.PathSegments[0 : conn.Path.Index+1] - - for ctx.Err() == nil { - requestCtx, requestCancel := clockTime.WithTimeout(ctx, requestTimeout) - _, err := (*f.onHeal).Request(requestCtx, request.Clone()) - requestCancel() - - if err != nil { - logger.Errorf("Failed to heal connection %s: %v", conn.GetId(), err) - } else { - logger.Infof("Finished heal process for %s", conn.GetId()) - break - } - } - } else { - // Huge timeout is not required to close connection on a current path segment - closeCtx, closeCancel := clockTime.WithTimeout(ctx, time.Second) - defer closeCancel() - - _, err := (*f.onHeal).Close(closeCtx, conn) - if err != nil { - logger.Errorf("Failed to close connection %s: %v", conn.GetId(), err) - } - } -} - -// createHealContext - create context to be used on heal. -// Uses f.ctx as base and inserts Candidates from requestCtx or cachedCtx into it, if there are any. 
-func (f *healServer) createHealContext(requestCtx, cachedCtx context.Context) context.Context { - ctx := requestCtx - if cachedCtx != nil { - if candidates := discover.Candidates(ctx); candidates == nil || len(candidates.Endpoints) > 0 { - ctx = cachedCtx - } - } - healCtx := f.ctx - if candidates := discover.Candidates(ctx); candidates != nil { - healCtx = discover.WithCandidates(healCtx, candidates.Endpoints, candidates.NetworkService) - } - return healCtx -} diff --git a/pkg/networkservice/common/heal/server_test.go b/pkg/networkservice/common/heal/server_test.go deleted file mode 100644 index 6dfc30760..000000000 --- a/pkg/networkservice/common/heal/server_test.go +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright (c) 2020-2021 Doc.ai and/or its affiliates. -// -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package heal_test - -import ( - "context" - "testing" - "time" - - "github.com/golang/protobuf/ptypes/empty" - "github.com/stretchr/testify/require" - "go.uber.org/goleak" - "google.golang.org/grpc" - - "github.com/networkservicemesh/api/pkg/api/networkservice" - - "github.com/networkservicemesh/sdk/pkg/networkservice/common/heal" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/monitor" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/updatepath" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/updatetoken" - "github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters" - "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" - "github.com/networkservicemesh/sdk/pkg/networkservice/core/eventchannel" - "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" - "github.com/networkservicemesh/sdk/pkg/tools/sandbox" -) - -func TestHealClient_Request(t *testing.T) { - t.Cleanup(func() { goleak.VerifyNone(t) }) - - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - - onHealCh := make(chan struct{}) - // TODO for tomorrow... 
check on how to work onHeal into the new chain I've built - var onHeal networkservice.NetworkServiceClient = &testOnHeal{ - requestFunc: func(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (conn *networkservice.Connection, err error) { - if err = ctx.Err(); err == nil { - close(onHealCh) - } - return request.Connection, err - }, - } - - eventCh := make(chan *networkservice.ConnectionEvent, 1) - defer close(eventCh) - - monitorServer := eventchannel.NewMonitorServer(eventCh) - - server := chain.NewNetworkServiceServer( - updatepath.NewServer("testServer"), - monitor.NewServer(ctx, &monitorServer), - updatetoken.NewServer(sandbox.GenerateTestToken), - ) - - client := chain.NewNetworkServiceClient( - updatepath.NewClient("testClient"), - metadata.NewClient(), - adapters.NewServerToClient(heal.NewServer(ctx, - heal.WithOnHeal(&onHeal))), - heal.NewClient(ctx, adapters.NewMonitorServerToClient(monitorServer)), - adapters.NewServerToClient( - chain.NewNetworkServiceServer( - updatetoken.NewServer(sandbox.GenerateTestToken), - server, - ), - ), - ) - - conn, err := client.Request(ctx, &networkservice.NetworkServiceRequest{ - Connection: &networkservice.Connection{ - NetworkService: "ns-1", - }, - }) - require.NoError(t, err) - - _, err = server.Close(ctx, conn.Clone()) - require.NoError(t, err) - - select { - case <-ctx.Done(): - require.FailNow(t, "timeout waiting for Heal event") - case <-onHealCh: - // All is fine, test is passed - } - - _, err = client.Close(ctx, conn) - require.NoError(t, err) -} - -type testOnHeal struct { - requestFunc func(context.Context, *networkservice.NetworkServiceRequest, ...grpc.CallOption) (*networkservice.Connection, error) -} - -func (t *testOnHeal) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { - return t.requestFunc(ctx, request, opts...) 
-} - -func (t *testOnHeal) Close(context.Context, *networkservice.Connection, ...grpc.CallOption) (*empty.Empty, error) { - return new(empty.Empty), nil -} diff --git a/pkg/networkservice/common/monitor/client_filter.go b/pkg/networkservice/common/monitor/client_filter.go new file mode 100644 index 000000000..881289629 --- /dev/null +++ b/pkg/networkservice/common/monitor/client_filter.go @@ -0,0 +1,90 @@ +// Copyright (c) 2021 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package monitor + +import ( + "github.com/networkservicemesh/api/pkg/api/networkservice" +) + +type clientFilter struct { + conn *networkservice.Connection + networkservice.MonitorConnection_MonitorConnectionsClient +} + +func newClientFilter(client networkservice.MonitorConnection_MonitorConnectionsClient, conn *networkservice.Connection) networkservice.MonitorConnection_MonitorConnectionsClient { + return &clientFilter{ + MonitorConnection_MonitorConnectionsClient: client, + conn: conn, + } +} + +func (c *clientFilter) Recv() (*networkservice.ConnectionEvent, error) { + for { + eventIn, err := c.MonitorConnection_MonitorConnectionsClient.Recv() + if err != nil { + return nil, err + } + eventOut := &networkservice.ConnectionEvent{ + Type: networkservice.ConnectionEventType_UPDATE, + Connections: make(map[string]*networkservice.Connection), + } + for _, connIn := range eventIn.GetConnections() { + if eventIn.GetType() == networkservice.ConnectionEventType_DELETE { + connIn = connIn.Clone() + connIn.State = networkservice.State_DOWN + } + // If we don't have enough PathSegments connIn doesn't match e.conn + if len(connIn.GetPath().GetPathSegments()) < int(c.conn.GetPath().GetIndex()+1) { + continue + } + // If the e.conn isn't in the expected PathSegment connIn doesn't match e.conn + if connIn.GetPath().GetPathSegments()[int(c.conn.GetPath().GetIndex())].GetId() != c.conn.GetId() { + continue + } + // If the current index isn't the index of e.conn or what comes after it connIn doesn't match e.conn + if !(connIn.GetPath().GetIndex() == c.conn.GetPath().GetIndex() || connIn.GetPath().GetIndex() == c.conn.GetPath().GetIndex()+1) { + continue + } + + // Construct the outgoing Connection + connOut := c.conn.Clone() + connOut.Path = connIn.Path + connOut.GetPath().Index = c.conn.GetPath().GetIndex() + connOut.Context = connIn.Context + connOut.State = connIn.State + + // If it's deleted, mark the event state down + if eventIn.GetType() == 
networkservice.ConnectionEventType_DELETE { + connOut.State = networkservice.State_DOWN + } + + // If the connection hasn't changed... don't send the event + if connOut.Equals(c.conn) { + continue + } + + // Add the Connection to the outgoing event + eventOut.GetConnections()[connOut.GetId()] = connOut + + // Update the event we are watching for: + c.conn = connOut + } + if len(eventOut.GetConnections()) > 0 { + return eventOut, nil + } + } +} diff --git a/pkg/networkservice/common/monitor/eventloop.go b/pkg/networkservice/common/monitor/eventloop.go new file mode 100644 index 000000000..c9e0559bc --- /dev/null +++ b/pkg/networkservice/common/monitor/eventloop.go @@ -0,0 +1,101 @@ +// Copyright (c) 2021 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package monitor + +import ( + "context" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/pkg/errors" + "google.golang.org/grpc" +) + +type eventLoop struct { + eventLoopCtx context.Context + conn *networkservice.Connection + eventConsumer eventConsumer + client networkservice.MonitorConnection_MonitorConnectionsClient +} + +func newEventLoop(ctx context.Context, ec eventConsumer, cc grpc.ClientConnInterface, conn *networkservice.Connection) (context.CancelFunc, error) { + conn = conn.Clone() + // Is another chain element asking for events? 
If not, no need to monitor + if ec == nil { + return func() {}, nil + } + + // Create new eventLoopCtx and store its eventLoopCancel + eventLoopCtx, eventLoopCancel := context.WithCancel(ctx) + + // Create selector to only ask for events related to our Connection + selector := &networkservice.MonitorScopeSelector{ + PathSegments: []*networkservice.PathSegment{ + { + Id: conn.GetCurrentPathSegment().GetId(), + Name: conn.GetCurrentPathSegment().GetName(), + }, + }, + } + + client, err := networkservice.NewMonitorConnectionClient(cc).MonitorConnections(eventLoopCtx, selector) + if err != nil { + eventLoopCancel() + return nil, errors.WithStack(err) + } + + // get the initial state transfer and use it to detect whether we have a real connection or not + _, err = client.Recv() + if err != nil { + eventLoopCancel() + return nil, errors.WithStack(err) + } + + cev := &eventLoop{ + eventLoopCtx: eventLoopCtx, + conn: conn, + eventConsumer: ec, + client: newClientFilter(client, conn), + } + + // Start the eventLoop + go cev.eventLoop() + return eventLoopCancel, nil +} + +func (cev *eventLoop) eventLoop() { + // So we have a client, and can receive events + for { + eventIn, err := cev.client.Recv() + if cev.eventLoopCtx.Err() != nil { + return + } + if err != nil { + // If we get an error, we've lost our connection... 
Send Down update + connOut := cev.conn.Clone() + connOut.State = networkservice.State_DOWN + eventOut := &networkservice.ConnectionEvent{ + Type: networkservice.ConnectionEventType_UPDATE, + Connections: map[string]*networkservice.Connection{ + cev.conn.GetId(): connOut, + }, + } + _ = cev.eventConsumer.Send(eventOut) + return + } + _ = cev.eventConsumer.Send(eventIn) + } +} diff --git a/pkg/networkservice/common/connect/metadata.go b/pkg/networkservice/common/monitor/metadata.go similarity index 50% rename from pkg/networkservice/common/connect/metadata.go rename to pkg/networkservice/common/monitor/metadata.go index 288f8eefb..3b1ee0ed1 100644 --- a/pkg/networkservice/common/connect/metadata.go +++ b/pkg/networkservice/common/monitor/metadata.go @@ -1,4 +1,4 @@ -// Copyright (c) 2021 Doc.ai and/or its affiliates. +// Copyright (c) 2021 Cisco and/or its affiliates. // // SPDX-License-Identifier: Apache-2.0 // @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package connect +package monitor import ( "context" @@ -22,17 +22,20 @@ import ( "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" ) -type keyType struct{} +type key struct{} -func loadOrStore(ctx context.Context, connInfo *connectionInfo) (*connectionInfo, bool) { - v, ok := metadata.Map(ctx, false).LoadOrStore(keyType{}, connInfo) - return v.(*connectionInfo), ok +// store sets the context.CancelFunc stored in per Connection.Id metadata. +func store(ctx context.Context, isClient bool, cancel context.CancelFunc) { + metadata.Map(ctx, isClient).Store(key{}, cancel) } -func load(ctx context.Context) (*connectionInfo, bool) { - v, ok := metadata.Map(ctx, false).Load(keyType{}) +// loadAndDelete deletes the context.CancelFunc stored in per Connection.Id metadata, +// returning the previous value if any. The loaded result reports whether the key was present. 
+func loadAndDelete(ctx context.Context, isClient bool) (value context.CancelFunc, ok bool) { + rawValue, ok := metadata.Map(ctx, isClient).LoadAndDelete(key{}) if !ok { - return nil, false + return } - return v.(*connectionInfo), true + value, ok = rawValue.(context.CancelFunc) + return value, ok } diff --git a/pkg/networkservice/common/monitor/monitor_connection_server.go b/pkg/networkservice/common/monitor/monitor_connection_server.go new file mode 100644 index 000000000..d86db7b6c --- /dev/null +++ b/pkg/networkservice/common/monitor/monitor_connection_server.go @@ -0,0 +1,112 @@ +// Copyright (c) 2021 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package monitor + +import ( + "context" + + "github.com/edwarnicke/serialize" + "github.com/google/uuid" + "github.com/networkservicemesh/api/pkg/api/networkservice" +) + +type monitorConnectionServer struct { + chainCtx context.Context + connections map[string]*networkservice.Connection + filters map[string]*monitorFilter + executor serialize.Executor +} + +func newMonitorConnectionServer(chainCtx context.Context) networkservice.MonitorConnectionServer { + return &monitorConnectionServer{ + chainCtx: chainCtx, + connections: make(map[string]*networkservice.Connection), + filters: make(map[string]*monitorFilter), + } +} + +func (m *monitorConnectionServer) MonitorConnections(selector *networkservice.MonitorScopeSelector, srv networkservice.MonitorConnection_MonitorConnectionsServer) error { + m.executor.AsyncExec(func() { + filter := newMonitorFilter(selector, srv) + m.filters[uuid.New().String()] = filter + + connections := networkservice.FilterMapOnManagerScopeSelector(m.connections, selector) + + // Send initial transfer of all data available + filter.executor.AsyncExec(func() { + _ = filter.Send(&networkservice.ConnectionEvent{ + Type: networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER, + Connections: connections, + }) + }) + }) + + select { + case <-srv.Context().Done(): + case <-m.chainCtx.Done(): + } + + return nil +} + +var _ networkservice.MonitorConnectionServer = &monitorConnectionServer{} + +func (m *monitorConnectionServer) Send(event *networkservice.ConnectionEvent) (_ error) { + m.executor.AsyncExec(func() { + if event.Type == networkservice.ConnectionEventType_UPDATE { + for _, conn := range event.GetConnections() { + m.connections[conn.GetId()] = conn.Clone() + } + } + if event.Type == networkservice.ConnectionEventType_DELETE { + for _, conn := range event.GetConnections() { + delete(m.connections, conn.GetId()) + } + } + if event.Type == networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER { + // sending event with 
INIITIAL_STATE_TRANSFER not permitted + return + } + for id, filter := range m.filters { + id, filter := id, filter + e := event.Clone() + filter.executor.AsyncExec(func() { + var err error + select { + case <-filter.Context().Done(): + m.executor.AsyncExec(func() { + delete(m.filters, id) + }) + default: + err = filter.Send(e) + } + if err != nil { + m.executor.AsyncExec(func() { + delete(m.filters, id) + }) + } + }) + } + }) + return nil +} + +type eventConsumer interface { + Send(event *networkservice.ConnectionEvent) (err error) +} + +var _ eventConsumer = &monitorConnectionServer{} diff --git a/pkg/networkservice/common/monitor/passthrough_test.go b/pkg/networkservice/common/monitor/passthrough_test.go new file mode 100644 index 000000000..a36a1af1f --- /dev/null +++ b/pkg/networkservice/common/monitor/passthrough_test.go @@ -0,0 +1,261 @@ +// Copyright (c) 2021 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package monitor_test + +import ( + "context" + "fmt" + "net/url" + "testing" + "time" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/stretchr/testify/suite" + "go.uber.org/goleak" + "google.golang.org/grpc" + + "github.com/networkservicemesh/sdk/pkg/networkservice/chains/client" + "github.com/networkservicemesh/sdk/pkg/networkservice/chains/endpoint" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/clienturl" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/connect" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/null" + "github.com/networkservicemesh/sdk/pkg/tools/grpcutils" + "github.com/networkservicemesh/sdk/pkg/tools/sandbox" +) + +const ( + passThoughCount = 10 +) + +type MonitorPassThroughSuite struct { + suite.Suite + testCtx context.Context + testCancel context.CancelFunc + + passThroughEndpoints []endpoint.Endpoint + passThroughListenOnURLs []*url.URL + passThroughConnectToURLs []*url.URL + passThroughCtxs []context.Context + passThroughCancels []context.CancelFunc + + endpoint endpoint.Endpoint + endpointURL *url.URL + endpointCtx context.Context + endpointCancel context.CancelFunc + + client networkservice.NetworkServiceClient + + monitorClient networkservice.MonitorConnection_MonitorConnectionsClient + conn *networkservice.Connection +} + +func (m *MonitorPassThroughSuite) SetupTest() { + m.T().Cleanup(func() { goleak.VerifyNone(m.T()) }) + m.testCtx, m.testCancel = context.WithCancel(context.Background()) + + m.StartEndPoint() + m.StartPassThroughEndpoints(m.endpointURL) + m.StartClient(m.passThroughListenOnURLs[0]) + m.StartMonitor(m.passThroughListenOnURLs[0]) + + event, err := m.monitorClient.Recv() + m.Require().NoError(err) + m.Require().NotNil(event) + m.Require().Equal(networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER, event.GetType()) + m.Require().Nil(event.GetConnections()) + + m.conn, err = m.client.Request(m.testCtx, &networkservice.NetworkServiceRequest{}) 
+ m.Assert().NoError(err) + m.Assert().NotNil(m.conn) + m.Assert().NotNil(m.conn.GetPath()) + m.Require().Equal(m.conn.GetPath().GetIndex(), uint32(0)) + m.Require().Len(m.conn.GetPath().GetPathSegments(), len(m.passThroughEndpoints)+2) + + event, err = m.monitorClient.Recv() + m.Require().NoError(err) + m.Require().NotNil(event) + m.Require().Equal(networkservice.ConnectionEventType_UPDATE, event.GetType()) + m.Require().NotNil(event.GetConnections()) + expectedConn := m.conn.Clone() + expectedConn.GetPath().Index++ + expectedConn.Id = expectedConn.GetCurrentPathSegment().GetId() + actualConn := event.GetConnections()[expectedConn.GetId()] + m.Require().Equal(expectedConn, actualConn) +} + +func (m *MonitorPassThroughSuite) TearDownTest() { + m.testCancel() +} + +func (m *MonitorPassThroughSuite) StartPassThroughEndpoints(connectTo *url.URL) { + m.passThroughCtxs = nil + m.passThroughCancels = nil + m.passThroughListenOnURLs = nil + m.passThroughConnectToURLs = nil + m.passThroughEndpoints = nil + for len(m.passThroughCtxs) < passThoughCount { + passThroughCtx, passThroughCancel := context.WithCancel(m.testCtx) + passThroughListenOnURL := &url.URL{Scheme: "tcp", Host: "127.0.0.1:"} + name := fmt.Sprintf("passthrough-%d", len(m.passThroughCtxs)) + passThroughConnectToURL := &url.URL{} + passThroughEndpoint := endpoint.NewServer( + passThroughCtx, + sandbox.GenerateExpiringToken(time.Second), + endpoint.WithName(name), + endpoint.WithAuthorizeServer(null.NewServer()), + endpoint.WithAdditionalFunctionality( + clienturl.NewServer(passThroughConnectToURL), + connect.NewServer( + client.NewClient( + passThroughCtx, + client.WithName(name), + client.WithoutRefresh(), + client.WithAuthorizeClient(null.NewClient()), + client.WithDialOptions( + sandbox.DialOptions()..., + ), + ), + ), + ), + ) + m.Require().NoError(startEndpoint(passThroughCtx, passThroughListenOnURL, passThroughEndpoint)) + if len(m.passThroughConnectToURLs)-1 >= 0 { + 
*(m.passThroughConnectToURLs[len(m.passThroughConnectToURLs)-1]) = *passThroughListenOnURL + } + m.passThroughCtxs = append(m.passThroughCtxs, passThroughCtx) + m.passThroughCancels = append(m.passThroughCancels, passThroughCancel) + m.passThroughListenOnURLs = append(m.passThroughListenOnURLs, passThroughListenOnURL) + m.passThroughConnectToURLs = append(m.passThroughConnectToURLs, passThroughConnectToURL) + m.passThroughEndpoints = append(m.passThroughEndpoints, passThroughEndpoint) + } + if len(m.passThroughConnectToURLs)-1 >= 0 { + *(m.passThroughConnectToURLs[len(m.passThroughConnectToURLs)-1]) = *connectTo + } +} + +func (m *MonitorPassThroughSuite) StartEndPoint() { + m.endpointCtx, m.endpointCancel = context.WithCancel(m.testCtx) + m.endpointURL = &url.URL{Scheme: "tcp", Host: "127.0.0.1:"} + name := "endpoint" + m.endpoint = endpoint.NewServer( + m.testCtx, + sandbox.GenerateExpiringToken(time.Second), + endpoint.WithName(name), + endpoint.WithAuthorizeServer(null.NewServer()), + ) + m.Require().NoError(startEndpoint(m.endpointCtx, m.endpointURL, m.endpoint)) +} + +func (m *MonitorPassThroughSuite) StartClient(connectTo *url.URL) { + m.client = client.NewClient( + m.testCtx, + client.WithClientURL(connectTo), + client.WithAuthorizeClient(null.NewClient()), + client.WithDialOptions( + sandbox.DialOptions()..., + ), + ) +} + +func (m *MonitorPassThroughSuite) StartMonitor(connectTo *url.URL) { + target := grpcutils.URLToTarget(connectTo) + cc, err := grpc.DialContext(m.testCtx, target, grpc.WithBlock(), grpc.WithInsecure()) + m.Require().NoError(err) + m.Require().NotNil(cc) + go func(ctx context.Context, cc *grpc.ClientConn) { + <-ctx.Done() + _ = cc.Close() + }(m.testCtx, cc) + c := networkservice.NewMonitorConnectionClient(cc) + mc, err := c.MonitorConnections(m.testCtx, &networkservice.MonitorScopeSelector{}) + m.Require().NoError(err) + m.Require().NotNil(mc) + m.monitorClient = mc +} + +func (m *MonitorPassThroughSuite) ValidateEvent(expectedType 
networkservice.ConnectionEventType, expectedConn *networkservice.Connection) { + event, err := m.monitorClient.Recv() + m.Require().NoError(err) + m.Require().NotNil(event) + m.Require().Equal(expectedType, event.GetType()) + m.Require().NotNil(event.GetConnections()) + nextID := expectedConn.GetNextPathSegment().GetId() + actualConn := event.GetConnections()[nextID] + m.Require().Equal(nextID, actualConn.GetId()) + m.Require().Equal(len(expectedConn.GetPath().GetPathSegments()), len(actualConn.GetPath().GetPathSegments())) + for i, actualSegment := range actualConn.GetPath().GetPathSegments() { + expectedSegment := expectedConn.GetPath().GetPathSegments()[i] + m.Assert().Equal(expectedSegment.GetId(), actualSegment.GetId()) + m.Assert().Equal(expectedSegment.GetName(), actualSegment.GetName()) + } + m.Require().Equal(expectedConn.GetContext(), actualConn.GetContext()) + m.Require().Equal(expectedConn.GetPath().GetIndex()+1, actualConn.GetPath().GetIndex()) + m.Require().Equal(expectedConn.GetState(), actualConn.GetState()) +} + +func (m *MonitorPassThroughSuite) TestDeleteToDown() { + // Have the endpoint close the connection + closeConn := m.conn.Clone() + closeConn.GetPath().Index = uint32(len(closeConn.GetPath().GetPathSegments()) - 1) + closeConn.Id = closeConn.GetCurrentPathSegment().GetId() + _, err := m.endpoint.Close(m.testCtx, closeConn) + m.Require().NoError(err) + expectedConn := m.conn.Clone() + expectedConn.State = networkservice.State_DOWN + m.ValidateEvent(networkservice.ConnectionEventType_UPDATE, expectedConn) +} + +func (m *MonitorPassThroughSuite) TestServerDown() { + // Disconnect the Server + m.endpointCancel() + m.Require().NoError(waitServerStopped(m.endpointURL)) + + expectedConn := m.conn.Clone() + expectedConn.State = networkservice.State_DOWN + m.ValidateEvent(networkservice.ConnectionEventType_UPDATE, expectedConn) +} + +func (m *MonitorPassThroughSuite) TestServerUpdate() { + // Have the endpoint close the connection + endpointConn := 
m.conn.Clone() + endpointConn.GetPath().Index = uint32(len(endpointConn.GetPath().GetPathSegments()) - 1) + endpointConn.Id = endpointConn.GetCurrentPathSegment().GetId() + endpointConn.State = networkservice.State_DOWN + endpointConn.Context = &networkservice.ConnectionContext{ + ExtraContext: map[string]string{"mark": "true"}, + } + _, err := m.endpoint.Request(m.testCtx, &networkservice.NetworkServiceRequest{ + Connection: endpointConn, + }) + m.Require().NoError(err) + + expectedConn := endpointConn.Clone() + expectedConn.GetPath().Index = m.conn.GetPath().GetIndex() + expectedConn.Id = expectedConn.GetCurrentPathSegment().GetId() + m.ValidateEvent(networkservice.ConnectionEventType_UPDATE, expectedConn) +} + +func (m *MonitorPassThroughSuite) TestDeleteOnClientClose() { + // Have the client close the connection + _, err := m.client.Close(m.testCtx, m.conn) + m.Require().NoError(err) + m.ValidateEvent(networkservice.ConnectionEventType_DELETE, m.conn) +} + +func TestPassThrough(t *testing.T) { + suite.Run(t, &MonitorPassThroughSuite{}) +} diff --git a/pkg/networkservice/common/monitor/server.go b/pkg/networkservice/common/monitor/server.go index 16f957067..0946981ce 100644 --- a/pkg/networkservice/common/monitor/server.go +++ b/pkg/networkservice/common/monitor/server.go @@ -24,21 +24,20 @@ import ( "context" "github.com/golang/protobuf/ptypes/empty" - "github.com/google/uuid" + "github.com/pkg/errors" - "github.com/edwarnicke/serialize" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/clientconn" + "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" + "github.com/networkservicemesh/sdk/pkg/tools/postpone" "github.com/networkservicemesh/api/pkg/api/networkservice" "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" - "github.com/networkservicemesh/sdk/pkg/tools/log" ) type monitorServer struct { - ctx context.Context - connections map[string]*networkservice.Connection - filters map[string]*monitorFilter - executor 
serialize.Executor + chainCtx context.Context + networkservice.MonitorConnectionServer } // NewServer - creates a NetworkServiceServer chain element that will properly update a MonitorConnectionServer @@ -49,96 +48,57 @@ type monitorServer struct { // NewServer(...) as any other chain element constructor, but also get back a // networkservice.MonitorConnectionServer that can be used either standalone or in a // networkservice.MonitorConnectionServer chain -// ctx - context for lifecycle management -func NewServer(ctx context.Context, monitorServerPtr *networkservice.MonitorConnectionServer) networkservice.NetworkServiceServer { - rv := &monitorServer{ - ctx: ctx, - connections: make(map[string]*networkservice.Connection), - filters: make(map[string]*monitorFilter), +// chainCtx - context for lifecycle management +func NewServer(chainCtx context.Context, monitorServerPtr *networkservice.MonitorConnectionServer) networkservice.NetworkServiceServer { + *monitorServerPtr = newMonitorConnectionServer(chainCtx) + return &monitorServer{ + chainCtx: chainCtx, + MonitorConnectionServer: *monitorServerPtr, } - - *monitorServerPtr = rv - - return rv } -func (m *monitorServer) MonitorConnections(selector *networkservice.MonitorScopeSelector, srv networkservice.MonitorConnection_MonitorConnectionsServer) error { - m.executor.AsyncExec(func() { - filter := newMonitorFilter(selector, srv) - m.filters[uuid.New().String()] = filter - - connections := networkservice.FilterMapOnManagerScopeSelector(m.connections, selector) +func (m *monitorServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { + closeCtxFunc := postpone.ContextWithValues(ctx) + // Cancel any existing eventLoop + if cancelEventLoop, loaded := loadAndDelete(ctx, metadata.IsClient(m)); loaded { + cancelEventLoop() + } - // Send initial transfer of all data available - filter.executor.AsyncExec(func() { - _ = filter.Send(&networkservice.ConnectionEvent{ - 
Type: networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER, - Connections: connections, - }) - }) + conn, err := next.Server(ctx).Request(ctx, request) + if err != nil { + return nil, err + } + _ = m.MonitorConnectionServer.(eventConsumer).Send(&networkservice.ConnectionEvent{ + Type: networkservice.ConnectionEventType_UPDATE, + Connections: map[string]*networkservice.Connection{conn.GetId(): conn.Clone()}, }) - select { - case <-srv.Context().Done(): - case <-m.ctx.Done(): + // If we have a clientconn ... we must be part of a passthrough server, and have a client to pass + // events through from, so start an eventLoop + cc, ccLoaded := clientconn.Load(ctx) + if ccLoaded { + cancelEventLoop, eventLoopErr := newEventLoop(m.chainCtx, m.MonitorConnectionServer.(eventConsumer), cc, conn) + if eventLoopErr != nil { + closeCtx, closeCancel := closeCtxFunc() + defer closeCancel() + _, _ = next.Client(closeCtx).Close(closeCtx, conn) + return nil, errors.Wrap(eventLoopErr, "unable to monitor") + } + store(ctx, metadata.IsClient(m), cancelEventLoop) } - return nil -} - -func (m *monitorServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { - conn, err := next.Server(ctx).Request(ctx, request) - if err == nil { - eventConn := conn.Clone() - m.executor.AsyncExec(func() { - m.connections[eventConn.GetId()] = eventConn - // Send UPDATE - m.send(ctx, &networkservice.ConnectionEvent{ - Type: networkservice.ConnectionEventType_UPDATE, - Connections: map[string]*networkservice.Connection{eventConn.GetId(): eventConn}, - }) - }) - } - return conn, err + return conn, nil } func (m *monitorServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) { + // Cancel any existing eventLoop + if cancelEventLoop, loaded := loadAndDelete(ctx, metadata.IsClient(m)); loaded { + cancelEventLoop() + } rv, err := next.Server(ctx).Close(ctx, conn) - - // Remove connection object we have and send 
DELETE - eventConn := conn.Clone() - m.executor.AsyncExec(func() { - delete(m.connections, eventConn.GetId()) - m.send(ctx, &networkservice.ConnectionEvent{ - Type: networkservice.ConnectionEventType_DELETE, - Connections: map[string]*networkservice.Connection{eventConn.GetId(): eventConn}, - }) + _ = m.MonitorConnectionServer.(eventConsumer).Send(&networkservice.ConnectionEvent{ + Type: networkservice.ConnectionEventType_DELETE, + Connections: map[string]*networkservice.Connection{conn.GetId(): conn.Clone()}, }) - return rv, err } - -func (m *monitorServer) send(ctx context.Context, event *networkservice.ConnectionEvent) { - logger := log.FromContext(ctx).WithField("monitorServer", "send") - for id, filter := range m.filters { - id, filter := id, filter - e := event.Clone() - filter.executor.AsyncExec(func() { - var err error - select { - case <-filter.Context().Done(): - err = filter.Context().Err() - default: - err = filter.Send(e) - } - if err == nil { - return - } - - logger.Errorf("error sending event: %+v %s", e, err.Error()) - m.executor.AsyncExec(func() { - delete(m.filters, id) - }) - }) - } -} diff --git a/pkg/networkservice/common/monitor/server_test.go b/pkg/networkservice/common/monitor/server_test.go index af10f408b..580ac2b46 100644 --- a/pkg/networkservice/common/monitor/server_test.go +++ b/pkg/networkservice/common/monitor/server_test.go @@ -21,12 +21,14 @@ import ( "testing" "time" + "github.com/networkservicemesh/api/pkg/api/networkservice" "github.com/stretchr/testify/require" "go.uber.org/goleak" - "github.com/networkservicemesh/api/pkg/api/networkservice" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/monitor" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" + "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters" ) @@ -41,7 +43,10 @@ func TestMonitorServer(t *testing.T) { // Create monitorServer, monitorClient, and 
server. var monitorServer networkservice.MonitorConnectionServer - server := monitor.NewServer(ctx, &monitorServer) + server := chain.NewNetworkServiceServer( + metadata.NewServer(), + monitor.NewServer(ctx, &monitorServer), + ) monitorClient := adapters.NewMonitorServerToClient(monitorServer) // Create maps to hold returned connections and receivers @@ -106,7 +111,7 @@ func TestMonitorServer(t *testing.T) { require.NoError(t, err) } - // Get Delete Events and insure we've properly filtered by segmentName + // Get deleteMonitorClientCC Events and insure we've properly filtered by segmentName for _, segmentName := range segmentNames { event, err := receivers[segmentName].Recv() require.NoError(t, err) diff --git a/pkg/networkservice/common/monitor/utils_test.go b/pkg/networkservice/common/monitor/utils_test.go new file mode 100644 index 000000000..772e3d207 --- /dev/null +++ b/pkg/networkservice/common/monitor/utils_test.go @@ -0,0 +1,93 @@ +// Copyright (c) 2020-2021 Doc.ai and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package monitor_test + +import ( + "context" + "net/url" + "time" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + "google.golang.org/grpc" + "google.golang.org/grpc/health/grpc_health_v1" + + "github.com/networkservicemesh/sdk/pkg/networkservice/chains/endpoint" + + "github.com/networkservicemesh/sdk/pkg/networkservice/common/null" + "github.com/networkservicemesh/sdk/pkg/tools/grpcutils" +) + +func startEndpoint(ctx context.Context, listenOn *url.URL, server endpoint.Endpoint) error { + grpcServer := grpc.NewServer() + server.Register(grpcServer) + + errCh := grpcutils.ListenAndServe(ctx, listenOn, grpcServer) + select { + case err := <-errCh: + return err + default: + } + + return waitNetworkServiceReady(listenOn) +} + +func waitNetworkServiceReady(target *url.URL) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + cc, err := grpc.DialContext(ctx, grpcutils.URLToTarget(target), grpc.WithBlock(), grpc.WithInsecure()) + if err != nil { + return err + } + defer func() { + _ = cc.Close() + }() + + healthCheckRequest := &grpc_health_v1.HealthCheckRequest{ + Service: networkservice.ServiceNames(null.NewServer())[0], + } + + client := grpc_health_v1.NewHealthClient(cc) + for ctx.Err() == nil { + response, err := client.Check(ctx, healthCheckRequest) + if err != nil { + return err + } + if response.Status == grpc_health_v1.HealthCheckResponse_SERVING { + return nil + } + } + return ctx.Err() +} + +func waitServerStopped(target *url.URL) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + var err error + for err == nil && ctx.Err() == nil { + dialCtx, dialCancel := context.WithTimeout(ctx, 10*time.Millisecond) + + var cc *grpc.ClientConn + if cc, err = grpc.DialContext(dialCtx, grpcutils.URLToTarget(target), grpc.WithBlock(), grpc.WithInsecure()); err == nil { + _ = cc.Close() + } + + dialCancel() + } + return ctx.Err() +} diff --git 
a/pkg/tools/grpcutils/url.go b/pkg/tools/grpcutils/url.go index 65a3623e8..1a1568a54 100644 --- a/pkg/tools/grpcutils/url.go +++ b/pkg/tools/grpcutils/url.go @@ -25,6 +25,9 @@ import ( // URLToTarget - convert *net.URL to acceptable grpc target value. func URLToTarget(u *url.URL) (target string) { + if u == nil { + return "" + } switch u.Scheme { case unixScheme: return u.String() diff --git a/pkg/tools/sandbox/builder.go b/pkg/tools/sandbox/builder.go index 13efd17b4..d77e97e67 100644 --- a/pkg/tools/sandbox/builder.go +++ b/pkg/tools/sandbox/builder.go @@ -31,7 +31,6 @@ import ( "github.com/networkservicemesh/sdk/pkg/networkservice/chains/nsmgr" "github.com/networkservicemesh/sdk/pkg/networkservice/chains/nsmgrproxy" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/connect" "github.com/networkservicemesh/sdk/pkg/registry/chains/memory" "github.com/networkservicemesh/sdk/pkg/registry/chains/proxydns" registryconnect "github.com/networkservicemesh/sdk/pkg/registry/common/connect" @@ -294,9 +293,7 @@ func (b *Builder) newNSMgrProxy() *NSMgrEntry { b.generateTokenFunc, nsmgrproxy.WithListenOn(entry.URL), nsmgrproxy.WithName(entry.Name), - nsmgrproxy.WithConnectOptions( - connect.WithDialTimeout(DialTimeout), - connect.WithDialOptions(dialOptions...)), + nsmgrproxy.WithDialOptions(dialOptions...), nsmgrproxy.WithRegistryConnectOptions( registryconnect.WithDialOptions(dialOptions...), ), diff --git a/pkg/tools/sandbox/node.go b/pkg/tools/sandbox/node.go index c6af3889a..3bab7f798 100644 --- a/pkg/tools/sandbox/node.go +++ b/pkg/tools/sandbox/node.go @@ -31,11 +31,8 @@ import ( "github.com/networkservicemesh/sdk/pkg/networkservice/common/authorize" "github.com/networkservicemesh/sdk/pkg/networkservice/common/clienturl" "github.com/networkservicemesh/sdk/pkg/networkservice/common/connect" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/heal" "github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanismtranslation" - 
"github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters" registryclient "github.com/networkservicemesh/sdk/pkg/registry/chains/client" - "github.com/networkservicemesh/sdk/pkg/tools/addressof" "github.com/networkservicemesh/sdk/pkg/tools/log" "github.com/networkservicemesh/sdk/pkg/tools/token" ) @@ -66,9 +63,8 @@ func (n *Node) NewNSMgr( options := []nsmgr.Option{ nsmgr.WithName(name), nsmgr.WithAuthorizeServer(authorize.NewServer(authorize.Any())), - nsmgr.WithConnectOptions( - connect.WithDialTimeout(DialTimeout), - connect.WithDialOptions(dialOptions...)), + nsmgr.WithDialOptions(dialOptions...), + nsmgr.WithDialTimeout(DialTimeout), } if n.domain.Registry != nil { @@ -126,18 +122,17 @@ func (n *Node) NewForwarder( append( additionalFunctionality, clienturl.NewServer(CloneURL(n.NSMgr.URL)), - heal.NewServer(ctx, - heal.WithOnHeal(addressof.NetworkServiceClient(adapters.NewServerToClient(entry))), - heal.WithOnRestore(heal.OnRestoreIgnore)), - connect.NewServer(ctx, - client.NewClientFactory( + connect.NewServer( + client.NewClient( + ctx, client.WithName(entry.Name), client.WithAdditionalFunctionality( mechanismtranslation.NewClient(), ), + client.WithDialOptions(dialOptions...), + client.WithDialTimeout(DialTimeout), + client.WithoutRefresh(), ), - connect.WithDialTimeout(DialTimeout), - connect.WithDialOptions(dialOptions...), ), )..., ), @@ -216,10 +211,10 @@ func (n *Node) NewClient( ) networkservice.NetworkServiceClient { return client.NewClient( ctx, - CloneURL(n.NSMgr.URL), + client.WithClientURL(CloneURL(n.NSMgr.URL)), client.WithDialOptions(DialOptions(WithTokenGenerator(generatorFunc))...), - client.WithDialTimeout(DialTimeout), client.WithAuthorizeClient(authorize.NewClient(authorize.Any())), client.WithAdditionalFunctionality(additionalFunctionality...), + client.WithDialTimeout(DialTimeout), ) }