Skip to content

Commit

Permalink
Fix healing after failed refresh (#1465)
Browse files Browse the repository at this point in the history
* fix dialer

Signed-off-by: Danil Uzlov <DanilUzlov@yandex.ru>

* add tests for healing after refresh

Signed-off-by: Danil Uzlov <DanilUzlov@yandex.ru>

* refactor heal context for event loop

Signed-off-by: Danil Uzlov <DanilUzlov@yandex.ru>

* add heal started flag to heal event lopp

Signed-off-by: Danil Uzlov <DanilUzlov@yandex.ru>

* start monitoring after refresh error

Signed-off-by: Danil Uzlov <DanilUzlov@yandex.ru>

* remove test TestNSMGR_RefreshFailed_DataPlaneHealthy

Signed-off-by: Danil Uzlov <DanilUzlov@yandex.ru>

* fix coyright

Signed-off-by: Danil Uzlov <DanilUzlov@yandex.ru>

* fix linter

Signed-off-by: Danil Uzlov <DanilUzlov@yandex.ru>

* fix linter

Signed-off-by: Danil Uzlov <DanilUzlov@yandex.ru>

* bump ci

Signed-off-by: Danil Uzlov <DanilUzlov@yandex.ru>

* fix discover forwarder tests

Signed-off-by: Danil Uzlov <DanilUzlov@yandex.ru>

* improve heal monitor cleanup

Signed-off-by: Danil Uzlov <DanilUzlov@yandex.ru>

* remove withrefresh option, use clock in context

Signed-off-by: Danil Uzlov <DanilUzlov@yandex.ru>

* move monitorCtrlPlane function

Signed-off-by: Danil Uzlov <DanilUzlov@yandex.ru>

* use explicit returns in waitForEvents

Signed-off-by: Danil Uzlov <DanilUzlov@yandex.ru>

* fix typo

Signed-off-by: Danil Uzlov <DanilUzlov@yandex.ru>

* different fix for select forwarder tests

Signed-off-by: Danil Uzlov <DanilUzlov@yandex.ru>

* use better variable names in heal monitor

Signed-off-by: Danil Uzlov <DanilUzlov@yandex.ru>

---------

Signed-off-by: Danil Uzlov <DanilUzlov@yandex.ru>
  • Loading branch information
d-uzlov authored Jun 20, 2023
1 parent 63043b2 commit a8c394e
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 65 deletions.
83 changes: 83 additions & 0 deletions pkg/networkservice/chains/nsmgr/heal_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Copyright (c) 2020-2022 Doc.ai and/or its affiliates.
//
// Copyright (c) 2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -24,6 +26,7 @@ import (

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/goleak"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -34,8 +37,12 @@ import (
nsclient "github.com/networkservicemesh/sdk/pkg/networkservice/chains/client"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/heal"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/null"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/checks/checkresponse"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/count"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/inject/injecterror"
registryclient "github.com/networkservicemesh/sdk/pkg/registry/chains/client"
"github.com/networkservicemesh/sdk/pkg/tools/clock"
"github.com/networkservicemesh/sdk/pkg/tools/clockmock"
"github.com/networkservicemesh/sdk/pkg/tools/sandbox"
)

Expand Down Expand Up @@ -731,3 +738,79 @@ func testForwarderShouldBeSelectedCorrectlyOnNSMgrRestart(t *testing.T, nodeNum,
}, sandbox.GenerateTestToken)
}
}

func TestNSMGR_RefreshFailed_DataPlaneBroken(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

domain := sandbox.NewBuilder(ctx, t).
SetNodesCount(1).
Build()

nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)

nsReg := defaultRegistryService(t.Name())
nsReg, err := nsRegistryClient.Register(ctx, nsReg)
require.NoError(t, err)

nseReg := defaultRegistryEndpoint(nsReg.Name)

counter1 := new(count.Server)
// allow only one successful request
inject := injecterror.NewServer(injecterror.WithCloseErrorTimes(), injecterror.WithRequestErrorTimes(1, -1))
isDataplaneHealthy := atomic.Bool{}
isDataplaneHealthy.Store(true)
domain.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter1, inject)

request := defaultRequest(nsReg.Name)

