From fde0584cc74df8ed4dd85cff4c61178ee869ce15 Mon Sep 17 00:00:00 2001 From: Denis Tingaikin Date: Sat, 29 Apr 2023 00:34:10 +0300 Subject: [PATCH] simplify forwarder selection and add test for remote death case Signed-off-by: Denis Tingaikin --- .../chains/nsmgr/select_forwarder_test.go | 69 ++++++++++++ .../common/discoverforwarder/server.go | 104 +++++++----------- 2 files changed, 108 insertions(+), 65 deletions(-) diff --git a/pkg/networkservice/chains/nsmgr/select_forwarder_test.go b/pkg/networkservice/chains/nsmgr/select_forwarder_test.go index 9de970e0f..c1395095f 100644 --- a/pkg/networkservice/chains/nsmgr/select_forwarder_test.go +++ b/pkg/networkservice/chains/nsmgr/select_forwarder_test.go @@ -223,3 +223,72 @@ func Test_DiscoverForwarder_ChangeForwarderOnDeath_LostHeal(t *testing.T) { require.Equal(t, 0, counter.Closes()) require.NotEqual(t, selectedFwd, conn.GetPath().GetPathSegments()[2].Name) } + +func Test_DiscoverForwarder_ChangeRemoteForwarderOnDeath(t *testing.T) { + t.Cleanup(func() { goleak.VerifyNone(t) }) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + + defer cancel() + domain := sandbox.NewBuilder(ctx, t). + SetNodesCount(2). + SetNSMgrProxySupplier(nil). + SetRegistryProxySupplier(nil). + SetNodeSetup(func(ctx context.Context, node *sandbox.Node, _ int) { + node.NewNSMgr(ctx, "nsmgr", nil, sandbox.GenerateTestToken, nsmgr.NewServer) + }). + Build() + + domain.Nodes[0].NewForwarder(ctx, ®istry.NetworkServiceEndpoint{ + Name: sandbox.UniqueName("forwarder-local"), + NetworkServiceNames: []string{"forwarder"}, + }, sandbox.GenerateTestToken) + + const fwdCount = 10 + for i := 0; i < fwdCount; i++ { + domain.Nodes[1].NewForwarder(ctx, ®istry.NetworkServiceEndpoint{ + Name: sandbox.UniqueName("forwarder-" + fmt.Sprint(i)), + NetworkServiceNames: []string{"forwarder"}, + }, sandbox.GenerateTestToken) + } + + nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken) + + nsReg := defaultRegistryService(t.Name()) + nsReg, err := nsRegistryClient.Register(ctx, nsReg) + require.NoError(t, err) + + nseReg := defaultRegistryEndpoint(nsReg.Name) + + counter := new(count.Server) + domain.Nodes[1].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter) + + request := defaultRequest(nsReg.Name) + + clientCounter := new(count.Client) + // make sure that Close from heal doesn't clear the forwarder name + // we want to clear it automatically in discoverforwarder element on Request + clientInject := injecterror.NewClient(injecterror.WithRequestErrorTimes(), injecterror.WithCloseErrorTimes(-1)) + nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken, + nsclient.WithAdditionalFunctionality(clientCounter, clientInject)) + + conn, err := nsc.Request(ctx, request.Clone()) + require.NoError(t, err) + require.Equal(t, 1, counter.UniqueRequests()) + require.Equal(t, 1, counter.Requests()) + + selectedFwd := conn.GetPath().GetPathSegments()[4].Name + + domain.Nodes[1].Forwarders[selectedFwd].Cancel() + + domain.Registry.Restart() + time.Sleep(time.Second) + + // check different forwarder selected + request.Connection = conn + conn, err = nsc.Request(ctx, request.Clone()) + require.NoError(t, err) + require.Equal(t, 1, counter.UniqueRequests()) + require.Equal(t, 3, counter.Requests()) + require.Equal(t, 0, counter.Closes()) + require.NotEqual(t, selectedFwd, conn.GetPath().GetPathSegments()[4].Name) +} diff --git a/pkg/networkservice/common/discoverforwarder/server.go b/pkg/networkservice/common/discoverforwarder/server.go index a8383cffc..c92f7d755 100644 --- a/pkg/networkservice/common/discoverforwarder/server.go +++ b/pkg/networkservice/common/discoverforwarder/server.go @@ -77,85 +77,59 @@ func (d *discoverForwarderServer) Request(ctx context.Context, request *networks var forwarderName = d.forwarderName(request.GetConnection()) var logger = log.FromContext(ctx).WithField("discoverForwarderServer", "request") - if forwarderName == "" || request.GetConnection().GetMechanism() == nil { - ns, err := d.discoverNetworkService(ctx, request.GetConnection().GetNetworkService(), request.GetConnection().GetPayload()) - if err != nil { - return nil, err - } - - stream, err := d.nseClient.Find(ctx, ®istry.NetworkServiceEndpointQuery{ - NetworkServiceEndpoint: ®istry.NetworkServiceEndpoint{ - NetworkServiceNames: []string{ - d.forwarderServiceName, - }, - Url: d.nsmgrURL, - }, - }) - if err != nil { - logger.Errorf("can not open registry nse stream by networkservice. Error: %v", err.Error()) - return nil, errors.Wrapf(err, "failed to find %s on %s", d.forwarderServiceName, d.nsmgrURL) - } - - nses := d.matchForwarders(request.Connection.GetLabels(), ns, registry.ReadNetworkServiceEndpointList(stream)) - if len(nses) == 0 { - return nil, errors.New("no candidates found") - } - - segments := request.Connection.GetPath().GetPathSegments() - if pathIndex := int(request.Connection.GetPath().Index); len(segments) > pathIndex+1 { - for i, candidate := range nses { - if candidate.Name == forwarderName { - nses[0], nses[i] = nses[i], nses[0] - break - } - } - } - - var candidatesErr = errors.New("all forwarders have failed") - - // TODO: Should we consider about load balancing? - // https://github.com/networkservicemesh/sdk/issues/790 - for i, candidate := range nses { - u, err := url.Parse(candidate.Url) - if err != nil { - logger.Errorf("can not parse forwarder=%v url=%v error=%v", candidate.Name, candidate.Url, err.Error()) - continue - } - - resp, err := next.Server(ctx).Request(clienturlctx.WithClientURL(ctx, u), request.Clone()) - if err == nil { - return resp, nil - } - logger.Errorf("forwarder=%v url=%v returned error=%v", candidate.Name, candidate.Url, err.Error()) - candidatesErr = errors.Wrapf(candidatesErr, "%v. An error during select forwawrder %v --> %v", i, candidate.Name, err.Error()) - } - - return nil, candidatesErr + ns, err := d.discoverNetworkService(ctx, request.GetConnection().GetNetworkService(), request.GetConnection().GetPayload()) + if err != nil { + return nil, err } stream, err := d.nseClient.Find(ctx, ®istry.NetworkServiceEndpointQuery{ NetworkServiceEndpoint: ®istry.NetworkServiceEndpoint{ - Name: forwarderName, - Url: d.nsmgrURL, + NetworkServiceNames: []string{ + d.forwarderServiceName, + }, + Url: d.nsmgrURL, }, }) if err != nil { - logger.Errorf("can not open registry nse stream by forwarder name. Error: %v", err.Error()) - return nil, errors.Wrapf(err, "failed to find %s on %s", forwarderName, d.nsmgrURL) + logger.Errorf("can not open registry nse stream by networkservice. Error: %v", err.Error()) + return nil, errors.Wrapf(err, "failed to find %s on %s", d.forwarderServiceName, d.nsmgrURL) } - nses := registry.ReadNetworkServiceEndpointList(stream) + nses := d.matchForwarders(request.Connection.GetLabels(), ns, registry.ReadNetworkServiceEndpointList(stream)) if len(nses) == 0 { - return nil, errors.New("forwarder not found") + return nil, errors.New("no candidates found") } - u, err := url.Parse(nses[0].Url) - if err != nil { - logger.Errorf("can not parse forwarder=%v url=%v error=%v", nses[0].Name, u, err.Error()) - return nil, errors.Wrapf(err, "failed to parse url %s", nses[0].Url) + segments := request.Connection.GetPath().GetPathSegments() + if pathIndex := int(request.Connection.GetPath().Index); len(segments) > pathIndex+1 { + for i, candidate := range nses { + if candidate.Name == forwarderName { + nses[0], nses[i] = nses[i], nses[0] + break + } + } + } + + var candidatesErr = errors.New("all forwarders have failed") + + // TODO: Should we consider about load balancing? + // https://github.com/networkservicemesh/sdk/issues/790 + for i, candidate := range nses { + u, err := url.Parse(candidate.Url) + if err != nil { + logger.Errorf("can not parse forwarder=%v url=%v error=%v", candidate.Name, candidate.Url, err.Error()) + continue + } + + resp, err := next.Server(ctx).Request(clienturlctx.WithClientURL(ctx, u), request.Clone()) + if err == nil { + return resp, nil + } + logger.Errorf("forwarder=%v url=%v returned error=%v", candidate.Name, candidate.Url, err.Error()) + candidatesErr = errors.Wrapf(candidatesErr, "%v. An error during select forwawrder %v --> %v", i, candidate.Name, err.Error()) } - return next.Server(ctx).Request(clienturlctx.WithClientURL(ctx, u), request) + return nil, candidatesErr } func (d *discoverForwarderServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) {