From af66f3280b09e17cdd1343076fb5da088c5f1d01 Mon Sep 17 00:00:00 2001 From: denis-tingaikin Date: Tue, 25 Jun 2024 10:24:28 +0300 Subject: [PATCH] fix linter Signed-off-by: denis-tingaikin --- .../common/netsvcmonitor/server.go | 124 ++++++++++-------- .../common/netsvcmonitor/server_test.go | 79 ++++++++++- 2 files changed, 144 insertions(+), 59 deletions(-) diff --git a/pkg/networkservice/common/netsvcmonitor/server.go b/pkg/networkservice/common/netsvcmonitor/server.go index 5c75f92a9..717112d60 100644 --- a/pkg/networkservice/common/netsvcmonitor/server.go +++ b/pkg/networkservice/common/netsvcmonitor/server.go @@ -1,4 +1,4 @@ -// Copyright (c) 2023 Cisco Systems, Inc. +// Copyright (c) 2023-2024 Cisco Systems, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -57,69 +57,23 @@ func (m *monitorServer) Request(ctx context.Context, request *networkservice.Net return resp, err } - var conn = resp.Clone() - var monitorCtx, cancel = context.WithCancel(m.chainCtx) - storeCancelFunction(ctx, cancel) - - var logger = log.FromContext(ctx).WithField("monitorServer", "Find") - - var monitorNetworkServiceGoroutine = func() { - for ; monitorCtx.Err() == nil; time.Sleep(time.Millisecond * 100) { - // nolint:govet - var stream, err = m.nsClient.Find(monitorCtx, ®istry.NetworkServiceQuery{ - Watch: true, - NetworkService: ®istry.NetworkService{ - Name: conn.GetNetworkService(), - }, - }) - if err != nil { - logger.Errorf("an error happened during finding network service: %v", err.Error()) - continue - } - - var networkServiceCh = registry.ReadNetworkServiceChannel(stream) - var netsvcStreamIsAlive = true - - for netsvcStreamIsAlive && monitorCtx.Err() == nil { - select { - case <-monitorCtx.Done(): - return - case netsvc, ok := <-networkServiceCh: - if !ok { - netsvcStreamIsAlive = false - break - } - - nseStream, err := m.nseClient.Find(monitorCtx, ®istry.NetworkServiceEndpointQuery{ - NetworkServiceEndpoint: ®istry.NetworkServiceEndpoint{ - Name: conn.GetNetworkServiceEndpointName(), - }, - }) - if err != nil { - logger.Errorf("an error happened during finding nse: %v", err.Error()) - break - } - - var nses = registry.ReadNetworkServiceEndpointList(nseStream) - - if len(nses) == 0 { - continue - } - - if len(matchutils.MatchEndpoint(resp.GetLabels(), netsvc.GetNetworkService(), nses...)) == 0 { - begin.FromContext(ctx).Close() - logger.Warnf("nse %v doesn't match with networkservice: %v", conn.GetNetworkServiceEndpointName(), conn.GetNetworkService()) - - return - } - } + if len(resp.GetPath().GetPathSegments()) > 0 { + var minT = resp.GetPath().GetPathSegments()[0].Expires.AsTime().Local() + for _, seg := range resp.GetPath().GetPathSegments() { + var t = seg.Expires.AsTime().Local() + if minT.Before(t) { + minT = t } } + cancel() + monitorCtx, cancel = context.WithTimeout(m.chainCtx, time.Until(minT)) } - go monitorNetworkServiceGoroutine() + storeCancelFunction(ctx, cancel) + + go m.monitorNetworkService(monitorCtx, resp.Clone()) return resp, err } @@ -131,3 +85,57 @@ func (m *monitorServer) Close(ctx context.Context, conn *networkservice.Connecti return next.Server(ctx).Close(ctx, conn) } + +func (m *monitorServer) monitorNetworkService(monitorCtx context.Context, resp *networkservice.Connection) { + var logger = log.FromContext(monitorCtx).WithField("monitorServer", "Find") + for ; monitorCtx.Err() == nil; time.Sleep(time.Millisecond * 100) { + // nolint:govet + var stream, err = m.nsClient.Find(monitorCtx, ®istry.NetworkServiceQuery{ + Watch: true, + NetworkService: ®istry.NetworkService{ + Name: resp.GetNetworkService(), + }, + }) + if err != nil { + logger.Errorf("an error happened during finding network service: %v", err.Error()) + continue + } + + var networkServiceCh = registry.ReadNetworkServiceChannel(stream) + var netsvcStreamIsAlive = true + + for netsvcStreamIsAlive && monitorCtx.Err() == nil { + select { + case <-monitorCtx.Done(): + return + case netsvc, ok := <-networkServiceCh: + if !ok { + netsvcStreamIsAlive = false + break + } + + nseStream, err := m.nseClient.Find(monitorCtx, ®istry.NetworkServiceEndpointQuery{ + NetworkServiceEndpoint: ®istry.NetworkServiceEndpoint{ + Name: resp.GetNetworkServiceEndpointName(), + }, + }) + if err != nil { + logger.Errorf("an error happened during finding nse: %v", err.Error()) + break + } + + var nses = registry.ReadNetworkServiceEndpointList(nseStream) + + if len(nses) == 0 { + continue + } + + if len(matchutils.MatchEndpoint(resp.GetLabels(), netsvc.GetNetworkService(), nses...)) == 0 { + begin.FromContext(monitorCtx).Close() + logger.Warnf("nse %v doesn't match with networkservice: %v", resp.GetNetworkServiceEndpointName(), resp.GetNetworkService()) + return + } + } + } + } +} diff --git a/pkg/networkservice/common/netsvcmonitor/server_test.go b/pkg/networkservice/common/netsvcmonitor/server_test.go index 7847201dd..6e8e7a0db 100644 --- a/pkg/networkservice/common/netsvcmonitor/server_test.go +++ b/pkg/networkservice/common/netsvcmonitor/server_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2023 Cisco Systems, Inc. +// Copyright (c) 2023-2024 Cisco Systems, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -25,6 +25,8 @@ import ( "github.com/networkservicemesh/api/pkg/api/networkservice" "github.com/networkservicemesh/api/pkg/api/registry" "github.com/stretchr/testify/require" + "go.uber.org/goleak" + "google.golang.org/protobuf/types/known/timestamppb" "github.com/networkservicemesh/sdk/pkg/networkservice/common/begin" "github.com/networkservicemesh/sdk/pkg/networkservice/common/netsvcmonitor" @@ -36,6 +38,10 @@ import ( ) func Test_Netsvcmonitor_And_GroupOfSimilarNetworkServices(t *testing.T) { + t.Cleanup(func() { + goleak.VerifyNone(t) + }) + var testCtx, cancel = context.WithTimeout(context.Background(), time.Second) defer cancel() @@ -91,3 +97,74 @@ func Test_Netsvcmonitor_And_GroupOfSimilarNetworkServices(t *testing.T) { return counter.Closes() > 0 }, time.Millisecond*300, time.Millisecond*50) } + +func Test_NetsvcMonitor_ShouldNotLeakWithoutClose(t *testing.T) { + t.Cleanup(func() { + goleak.VerifyNone(t) + }) + + var testCtx, cancel = context.WithCancel(context.Background()) + defer cancel() + + var nsServer = memory.NewNetworkServiceRegistryServer() + var nseServer = memory.NewNetworkServiceEndpointRegistryServer() + var counter count.Server + + _, _ = nsServer.Register(context.Background(), ®istry.NetworkService{ + Name: "service-1", + }) + + _, _ = nseServer.Register(context.Background(), ®istry.NetworkServiceEndpoint{ + Name: "endpoint-1", + NetworkServiceNames: []string{"service-1"}, + }) + + var server = chain.NewNetworkServiceServer( + metadata.NewServer(), + begin.NewServer(), + netsvcmonitor.NewServer( + testCtx, + adapters.NetworkServiceServerToClient(nsServer), + adapters.NetworkServiceEndpointServerToClient(nseServer), + ), + &counter, + ) + + var n = time.Now().Add(time.Second) + + var request = &networkservice.NetworkServiceRequest{ + Connection: &networkservice.Connection{ + Id: "1", + NetworkService: "service-1", + NetworkServiceEndpointName: "endpoint-1", + Path: &networkservice.Path{ + PathSegments: []*networkservice.PathSegment{ + { + Expires: timestamppb.New(n), + }, + }, + }, + }, + } + + var _, err = server.Request(testCtx, request) + require.NoError(t, err) + + for end := time.Now().Add(time.Second); time.Now().Before(end); time.Sleep(time.Millisecond * 100) { + if goleak.Find() != nil { + break + } + } + if goleak.Find() == nil { + require.FailNow(t, "netsvc goroutine must be created") + } + + for end := time.Now().Add(time.Second * 2); time.Now().Before(end); time.Sleep(time.Millisecond * 100) { + if goleak.Find() == nil { + break + } + } + if e := goleak.Find(); e != nil { + require.FailNow(t, "netsvc goroutine must be stopped, but it's found"+e.Error()) + } +}