diff --git a/go.mod b/go.mod index 5a6431ba5..d0c9738ea 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/stretchr/testify v1.4.0 github.com/uber/jaeger-client-go v2.21.1+incompatible github.com/uber/jaeger-lib v2.2.0+incompatible // indirect - go.uber.org/atomic v1.5.1 + go.uber.org/atomic v1.5.1 // indirect golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a // indirect gonum.org/v1/gonum v0.6.2 google.golang.org/grpc v1.27.0 diff --git a/pkg/networkservice/common/heal/client_test.go b/pkg/networkservice/common/heal/client_test.go index 2185cba21..0e11d828b 100644 --- a/pkg/networkservice/common/heal/client_test.go +++ b/pkg/networkservice/common/heal/client_test.go @@ -22,7 +22,13 @@ import ( "testing" "time" - . "github.com/networkservicemesh/sdk/pkg/test/fixtures/healclientfixture" + "github.com/golang/protobuf/ptypes/empty" + "google.golang.org/grpc" + + "github.com/networkservicemesh/sdk/pkg/networkservice/common/heal" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/eventchannel" + "github.com/networkservicemesh/sdk/pkg/tools/addressof" "github.com/networkservicemesh/api/pkg/api/networkservice" "github.com/stretchr/testify/require" @@ -33,21 +39,43 @@ const ( tickTimeout = 10 * time.Millisecond ) +type testOnHeal struct { + RequestFunc func(ctx context.Context, in *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) + CloseFunc func(ctx context.Context, in *networkservice.Connection, opts ...grpc.CallOption) (*empty.Empty, error) +} + +func (t *testOnHeal) Request(ctx context.Context, in *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { + return t.RequestFunc(ctx, in, opts...) +} + +func (t *testOnHeal) Close(ctx context.Context, in *networkservice.Connection, opts ...grpc.CallOption) (*empty.Empty, error) { + return t.CloseFunc(ctx, in, opts...) +} + func TestHealClient_Request(t *testing.T) { - f := &Fixture{ - Request: &networkservice.NetworkServiceRequest{ - Connection: &networkservice.Connection{ - Id: "conn-1", - NetworkService: "ns-1", - }, + eventCh := make(chan *networkservice.ConnectionEvent, 1) + + onHealCh := make(chan struct{}) + onHeal := &testOnHeal{ + RequestFunc: func(ctx context.Context, in *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (connection *networkservice.Connection, e error) { + close(onHealCh) + return &networkservice.Connection{}, nil }, } - err := SetupWithSingleRequest(f) - defer TearDown(f) + + client := chain.NewNetworkServiceClient( + heal.NewClient(context.Background(), + eventchannel.NewMonitorConnectionClient(eventCh), addressof.NetworkServiceClient(onHeal))) + + _, err := client.Request(context.Background(), &networkservice.NetworkServiceRequest{ + Connection: &networkservice.Connection{ + Id: "conn-1", + NetworkService: "ns-1", + }, + }) require.Nil(t, err) - require.True(t, reflect.DeepEqual(f.Conn, f.Request.GetConnection())) - err = f.ServerStream.Send(&networkservice.ConnectionEvent{ + eventCh <- &networkservice.ConnectionEvent{ Type: networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER, Connections: map[string]*networkservice.Connection{ "conn-1": { @@ -55,10 +83,9 @@ func TestHealClient_Request(t *testing.T) { NetworkService: "ns-1", }, }, - }) - require.Nil(t, err) + } - err = f.ServerStream.Send(&networkservice.ConnectionEvent{ + eventCh <- &networkservice.ConnectionEvent{ Type: networkservice.ConnectionEventType_DELETE, Connections: map[string]*networkservice.Connection{ "conn-1": { @@ -66,12 +93,11 @@ func TestHealClient_Request(t *testing.T) { NetworkService: "ns-1", }, }, - }) - require.Nil(t, err) + } cond := func() bool { select { - case <-f.OnHealNotifierCh: + case <-onHealCh: return true default: return false @@ -81,20 +107,29 @@ func TestHealClient_Request(t *testing.T) { } func TestHealClient_MonitorClose(t *testing.T) { - f := &Fixture{ - Request: &networkservice.NetworkServiceRequest{ - Connection: &networkservice.Connection{ - Id: "conn-1", - NetworkService: "ns-1", - }, + eventCh := make(chan *networkservice.ConnectionEvent, 1) + + onHealCh := make(chan struct{}) + onHeal := &testOnHeal{ + RequestFunc: func(ctx context.Context, in *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (connection *networkservice.Connection, e error) { + close(onHealCh) + return &networkservice.Connection{}, nil }, } - err := SetupWithSingleRequest(f) - defer TearDown(f) + + ctx, cancelFunc := context.WithCancel(context.Background()) + client := chain.NewNetworkServiceClient( + heal.NewClient(ctx, eventchannel.NewMonitorConnectionClient(eventCh), addressof.NetworkServiceClient(onHeal))) + + _, err := client.Request(context.Background(), &networkservice.NetworkServiceRequest{ + Connection: &networkservice.Connection{ + Id: "conn-1", + NetworkService: "ns-1", + }, + }) require.Nil(t, err) - require.True(t, reflect.DeepEqual(f.Conn, f.Request.GetConnection())) - err = f.ServerStream.Send(&networkservice.ConnectionEvent{ + eventCh <- &networkservice.ConnectionEvent{ Type: networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER, Connections: map[string]*networkservice.Connection{ "conn-1": { @@ -102,14 +137,13 @@ func TestHealClient_MonitorClose(t *testing.T) { NetworkService: "ns-1", }, }, - }) - require.Nil(t, err) + } - f.CloseStream() + cancelFunc() cond := func() bool { select { - case <-f.OnHealNotifierCh: + case <-onHealCh: return true default: return false @@ -119,26 +153,25 @@ func TestHealClient_MonitorClose(t *testing.T) { } func TestHealClient_EmptyInit(t *testing.T) { - f := &Fixture{ - Request: &networkservice.NetworkServiceRequest{ - Connection: &networkservice.Connection{ - Id: "conn-1", - NetworkService: "ns-1", - }, + eventCh := make(chan *networkservice.ConnectionEvent, 1) + + client := chain.NewNetworkServiceClient( + heal.NewClient(context.Background(), eventchannel.NewMonitorConnectionClient(eventCh), nil)) + + _, err := client.Request(context.Background(), &networkservice.NetworkServiceRequest{ + Connection: &networkservice.Connection{ + Id: "conn-1", + NetworkService: "ns-1", }, - } - err := SetupWithSingleRequest(f) - defer TearDown(f) + }) require.Nil(t, err) - require.True(t, reflect.DeepEqual(f.Conn, f.Request.GetConnection())) - err = f.ServerStream.Send(&networkservice.ConnectionEvent{ + eventCh <- &networkservice.ConnectionEvent{ Type: networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER, Connections: make(map[string]*networkservice.Connection), - }) - require.Nil(t, err) + } - err = f.ServerStream.Send(&networkservice.ConnectionEvent{ + eventCh <- &networkservice.ConnectionEvent{ Type: networkservice.ConnectionEventType_UPDATE, Connections: map[string]*networkservice.Connection{ "conn-1": { @@ -146,76 +179,52 @@ func TestHealClient_EmptyInit(t *testing.T) { NetworkService: "ns-1", }, }, - }) - require.Nil(t, err) + } } -func TestHealClient_SeveralConnection(t *testing.T) { - f := &Fixture{ - Request: &networkservice.NetworkServiceRequest{ - Connection: &networkservice.Connection{ - Id: "conn-1", - NetworkService: "ns-1", - }, - }, - } - err := SetupWithSingleRequest(f) - defer TearDown(f) - require.Nil(t, err) - require.True(t, reflect.DeepEqual(f.Conn, f.Request.GetConnection())) +func TestNewClient_MissingConnectionsInInit(t *testing.T) { + eventCh := make(chan *networkservice.ConnectionEvent, 1) - r := &networkservice.NetworkServiceRequest{ - Connection: &networkservice.Connection{ - Id: "conn-2", - NetworkService: "ns-2", + requestCh := make(chan *networkservice.NetworkServiceRequest) + onHeal := &testOnHeal{ + RequestFunc: func(ctx context.Context, in *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (connection *networkservice.Connection, e error) { + requestCh <- in + return &networkservice.Connection{}, nil }, } - conn, err := f.Client.Request(context.Background(), r) - require.Nil(t, err) - require.True(t, reflect.DeepEqual(conn, r.GetConnection())) - - ctx, cancelFunc := context.WithTimeout(context.Background(), 100*time.Millisecond) - _, _, err = f.MockMonitorServer.Stream(ctx) - require.NotNil(t, err) // we expect healClient called MonitorConnections method once - cancelFunc() -} - -func TestNewClient_MissingConnectionsInInit(t *testing.T) { - f := &Fixture{} - err := Setup(f) - defer TearDown(f) - require.Nil(t, err) + ctx, cancelFunc := context.WithCancel(context.Background()) + client := chain.NewNetworkServiceClient( + heal.NewClient(ctx, eventchannel.NewMonitorConnectionClient(eventCh), addressof.NetworkServiceClient(onHeal))) conns := []*networkservice.Connection{ {Id: "conn-1", NetworkService: "ns-1"}, {Id: "conn-2", NetworkService: "ns-2"}, } - conn, err := f.Client.Request(context.Background(), &networkservice.NetworkServiceRequest{Connection: conns[0]}) + conn, err := client.Request(context.Background(), &networkservice.NetworkServiceRequest{Connection: conns[0]}) require.Nil(t, err) require.True(t, reflect.DeepEqual(conn, conns[0])) - conn, err = f.Client.Request(context.Background(), &networkservice.NetworkServiceRequest{Connection: conns[1]}) + conn, err = client.Request(context.Background(), &networkservice.NetworkServiceRequest{Connection: conns[1]}) require.Nil(t, err) require.True(t, reflect.DeepEqual(conn, conns[1])) - err = f.ServerStream.Send(&networkservice.ConnectionEvent{ + eventCh <- &networkservice.ConnectionEvent{ Type: networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER, Connections: map[string]*networkservice.Connection{conns[0].GetId(): conns[0]}, - }) - require.Nil(t, err) + } // we emulate situation that server managed to handle only the first connection // second connection should came in the UPDATE event, but we emulate server's falling down - f.CloseStream() + cancelFunc() // at that point we expect that 'healClient' start healing both 'conn-1' and 'conn-2' healedIDs := map[string]bool{} cond := func() bool { select { - case r := <-f.OnHealNotifierCh: + case r := <-requestCh: if _, ok := healedIDs[r.GetConnection().GetId()]; !ok { healedIDs[r.GetConnection().GetId()] = true return true diff --git a/pkg/test/fixtures/healclientfixture/fixture.go b/pkg/test/fixtures/healclientfixture/fixture.go deleted file mode 100644 index 011bd6237..000000000 --- a/pkg/test/fixtures/healclientfixture/fixture.go +++ /dev/null @@ -1,138 +0,0 @@ -// Copyright (c) 2020 Doc.ai and/or its affiliates. -// -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package healclientfixture contains auxiliary classes for 'healClient' testing -package healclientfixture - -import ( - "context" - - "github.com/golang/protobuf/ptypes/empty" - "github.com/networkservicemesh/api/pkg/api/networkservice" - "google.golang.org/grpc" - - "github.com/networkservicemesh/sdk/pkg/networkservice/common/heal" - "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" - "github.com/networkservicemesh/sdk/pkg/test/mocks/mockmonitorconnection" - "github.com/networkservicemesh/sdk/pkg/tools/addressof" -) - -// ClientRequestFunc is a signature of networkservice.Request method -type ClientRequestFunc func(ctx context.Context, in *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) - -// TestOnHeal test wrapper for 'networkservice.NetworkServiceClient' implementation -type TestOnHeal struct { - r ClientRequestFunc -} - -// Request calls 'r' with passed arguments -func (t *TestOnHeal) Request(ctx context.Context, in *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { - return t.r(ctx, in, opts...) -} - -// Close has no implementation yet -func (t *TestOnHeal) Close(ctx context.Context, in *networkservice.Connection, opts ...grpc.CallOption) (*empty.Empty, error) { - panic("implement me") -} - -// callNotifier sends received networkservice.NetworkServiceRequest to notifier channel when h called -func callNotifier(notifier chan *networkservice.NetworkServiceRequest, h ClientRequestFunc) ClientRequestFunc { - return func(ctx context.Context, in *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (i *networkservice.Connection, e error) { - notifier <- in - if h != nil { - return h(ctx, in, opts...) - } - return nil, nil - } -} - -// Fixture is auxiliary class to test 'healClient' -type Fixture struct { - // Client that we are going to test - Client networkservice.NetworkServiceClient - - // ServerStream is server stream used to send 'networkservice.ConnectionEvent' to client - ServerStream networkservice.MonitorConnection_MonitorConnectionsServer - - // CloseStream is function that stops server streaming - CloseStream func() - - // OnHeal pointer to test chain that will be used in case of healing - OnHeal *TestOnHeal - - // OnHealNotifierCh receives *networkservice.NetworkServiceRequest every time when 'OnHeal' called - OnHealNotifierCh chan *networkservice.NetworkServiceRequest - - // MockMonitorServer fake implementation of MonitorConnectionServer, passed to constructor of 'healClient' - MockMonitorServer *mockmonitorconnection.MockMonitorServer - - // Request that passed to Request during setup - Request *networkservice.NetworkServiceRequest - - // Conn is connection received as a response of Request - Conn *networkservice.Connection - - // CancelFunc per-chain cancel function - CancelFunc func() -} - -// Setup make some basic Fixture initialization -func Setup(f *Fixture) error { - f.MockMonitorServer = mockmonitorconnection.New() - - monitorClient, err := f.MockMonitorServer.Client(context.Background()) - if err != nil { - return err - } - - f.OnHeal = &TestOnHeal{} - f.OnHealNotifierCh = make(chan *networkservice.NetworkServiceRequest) - f.OnHeal.r = callNotifier(f.OnHealNotifierCh, f.OnHeal.r) - - ctx, cancelFunc := context.WithCancel(context.Background()) - f.CancelFunc = cancelFunc - f.Client = chain.NewNetworkServiceClient( - heal.NewClient(ctx, monitorClient, addressof.NetworkServiceClient(f.OnHeal))) - - f.ServerStream, f.CloseStream, err = f.MockMonitorServer.Stream(context.Background()) - if err != nil { - return err - } - - return nil -} - -// TearDown releases resources allocated during SetupWithSingleRequest -func TearDown(f *Fixture) { - f.CloseStream() - f.CancelFunc() - f.MockMonitorServer.Close() -} - -// SetupWithSingleRequest initialize Fixture and calls Request with 'request' passed -func SetupWithSingleRequest(f *Fixture) error { - err := Setup(f) - if err != nil { - return err - } - - f.Conn, err = f.Client.Request(context.Background(), f.Request) - if err != nil { - return err - } - - return nil -} diff --git a/pkg/test/mocks/mockmonitorconnection/monitor.go b/pkg/test/mocks/mockmonitorconnection/monitor.go deleted file mode 100644 index 9e2c39681..000000000 --- a/pkg/test/mocks/mockmonitorconnection/monitor.go +++ /dev/null @@ -1,119 +0,0 @@ -// Copyright (c) 2020 Doc.ai and/or its affiliates. -// -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package mockmonitorconnection contains implementation of fake monitor server for tests -package mockmonitorconnection - -import ( - "context" - "net" - "sync" - - "go.uber.org/atomic" - - "github.com/networkservicemesh/api/pkg/api/networkservice" - "google.golang.org/grpc" - "google.golang.org/grpc/test/bufconn" -) - -// MockMonitorServer implements networkservice.MonitorConnectionServer for test -type MockMonitorServer struct { - stopCh chan struct{} - streamCh chan networkservice.MonitorConnection_MonitorConnectionsServer - - ln *bufconn.Listener - closeFuncs []func() -} - -// New creates instance of MockMonitorServer -func New() *MockMonitorServer { - rv := &MockMonitorServer{ - stopCh: make(chan struct{}), - streamCh: make(chan networkservice.MonitorConnection_MonitorConnectionsServer), - } - rv.serve() - return rv -} - -// MonitorConnections is a fake implementation of MonitorConnections, pushes 'stream' to channel -// that lately could be obtained using method Stream -func (f *MockMonitorServer) MonitorConnections(s *networkservice.MonitorScopeSelector, stream networkservice.MonitorConnection_MonitorConnectionsServer) error { - f.streamCh <- stream - <-f.stopCh - return nil -} - -// Stream returns the last server 'stream' with blocking -func (f *MockMonitorServer) Stream(ctx context.Context) (networkservice.MonitorConnection_MonitorConnectionsServer, func(), error) { - closed := atomic.NewBool(false) - closeFunc := func() { - if closed.CAS(false, true) { - close(f.stopCh) - } - } - - select { - case s := <-f.streamCh: - return s, closeFunc, nil - case <-ctx.Done(): - return nil, nil, ctx.Err() - } -} - -// Client returns client for fake monitor server -func (f *MockMonitorServer) Client(ctx context.Context) (networkservice.MonitorConnectionClient, error) { - dialer := func(context context.Context, s string) (conn net.Conn, e error) { - return f.ln.Dial() - } - - cc, err := grpc.DialContext(ctx, "", grpc.WithInsecure(), grpc.WithContextDialer(dialer)) - if err != nil { - return nil, err - } - - f.pushOnCloseFunc(func() { _ = cc.Close() }) - - return networkservice.NewMonitorConnectionClient(cc), nil -} - -// Close releases all resources -func (f *MockMonitorServer) Close() { - for _, c := range f.closeFuncs { - c() - } -} - -func (f *MockMonitorServer) serve() { - srv := grpc.NewServer() - networkservice.RegisterMonitorConnectionServer(srv, f) - f.ln = bufconn.Listen(1024 * 1024) - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - _ = srv.Serve(f.ln) - }() - - f.pushOnCloseFunc(func() { - _ = f.ln.Close() - wg.Wait() - }) -} - -func (f *MockMonitorServer) pushOnCloseFunc(h func()) { - f.closeFuncs = append([]func(){h}, f.closeFuncs...) -}