From a8c394e69c4e1a10acc3a61acb5eb88d57901a67 Mon Sep 17 00:00:00 2001 From: Danil Uzlov <36223296+d-uzlov@users.noreply.github.com> Date: Tue, 20 Jun 2023 21:27:40 +0700 Subject: [PATCH] Fix healing after failed refresh (#1465) * fix dialer Signed-off-by: Danil Uzlov * add tests for healing after refresh Signed-off-by: Danil Uzlov * refactor heal context for event loop Signed-off-by: Danil Uzlov * add heal started flag to heal event loop Signed-off-by: Danil Uzlov * start monitoring after refresh error Signed-off-by: Danil Uzlov * remove test TestNSMGR_RefreshFailed_DataPlaneHealthy Signed-off-by: Danil Uzlov * fix copyright Signed-off-by: Danil Uzlov * fix linter Signed-off-by: Danil Uzlov * fix linter Signed-off-by: Danil Uzlov * bump ci Signed-off-by: Danil Uzlov * fix discover forwarder tests Signed-off-by: Danil Uzlov * improve heal monitor cleanup Signed-off-by: Danil Uzlov * remove withrefresh option, use clock in context Signed-off-by: Danil Uzlov * move monitorCtrlPlane function Signed-off-by: Danil Uzlov * use explicit returns in waitForEvents Signed-off-by: Danil Uzlov * fix typo Signed-off-by: Danil Uzlov * different fix for select forwarder tests Signed-off-by: Danil Uzlov * use better variable names in heal monitor Signed-off-by: Danil Uzlov --------- Signed-off-by: Danil Uzlov --- pkg/networkservice/chains/nsmgr/heal_test.go | 83 +++++++++++++++++++ .../chains/nsmgr/select_forwarder_test.go | 13 ++- pkg/networkservice/common/dial/client.go | 17 ++-- pkg/networkservice/common/heal/client.go | 58 +++++++++---- pkg/networkservice/common/heal/eventloop.go | 80 ++++++++++-------- pkg/networkservice/common/heal/metadata.go | 18 ++-- pkg/tools/spire/server.conf.go | 2 +- 7 files changed, 206 insertions(+), 65 deletions(-) diff --git a/pkg/networkservice/chains/nsmgr/heal_test.go b/pkg/networkservice/chains/nsmgr/heal_test.go index 89bbdf4b0..80f4cada3 100644 --- a/pkg/networkservice/chains/nsmgr/heal_test.go +++ 
b/pkg/networkservice/chains/nsmgr/heal_test.go @@ -1,5 +1,7 @@ // Copyright (c) 2020-2022 Doc.ai and/or its affiliates. // +// Copyright (c) 2023 Cisco and/or its affiliates. +// // SPDX-License-Identifier: Apache-2.0 // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -24,6 +26,7 @@ import ( "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "go.uber.org/goleak" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -34,8 +37,12 @@ import ( nsclient "github.com/networkservicemesh/sdk/pkg/networkservice/chains/client" "github.com/networkservicemesh/sdk/pkg/networkservice/common/heal" "github.com/networkservicemesh/sdk/pkg/networkservice/common/null" + "github.com/networkservicemesh/sdk/pkg/networkservice/utils/checks/checkresponse" "github.com/networkservicemesh/sdk/pkg/networkservice/utils/count" + "github.com/networkservicemesh/sdk/pkg/networkservice/utils/inject/injecterror" registryclient "github.com/networkservicemesh/sdk/pkg/registry/chains/client" + "github.com/networkservicemesh/sdk/pkg/tools/clock" + "github.com/networkservicemesh/sdk/pkg/tools/clockmock" "github.com/networkservicemesh/sdk/pkg/tools/sandbox" ) @@ -731,3 +738,79 @@ func testForwarderShouldBeSelectedCorrectlyOnNSMgrRestart(t *testing.T, nodeNum, }, sandbox.GenerateTestToken) } } + +func TestNSMGR_RefreshFailed_DataPlaneBroken(t *testing.T) { + t.Cleanup(func() { goleak.VerifyNone(t) }) + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + domain := sandbox.NewBuilder(ctx, t). + SetNodesCount(1). 
+ Build() + + nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken) + + nsReg := defaultRegistryService(t.Name()) + nsReg, err := nsRegistryClient.Register(ctx, nsReg) + require.NoError(t, err) + + nseReg := defaultRegistryEndpoint(nsReg.Name) + + counter1 := new(count.Server) + // allow only one successful request + inject := injecterror.NewServer(injecterror.WithCloseErrorTimes(), injecterror.WithRequestErrorTimes(1, -1)) + isDataplaneHealthy := atomic.Bool{} + isDataplaneHealthy.Store(true) + domain.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter1, inject) + + request := defaultRequest(nsReg.Name) + + tokenDuration := time.Minute * 15 + clk := clockmock.New(ctx) + clk.Set(time.Now()) + + nsc := domain.Nodes[0].NewClient(ctx, + sandbox.GenerateExpiringToken(tokenDuration), + nsclient.WithHealClient(heal.NewClient(ctx, + heal.WithLivenessCheck(func(ctx context.Context, conn *networkservice.Connection) bool { + return isDataplaneHealthy.Load() + }), + heal.WithLivenessCheckInterval(time.Millisecond*10), + )), + ) + + requestCtx, requestCalcel := context.WithTimeout(ctx, time.Second) + requestCtx = clock.WithClock(requestCtx, clk) + defer requestCalcel() + conn, err := nsc.Request(requestCtx, request.Clone()) + require.NoError(t, err) + require.Equal(t, 1, counter1.Requests()) + + nseReg2 := defaultRegistryEndpoint(nsReg.Name) + nseReg2.Name += "-2" + + counter2 := new(count.Server) + // automatically restore data plane flag to prevent repeated heal + dataPlaneNotifier := checkresponse.NewServer(t, func(t *testing.T, c *networkservice.Connection) { + if c != nil { + isDataplaneHealthy.Store(true) + } + }) + domain.Nodes[0].NewEndpoint(ctx, nseReg2, sandbox.GenerateTestToken, dataPlaneNotifier, counter2) + + // refresh interval in this test is expected to be 3 minutes and a few milliseconds + clk.Add(time.Second * 190) + + // wait till refresh reached NSE, to make sure that initial heal monitor is canceled + 
require.Eventually(t, checkSecondRequestsReceived(counter1.Requests), timeout, tick) + + isDataplaneHealthy.Store(false) + + require.Eventually(t, func() bool { return counter2.Requests() >= 1 }, timeout, tick) + require.Equal(t, 1, counter2.UniqueRequests()) + require.Equal(t, 1, counter2.Requests()) + + _, err = nsc.Close(ctx, conn.Clone()) + require.NoError(t, err) +} diff --git a/pkg/networkservice/chains/nsmgr/select_forwarder_test.go b/pkg/networkservice/chains/nsmgr/select_forwarder_test.go index c1395095f..d3d8c0d27 100644 --- a/pkg/networkservice/chains/nsmgr/select_forwarder_test.go +++ b/pkg/networkservice/chains/nsmgr/select_forwarder_test.go @@ -214,12 +214,17 @@ func Test_DiscoverForwarder_ChangeForwarderOnDeath_LostHeal(t *testing.T) { domain.Nodes[0].Forwarders[selectedFwd].Cancel() + require.Eventually(t, checkSecondRequestsReceived(counter.Requests), timeout, tick) + require.Equal(t, 1, counter.UniqueRequests()) + require.Equal(t, 2, counter.Requests()) + require.Equal(t, 0, counter.Closes()) + // check different forwarder selected request.Connection = conn conn, err = nsc.Request(ctx, request.Clone()) require.NoError(t, err) require.Equal(t, 1, counter.UniqueRequests()) - require.Equal(t, 2, counter.Requests()) + require.Equal(t, 3, counter.Requests()) require.Equal(t, 0, counter.Closes()) require.NotEqual(t, selectedFwd, conn.GetPath().GetPathSegments()[2].Name) } @@ -281,7 +286,11 @@ func Test_DiscoverForwarder_ChangeRemoteForwarderOnDeath(t *testing.T) { domain.Nodes[1].Forwarders[selectedFwd].Cancel() domain.Registry.Restart() - time.Sleep(time.Second) + + require.Eventually(t, checkSecondRequestsReceived(counter.Requests), timeout, tick) + require.Equal(t, 1, counter.UniqueRequests()) + require.Equal(t, 2, counter.Requests()) + require.Equal(t, 0, counter.Closes()) // check different forwarder selected request.Connection = conn diff --git a/pkg/networkservice/common/dial/client.go b/pkg/networkservice/common/dial/client.go index 
dc8d746c1..bf5705457 100644 --- a/pkg/networkservice/common/dial/client.go +++ b/pkg/networkservice/common/dial/client.go @@ -1,4 +1,4 @@ -// Copyright (c) 2021 Cisco and/or its affiliates. +// Copyright (c) 2021-2023 Cisco and/or its affiliates. // // SPDX-License-Identifier: Apache-2.0 // @@ -64,7 +64,7 @@ func (d *dialClient) Request(ctx context.Context, request *networkservice.Networ return next.Client(ctx).Request(ctx, request, opts...) } - cc, _ := clientconn.LoadOrStore(ctx, newDialer(d.chainCtx, d.dialTimeout, d.dialOptions...)) + cc, loaded := clientconn.LoadOrStore(ctx, newDialer(d.chainCtx, d.dialTimeout, d.dialOptions...)) // If there's an existing grpc.ClientConnInterface and it's not ours, call the next in the chain di, ok := cc.(*dialer) @@ -87,14 +87,21 @@ func (d *dialClient) Request(ctx context.Context, request *networkservice.Networ err := di.Dial(ctx, clientURL) if err != nil { - log.FromContext(ctx).Errorf("can not dial to %v, err %v. Deleting clientconn...", grpcutils.URLToTarget(clientURL), err) - clientconn.Delete(ctx) + log.FromContext(ctx).Errorf("Can not dial to %v", grpcutils.URLToTarget(clientURL)) + if !loaded { + log.FromContext(ctx).Errorf("Deleting clientconn...") + _ = di.Close() + clientconn.Delete(ctx) + } return nil, err } conn, err := next.Client(ctx).Request(ctx, request, opts...) if err != nil { - _ = di.Close() + if !loaded { + _ = di.Close() + clientconn.Delete(ctx) + } return nil, err } return conn, nil diff --git a/pkg/networkservice/common/heal/client.go b/pkg/networkservice/common/heal/client.go index f763d0c45..c0bce8f3e 100644 --- a/pkg/networkservice/common/heal/client.go +++ b/pkg/networkservice/common/heal/client.go @@ -1,4 +1,4 @@ -// Copyright (c) 2021-2022 Cisco and/or its affiliates. +// Copyright (c) 2021-2023 Cisco and/or its affiliates. 
// // SPDX-License-Identifier: Apache-2.0 // @@ -21,13 +21,13 @@ import ( "time" "github.com/networkservicemesh/api/pkg/api/networkservice" - "github.com/pkg/errors" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/emptypb" "github.com/networkservicemesh/sdk/pkg/networkservice/common/clientconn" "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" "github.com/networkservicemesh/sdk/pkg/tools/extend" + "github.com/networkservicemesh/sdk/pkg/tools/log" "github.com/networkservicemesh/sdk/pkg/tools/postpone" ) @@ -58,33 +58,57 @@ func NewClient(chainCtx context.Context, opts ...Option) networkservice.NetworkS func (h *healClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { closeCtxFunc := postpone.ContextWithValues(ctx) // Cancel any existing eventLoop - if cancelEventLoop, loaded := loadAndDelete(ctx); loaded { - cancelEventLoop() + loopHandle, loaded := loadAndDelete(ctx) + if loaded { + loopHandle.cancel() + if started, ok := <-loopHandle.healingStartedCh; ok { + loopHandle.healingStarted = started + } } conn, err := next.Client(ctx).Request(ctx, request, opts...) 
if err != nil { + if loaded && !loopHandle.healingStarted { + eventLoopErr := h.startEventLoop(ctx, request.GetConnection()) + if eventLoopErr != nil { + closeCtx, closeCancel := closeCtxFunc() + defer closeCancel() + _, _ = next.Client(closeCtx).Close(closeCtx, request.GetConnection()) + log.FromContext(ctx).Errorf("can't start monitoring after a failed refresh: %v", eventLoopErr) + } + } return nil, err } - cc, ccLoaded := clientconn.Load(ctx) - if ccLoaded { - cancelEventLoop, eventLoopErr := newEventLoop( - extend.WithValuesFromContext(h.chainCtx, ctx), cc, conn, h) - if eventLoopErr != nil { - closeCtx, closeCancel := closeCtxFunc() - defer closeCancel() - _, _ = next.Client(closeCtx).Close(closeCtx, conn) - return nil, errors.Wrap(eventLoopErr, "unable to monitor") - } - store(ctx, cancelEventLoop) + eventLoopErr := h.startEventLoop(ctx, conn) + if eventLoopErr != nil { + closeCtx, closeCancel := closeCtxFunc() + defer closeCancel() + _, _ = next.Client(closeCtx).Close(closeCtx, conn) + return nil, eventLoopErr } return conn, nil } func (h *healClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*emptypb.Empty, error) { // Cancel any existing eventLoop - if cancelEventLoop, loaded := loadAndDelete(ctx); loaded { - cancelEventLoop() + if loopHandle, loaded := loadAndDelete(ctx); loaded { + loopHandle.cancel() } return next.Client(ctx).Close(ctx, conn) } + +func (h *healClient) startEventLoop(ctx context.Context, conn *networkservice.Connection) error { + cc, ccLoaded := clientconn.Load(ctx) + if !ccLoaded { + return nil + } + cancel, healingStartedCh, err := newEventLoop(extend.WithValuesFromContext(h.chainCtx, ctx), cc, conn, h) + if err != nil { + return err + } + store(ctx, eventLoopHandle{ + cancel: cancel, + healingStartedCh: healingStartedCh, + }) + return nil +} diff --git a/pkg/networkservice/common/heal/eventloop.go b/pkg/networkservice/common/heal/eventloop.go index 3287c9774..ba5b6c9c0 100644 --- 
a/pkg/networkservice/common/heal/eventloop.go +++ b/pkg/networkservice/common/heal/eventloop.go @@ -32,22 +32,24 @@ import ( ) type eventLoop struct { - heal *healClient - eventLoopCtx context.Context - chainCtx context.Context - conn *networkservice.Connection - eventFactory begin.EventFactory - client networkservice.MonitorConnection_MonitorConnectionsClient - logger log.Logger + heal *healClient + eventLoopCtx context.Context + eventLoopCancel context.CancelFunc + chainCtx context.Context + conn *networkservice.Connection + eventFactory begin.EventFactory + client networkservice.MonitorConnection_MonitorConnectionsClient + logger log.Logger + healingStartedCh chan bool } -func newEventLoop(ctx context.Context, cc grpc.ClientConnInterface, conn *networkservice.Connection, heal *healClient) (context.CancelFunc, error) { +func newEventLoop(ctx context.Context, cc grpc.ClientConnInterface, conn *networkservice.Connection, heal *healClient) (context.CancelFunc, <-chan bool, error) { conn = conn.Clone() ev := begin.FromContext(ctx) // Is another chain element asking for events? 
If not, no need to monitor if ev == nil { - return func() {}, nil + return func() {}, nil, nil } // Create new eventLoopCtx and store its eventLoopCancel @@ -66,30 +68,32 @@ func newEventLoop(ctx context.Context, cc grpc.ClientConnInterface, conn *networ client, err := networkservice.NewMonitorConnectionClient(cc).MonitorConnections(eventLoopCtx, selector) if err != nil { eventLoopCancel() - return nil, errors.Wrap(err, "failed get MonitorConnections client") + return nil, nil, errors.Wrap(err, "failed get MonitorConnections client") } // get the initial state transfer and use it to detect whether we have a real connection or not _, err = client.Recv() if err != nil { eventLoopCancel() - return nil, errors.Wrap(err, "failed to get the initial state transfer") + return nil, nil, errors.Wrap(err, "failed to get the initial state transfer") } logger := log.FromContext(ctx).WithField("heal", "eventLoop") cev := &eventLoop{ - heal: heal, - eventLoopCtx: eventLoopCtx, - chainCtx: ctx, - conn: conn, - eventFactory: ev, - client: newClientFilter(client, conn, logger), - logger: logger, + heal: heal, + eventLoopCtx: eventLoopCtx, + eventLoopCancel: eventLoopCancel, + chainCtx: ctx, + conn: conn, + eventFactory: ev, + client: newClientFilter(client, conn, logger), + logger: logger, + healingStartedCh: make(chan bool, 1), } // Start the eventLoop go cev.eventLoop() - return eventLoopCancel, nil + return eventLoopCancel, cev.healingStartedCh, nil } func (cev *eventLoop) monitorCtrlPlane() <-chan struct{} { @@ -127,46 +131,51 @@ func (cev *eventLoop) monitorCtrlPlane() <-chan struct{} { return res } -func (cev *eventLoop) eventLoop() { - reselect := false +func (cev *eventLoop) waitForEvents() (canceled, reselect bool) { + defer close(cev.healingStartedCh) + // make sure we stop all monitors if chain context was canceled + defer cev.eventLoopCancel() ctrlPlaneCh := cev.monitorCtrlPlane() dataPlaneCh := cev.monitorDataPlane() - if dataPlaneCh == nil { - // Since we don't know 
about data path status - always use reselect - reselect = true - } - select { case _, ok := <-ctrlPlaneCh: if ok { // Connection closed - return + return true, false } - // Start healing cev.logger.Warnf("Control plane is down") + cev.healingStartedCh <- true + // use reselect if data plane monitoring isn't available + return false, dataPlaneCh == nil case _, ok := <-dataPlaneCh: if ok { // Connection closed - return + return true, false } - // Start healing cev.logger.Warnf("Data plane is down") reselect = true + cev.healingStartedCh <- true + return false, true case <-cev.chainCtx.Done(): - return case <-cev.eventLoopCtx.Done(): + } + return true, false +} + +func (cev *eventLoop) eventLoop() { + canceled, reselect := cev.waitForEvents() + + if canceled { return } - /* Attempts to heal the connection */ for { select { case <-cev.chainCtx.Done(): return default: - var options []begin.Option if cev.chainCtx.Err() != nil { return } @@ -181,11 +190,14 @@ func (cev *eventLoop) eventLoop() { deadlineCancel() } + var options []begin.Option if reselect { cev.logger.Debugf("Reconnect with reselect") options = append(options, begin.WithReselect()) } - if err := <-cev.eventFactory.Request(options...); err == nil { + err := <-cev.eventFactory.Request(options...) + if err == nil { + cev.logger.Info("Heal success") return } } diff --git a/pkg/networkservice/common/heal/metadata.go b/pkg/networkservice/common/heal/metadata.go index 3abb69914..ca1cd3aaa 100644 --- a/pkg/networkservice/common/heal/metadata.go +++ b/pkg/networkservice/common/heal/metadata.go @@ -1,4 +1,4 @@ -// Copyright (c) 2021 Cisco and/or its affiliates. +// Copyright (c) 2021-2023 Cisco and/or its affiliates. // // SPDX-License-Identifier: Apache-2.0 // @@ -24,18 +24,24 @@ import ( type key struct{} -// store sets the context.CancelFunc stored in per Connection.Id metadata. 
-func store(ctx context.Context, cancel context.CancelFunc) { +type eventLoopHandle struct { + cancel context.CancelFunc + healingStartedCh <-chan bool + healingStarted bool +} + +// store sets the eventLoopHandle stored in per Connection.Id metadata. +func store(ctx context.Context, cancel eventLoopHandle) { metadata.Map(ctx, true).Store(key{}, cancel) } -// loadAndDelete deletes the context.CancelFunc stored in per Connection.Id metadata, +// loadAndDelete deletes the eventLoopHandle stored in per Connection.Id metadata, // returning the previous value if any. The loaded result reports whether the key was present. -func loadAndDelete(ctx context.Context) (value context.CancelFunc, ok bool) { +func loadAndDelete(ctx context.Context) (value eventLoopHandle, loaded bool) { rawValue, ok := metadata.Map(ctx, true).LoadAndDelete(key{}) if !ok { return } - value, ok = rawValue.(context.CancelFunc) + value, ok = rawValue.(eventLoopHandle) return value, ok } diff --git a/pkg/tools/spire/server.conf.go b/pkg/tools/spire/server.conf.go index 4e6793885..7d355b0b6 100644 --- a/pkg/tools/spire/server.conf.go +++ b/pkg/tools/spire/server.conf.go @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022 Cisco and/or its affiliates. +// Copyright (c) 2020-2023 Cisco and/or its affiliates. // // SPDX-License-Identifier: Apache-2.0 //