tokenDuration := time.Minute * 15
clk := clockmock.New(ctx)
clk.Set(time.Now())

nsc := domain.Nodes[0].NewClient(ctx,
sandbox.GenerateExpiringToken(tokenDuration),
nsclient.WithHealClient(heal.NewClient(ctx,
heal.WithLivenessCheck(func(ctx context.Context, conn *networkservice.Connection) bool {
return isDataplaneHealthy.Load()
}),
heal.WithLivenessCheckInterval(time.Millisecond*10),
)),
)

requestCtx, requestCalcel := context.WithTimeout(ctx, time.Second)
requestCtx = clock.WithClock(requestCtx, clk)
defer requestCalcel()
conn, err := nsc.Request(requestCtx, request.Clone())
require.NoError(t, err)
require.Equal(t, 1, counter1.Requests())

nseReg2 := defaultRegistryEndpoint(nsReg.Name)
nseReg2.Name += "-2"

counter2 := new(count.Server)
// automatically restore data plane flag to prevent repeated heal
dataPlaneNotifier := checkresponse.NewServer(t, func(t *testing.T, c *networkservice.Connection) {
if c != nil {
isDataplaneHealthy.Store(true)
}
})
domain.Nodes[0].NewEndpoint(ctx, nseReg2, sandbox.GenerateTestToken, dataPlaneNotifier, counter2)

// refresh interval in this test is expected to be 3 minutes and a few milliseconds
clk.Add(time.Second * 190)

// wait till refresh reached NSE, to make sure that initial heal monitor is canceled
require.Eventually(t, checkSecondRequestsReceived(counter1.Requests), timeout, tick)

isDataplaneHealthy.Store(false)

require.Eventually(t, func() bool { return counter2.Requests() >= 1 }, timeout, tick)
require.Equal(t, 1, counter2.UniqueRequests())
require.Equal(t, 1, counter2.Requests())

_, err = nsc.Close(ctx, conn.Clone())
require.NoError(t, err)
}
13 changes: 11 additions & 2 deletions pkg/networkservice/chains/nsmgr/select_forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,17 @@ func Test_DiscoverForwarder_ChangeForwarderOnDeath_LostHeal(t *testing.T) {

domain.Nodes[0].Forwarders[selectedFwd].Cancel()

require.Eventually(t, checkSecondRequestsReceived(counter.Requests), timeout, tick)
require.Equal(t, 1, counter.UniqueRequests())
require.Equal(t, 2, counter.Requests())
require.Equal(t, 0, counter.Closes())

// check different forwarder selected
request.Connection = conn
conn, err = nsc.Request(ctx, request.Clone())
require.NoError(t, err)
require.Equal(t, 1, counter.UniqueRequests())
require.Equal(t, 2, counter.Requests())
require.Equal(t, 3, counter.Requests())
require.Equal(t, 0, counter.Closes())
require.NotEqual(t, selectedFwd, conn.GetPath().GetPathSegments()[2].Name)
}
Expand Down Expand Up @@ -281,7 +286,11 @@ func Test_DiscoverForwarder_ChangeRemoteForwarderOnDeath(t *testing.T) {
domain.Nodes[1].Forwarders[selectedFwd].Cancel()

domain.Registry.Restart()
time.Sleep(time.Second)

require.Eventually(t, checkSecondRequestsReceived(counter.Requests), timeout, tick)
require.Equal(t, 1, counter.UniqueRequests())
require.Equal(t, 2, counter.Requests())
require.Equal(t, 0, counter.Closes())

// check different forwarder selected
request.Connection = conn
Expand Down
17 changes: 12 additions & 5 deletions pkg/networkservice/common/dial/client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2021 Cisco and/or its affiliates.
// Copyright (c) 2021-2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -64,7 +64,7 @@ func (d *dialClient) Request(ctx context.Context, request *networkservice.Networ
return next.Client(ctx).Request(ctx, request, opts...)
}

cc, _ := clientconn.LoadOrStore(ctx, newDialer(d.chainCtx, d.dialTimeout, d.dialOptions...))
cc, loaded := clientconn.LoadOrStore(ctx, newDialer(d.chainCtx, d.dialTimeout, d.dialOptions...))

