Skip to content

Commit

Permalink
fix linter
Browse files Browse the repository at this point in the history
Signed-off-by: denis-tingaikin <denis.tingajkin@xored.com>
  • Loading branch information
denis-tingaikin committed Jun 25, 2024
1 parent e3eed82 commit af66f32
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 59 deletions.
124 changes: 66 additions & 58 deletions pkg/networkservice/common/netsvcmonitor/server.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2023 Cisco Systems, Inc.
// Copyright (c) 2023-2024 Cisco Systems, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -57,69 +57,23 @@ func (m *monitorServer) Request(ctx context.Context, request *networkservice.Net
return resp, err
}

var conn = resp.Clone()

var monitorCtx, cancel = context.WithCancel(m.chainCtx)

storeCancelFunction(ctx, cancel)

var logger = log.FromContext(ctx).WithField("monitorServer", "Find")

var monitorNetworkServiceGoroutine = func() {
for ; monitorCtx.Err() == nil; time.Sleep(time.Millisecond * 100) {
// nolint:govet
var stream, err = m.nsClient.Find(monitorCtx, &registry.NetworkServiceQuery{
Watch: true,
NetworkService: &registry.NetworkService{
Name: conn.GetNetworkService(),
},
})
if err != nil {
logger.Errorf("an error happened during finding network service: %v", err.Error())
continue
}

var networkServiceCh = registry.ReadNetworkServiceChannel(stream)
var netsvcStreamIsAlive = true

for netsvcStreamIsAlive && monitorCtx.Err() == nil {
select {
case <-monitorCtx.Done():
return
case netsvc, ok := <-networkServiceCh:
if !ok {
netsvcStreamIsAlive = false
break
}

nseStream, err := m.nseClient.Find(monitorCtx, &registry.NetworkServiceEndpointQuery{
NetworkServiceEndpoint: &registry.NetworkServiceEndpoint{
Name: conn.GetNetworkServiceEndpointName(),
},
})
if err != nil {
logger.Errorf("an error happened during finding nse: %v", err.Error())
break
}

var nses = registry.ReadNetworkServiceEndpointList(nseStream)

if len(nses) == 0 {
continue
}

if len(matchutils.MatchEndpoint(resp.GetLabels(), netsvc.GetNetworkService(), nses...)) == 0 {
begin.FromContext(ctx).Close()
logger.Warnf("nse %v doesn't match with networkservice: %v", conn.GetNetworkServiceEndpointName(), conn.GetNetworkService())

return
}
}
if len(resp.GetPath().GetPathSegments()) > 0 {
var minT = resp.GetPath().GetPathSegments()[0].Expires.AsTime().Local()
for _, seg := range resp.GetPath().GetPathSegments() {
var t = seg.Expires.AsTime().Local()
if minT.Before(t) {
minT = t
}
}
cancel()
monitorCtx, cancel = context.WithTimeout(m.chainCtx, time.Until(minT))
}

go monitorNetworkServiceGoroutine()
storeCancelFunction(ctx, cancel)

go m.monitorNetworkService(monitorCtx, resp.Clone())

return resp, err
}
Expand All @@ -131,3 +85,57 @@ func (m *monitorServer) Close(ctx context.Context, conn *networkservice.Connecti

return next.Server(ctx).Close(ctx, conn)
}

func (m *monitorServer) monitorNetworkService(monitorCtx context.Context, resp *networkservice.Connection) {
var logger = log.FromContext(monitorCtx).WithField("monitorServer", "Find")
for ; monitorCtx.Err() == nil; time.Sleep(time.Millisecond * 100) {
// nolint:govet
var stream, err = m.nsClient.Find(monitorCtx, &registry.NetworkServiceQuery{
Watch: true,
NetworkService: &registry.NetworkService{
Name: resp.GetNetworkService(),
},
})
if err != nil {
logger.Errorf("an error happened during finding network service: %v", err.Error())
continue
}

var networkServiceCh = registry.ReadNetworkServiceChannel(stream)
var netsvcStreamIsAlive = true

for netsvcStreamIsAlive && monitorCtx.Err() == nil {
select {
case <-monitorCtx.Done():
return
case netsvc, ok := <-networkServiceCh:
if !ok {
netsvcStreamIsAlive = false
break
}

nseStream, err := m.nseClient.Find(monitorCtx, &registry.NetworkServiceEndpointQuery{
NetworkServiceEndpoint: &registry.NetworkServiceEndpoint{
Name: resp.GetNetworkServiceEndpointName(),
},
})
if err != nil {
logger.Errorf("an error happened during finding nse: %v", err.Error())
break
}

var nses = registry.ReadNetworkServiceEndpointList(nseStream)

if len(nses) == 0 {
continue
}

if len(matchutils.MatchEndpoint(resp.GetLabels(), netsvc.GetNetworkService(), nses...)) == 0 {
begin.FromContext(monitorCtx).Close()
logger.Warnf("nse %v doesn't match with networkservice: %v", resp.GetNetworkServiceEndpointName(), resp.GetNetworkService())
return
}
}
}
}
}
79 changes: 78 additions & 1 deletion pkg/networkservice/common/netsvcmonitor/server_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2023 Cisco Systems, Inc.
// Copyright (c) 2023-2024 Cisco Systems, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -25,6 +25,8 @@ import (
"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/networkservicemesh/api/pkg/api/registry"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/begin"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/netsvcmonitor"
Expand All @@ -36,6 +38,10 @@ import (
)

func Test_Netsvcmonitor_And_GroupOfSimilarNetworkServices(t *testing.T) {
t.Cleanup(func() {
goleak.VerifyNone(t)
})

var testCtx, cancel = context.WithTimeout(context.Background(), time.Second)
defer cancel()

Expand Down Expand Up @@ -91,3 +97,74 @@ func Test_Netsvcmonitor_And_GroupOfSimilarNetworkServices(t *testing.T) {
return counter.Closes() > 0
}, time.Millisecond*300, time.Millisecond*50)
}

func Test_NetsvcMonitor_ShouldNotLeakWithoutClose(t *testing.T) {
t.Cleanup(func() {
goleak.VerifyNone(t)
})

var testCtx, cancel = context.WithCancel(context.Background())
defer cancel()

var nsServer = memory.NewNetworkServiceRegistryServer()
var nseServer = memory.NewNetworkServiceEndpointRegistryServer()
var counter count.Server

_, _ = nsServer.Register(context.Background(), &registry.NetworkService{
Name: "service-1",
})

_, _ = nseServer.Register(context.Background(), &registry.NetworkServiceEndpoint{
Name: "endpoint-1",
NetworkServiceNames: []string{"service-1"},
})

var server = chain.NewNetworkServiceServer(
metadata.NewServer(),
begin.NewServer(),
netsvcmonitor.NewServer(
testCtx,
adapters.NetworkServiceServerToClient(nsServer),
adapters.NetworkServiceEndpointServerToClient(nseServer),
),
&counter,
)

var n = time.Now().Add(time.Second)

var request = &networkservice.NetworkServiceRequest{
Connection: &networkservice.Connection{
Id: "1",
NetworkService: "service-1",
NetworkServiceEndpointName: "endpoint-1",
Path: &networkservice.Path{
PathSegments: []*networkservice.PathSegment{
{
Expires: timestamppb.New(n),
},
},
},
},
}

var _, err = server.Request(testCtx, request)
require.NoError(t, err)

for end := time.Now().Add(time.Second); time.Now().Before(end); time.Sleep(time.Millisecond * 100) {
if goleak.Find() != nil {
break
}
}
if goleak.Find() == nil {
require.FailNow(t, "netsvc goroutine must be created")
}

for end := time.Now().Add(time.Second * 2); time.Now().Before(end); time.Sleep(time.Millisecond * 100) {
if goleak.Find() == nil {
break
}
}
if e := goleak.Find(); e != nil {
require.FailNow(t, "netsvc goroutine must be stopped, but it's found"+e.Error())
}
}

0 comments on commit af66f32

Please sign in to comment.