// If there's an existing grpc.ClientConnInterface and it's not ours, call the next in the chain
di, ok := cc.(*dialer)
Expand All @@ -87,14 +87,21 @@ func (d *dialClient) Request(ctx context.Context, request *networkservice.Networ

err := di.Dial(ctx, clientURL)
if err != nil {
log.FromContext(ctx).Errorf("can not dial to %v, err %v. Deleting clientconn...", grpcutils.URLToTarget(clientURL), err)
clientconn.Delete(ctx)
log.FromContext(ctx).Errorf("Can not dial to %v", grpcutils.URLToTarget(clientURL))
if !loaded {
log.FromContext(ctx).Errorf("Deleting clientconn...")
_ = di.Close()
clientconn.Delete(ctx)
}
return nil, err
}

conn, err := next.Client(ctx).Request(ctx, request, opts...)
if err != nil {
_ = di.Close()
if !loaded {
_ = di.Close()
clientconn.Delete(ctx)
}
return nil, err
}
return conn, nil
Expand Down
58 changes: 41 additions & 17 deletions pkg/networkservice/common/heal/client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2021-2022 Cisco and/or its affiliates.
// Copyright (c) 2021-2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -21,13 +21,13 @@ import (
"time"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/clientconn"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/extend"
"github.com/networkservicemesh/sdk/pkg/tools/log"
"github.com/networkservicemesh/sdk/pkg/tools/postpone"
)

Expand Down Expand Up @@ -58,33 +58,57 @@ func NewClient(chainCtx context.Context, opts ...Option) networkservice.NetworkS
func (h *healClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) {
closeCtxFunc := postpone.ContextWithValues(ctx)
// Cancel any existing eventLoop
if cancelEventLoop, loaded := loadAndDelete(ctx); loaded {
cancelEventLoop()
loopHandle, loaded := loadAndDelete(ctx)
if loaded {
loopHandle.cancel()
if started, ok := <-loopHandle.healingStartedCh; ok {
loopHandle.healingStarted = started
}
}

conn, err := next.Client(ctx).Request(ctx, request, opts...)
if err != nil {
if loaded && !loopHandle.healingStarted {
eventLoopErr := h.startEventLoop(ctx, request.GetConnection())
if eventLoopErr != nil {
closeCtx, closeCancel := closeCtxFunc()
defer closeCancel()
_, _ = next.Client(closeCtx).Close(closeCtx, request.GetConnection())
log.FromContext(ctx).Errorf("can't start monitoring after a failed refresh: %v", eventLoopErr)
}
}
return nil, err
}
cc, ccLoaded := clientconn.Load(ctx)
if ccLoaded {
cancelEventLoop, eventLoopErr := newEventLoop(
extend.WithValuesFromContext(h.chainCtx, ctx), cc, conn, h)
if eventLoopErr != nil {
closeCtx, closeCancel := closeCtxFunc()
defer closeCancel()
_, _ = next.Client(closeCtx).Close(closeCtx, conn)
return nil, errors.Wrap(eventLoopErr, "unable to monitor")
}
store(ctx, cancelEventLoop)
eventLoopErr := h.startEventLoop(ctx, conn)
if eventLoopErr != nil {
closeCtx, closeCancel := closeCtxFunc()
defer closeCancel()
_, _ = next.Client(closeCtx).Close(closeCtx, conn)
return nil, eventLoopErr
}
return conn, nil
}

func (h *healClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*emptypb.Empty, error) {
// Cancel any existing eventLoop
if cancelEventLoop, loaded := loadAndDelete(ctx); loaded {
cancelEventLoop()
if loopHandle, loaded := loadAndDelete(ctx); loaded {
loopHandle.cancel()
}
return next.Client(ctx).Close(ctx, conn)
}

func (h *healClient) startEventLoop(ctx context.Context, conn *networkservice.Connection) error {
cc, ccLoaded := clientconn.Load(ctx)
if !ccLoaded {
return nil
}
cancel, healingStartedCh, err := newEventLoop(extend.WithValuesFromContext(h.chainCtx, ctx), cc, conn, h)
if err != nil {
return err
}
store(ctx, eventLoopHandle{
cancel: cancel,
healingStartedCh: healingStartedCh,
})
return nil
}
Loading

0 comments on commit a8c394e

Please sign in to comment.