Skip to content

Commit

Permalink
simplify forwarder selection and add test for remote death case (#1452)
Browse files Browse the repository at this point in the history
Signed-off-by: Denis Tingaikin <denis.tingajkin@xored.com>
  • Loading branch information
denis-tingaikin authored May 1, 2023
1 parent fbf0404 commit 6fa2f68
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 65 deletions.
69 changes: 69 additions & 0 deletions pkg/networkservice/chains/nsmgr/select_forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,3 +223,72 @@ func Test_DiscoverForwarder_ChangeForwarderOnDeath_LostHeal(t *testing.T) {
require.Equal(t, 0, counter.Closes())
require.NotEqual(t, selectedFwd, conn.GetPath().GetPathSegments()[2].Name)
}

func Test_DiscoverForwarder_ChangeRemoteForwarderOnDeath(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })
ctx, cancel := context.WithTimeout(context.Background(), timeout)

defer cancel()
domain := sandbox.NewBuilder(ctx, t).
SetNodesCount(2).
SetNSMgrProxySupplier(nil).
SetRegistryProxySupplier(nil).
SetNodeSetup(func(ctx context.Context, node *sandbox.Node, _ int) {
node.NewNSMgr(ctx, "nsmgr", nil, sandbox.GenerateTestToken, nsmgr.NewServer)
}).
Build()

domain.Nodes[0].NewForwarder(ctx, &registry.NetworkServiceEndpoint{
Name: sandbox.UniqueName("forwarder-local"),
NetworkServiceNames: []string{"forwarder"},
}, sandbox.GenerateTestToken)

const fwdCount = 10
for i := 0; i < fwdCount; i++ {
domain.Nodes[1].NewForwarder(ctx, &registry.NetworkServiceEndpoint{
Name: sandbox.UniqueName("forwarder-" + fmt.Sprint(i)),
NetworkServiceNames: []string{"forwarder"},
}, sandbox.GenerateTestToken)
}

nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)

nsReg := defaultRegistryService(t.Name())
nsReg, err := nsRegistryClient.Register(ctx, nsReg)
require.NoError(t, err)

nseReg := defaultRegistryEndpoint(nsReg.Name)

counter := new(count.Server)
domain.Nodes[1].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter)

request := defaultRequest(nsReg.Name)

clientCounter := new(count.Client)
// make sure that Close from heal doesn't clear the forwarder name
// we want to clear it automatically in discoverforwarder element on Request
clientInject := injecterror.NewClient(injecterror.WithRequestErrorTimes(), injecterror.WithCloseErrorTimes(-1))
nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken,
nsclient.WithAdditionalFunctionality(clientCounter, clientInject))

conn, err := nsc.Request(ctx, request.Clone())
require.NoError(t, err)
require.Equal(t, 1, counter.UniqueRequests())
require.Equal(t, 1, counter.Requests())

selectedFwd := conn.GetPath().GetPathSegments()[4].Name

domain.Nodes[1].Forwarders[selectedFwd].Cancel()

domain.Registry.Restart()
time.Sleep(time.Second)

// check different forwarder selected
request.Connection = conn
conn, err = nsc.Request(ctx, request.Clone())
require.NoError(t, err)
require.Equal(t, 1, counter.UniqueRequests())
require.Equal(t, 3, counter.Requests())
require.Equal(t, 0, counter.Closes())
require.NotEqual(t, selectedFwd, conn.GetPath().GetPathSegments()[4].Name)
}
104 changes: 39 additions & 65 deletions pkg/networkservice/common/discoverforwarder/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,85 +77,59 @@ func (d *discoverForwarderServer) Request(ctx context.Context, request *networks
var forwarderName = d.forwarderName(request.GetConnection())
var logger = log.FromContext(ctx).WithField("discoverForwarderServer", "request")

if forwarderName == "" || request.GetConnection().GetMechanism() == nil {
ns, err := d.discoverNetworkService(ctx, request.GetConnection().GetNetworkService(), request.GetConnection().GetPayload())
if err != nil {
return nil, err
}

stream, err := d.nseClient.Find(ctx, &registry.NetworkServiceEndpointQuery{
NetworkServiceEndpoint: &registry.NetworkServiceEndpoint{
NetworkServiceNames: []string{
d.forwarderServiceName,
},
Url: d.nsmgrURL,
},
})
if err != nil {
logger.Errorf("can not open registry nse stream by networkservice. Error: %v", err.Error())
return nil, errors.Wrapf(err, "failed to find %s on %s", d.forwarderServiceName, d.nsmgrURL)
}

nses := d.matchForwarders(request.Connection.GetLabels(), ns, registry.ReadNetworkServiceEndpointList(stream))
if len(nses) == 0 {
return nil, errors.New("no candidates found")
}

segments := request.Connection.GetPath().GetPathSegments()
if pathIndex := int(request.Connection.GetPath().Index); len(segments) > pathIndex+1 {
for i, candidate := range nses {
if candidate.Name == forwarderName {
nses[0], nses[i] = nses[i], nses[0]
break
}
}
}

var candidatesErr = errors.New("all forwarders have failed")

// TODO: Should we consider about load balancing?
// https://github.com/networkservicemesh/sdk/issues/790
for i, candidate := range nses {
u, err := url.Parse(candidate.Url)
if err != nil {
logger.Errorf("can not parse forwarder=%v url=%v error=%v", candidate.Name, candidate.Url, err.Error())
continue
}

resp, err := next.Server(ctx).Request(clienturlctx.WithClientURL(ctx, u), request.Clone())
if err == nil {
return resp, nil
}
logger.Errorf("forwarder=%v url=%v returned error=%v", candidate.Name, candidate.Url, err.Error())
candidatesErr = errors.Wrapf(candidatesErr, "%v. An error during select forwawrder %v --> %v", i, candidate.Name, err.Error())
}

return nil, candidatesErr
ns, err := d.discoverNetworkService(ctx, request.GetConnection().GetNetworkService(), request.GetConnection().GetPayload())
if err != nil {
return nil, err
}

stream, err := d.nseClient.Find(ctx, &registry.NetworkServiceEndpointQuery{
NetworkServiceEndpoint: &registry.NetworkServiceEndpoint{
Name: forwarderName,
Url: d.nsmgrURL,
NetworkServiceNames: []string{
d.forwarderServiceName,
},
Url: d.nsmgrURL,
},
})
if err != nil {
logger.Errorf("can not open registry nse stream by forwarder name. Error: %v", err.Error())
return nil, errors.Wrapf(err, "failed to find %s on %s", forwarderName, d.nsmgrURL)
logger.Errorf("can not open registry nse stream by networkservice. Error: %v", err.Error())
return nil, errors.Wrapf(err, "failed to find %s on %s", d.forwarderServiceName, d.nsmgrURL)
}

nses := registry.ReadNetworkServiceEndpointList(stream)
nses := d.matchForwarders(request.Connection.GetLabels(), ns, registry.ReadNetworkServiceEndpointList(stream))
if len(nses) == 0 {
return nil, errors.New("forwarder not found")
return nil, errors.New("no candidates found")
}

u, err := url.Parse(nses[0].Url)
if err != nil {
logger.Errorf("can not parse forwarder=%v url=%v error=%v", nses[0].Name, u, err.Error())
return nil, errors.Wrapf(err, "failed to parse url %s", nses[0].Url)
segments := request.Connection.GetPath().GetPathSegments()
if pathIndex := int(request.Connection.GetPath().Index); len(segments) > pathIndex+1 {
for i, candidate := range nses {
if candidate.Name == forwarderName {
nses[0], nses[i] = nses[i], nses[0]
break
}
}
}

var candidatesErr = errors.New("all forwarders have failed")

// TODO: Should we consider about load balancing?
// https://github.com/networkservicemesh/sdk/issues/790
for i, candidate := range nses {
u, err := url.Parse(candidate.Url)
if err != nil {
logger.Errorf("can not parse forwarder=%v url=%v error=%v", candidate.Name, candidate.Url, err.Error())
continue
}

resp, err := next.Server(ctx).Request(clienturlctx.WithClientURL(ctx, u), request.Clone())
if err == nil {
return resp, nil
}
logger.Errorf("forwarder=%v url=%v returned error=%v", candidate.Name, candidate.Url, err.Error())
candidatesErr = errors.Wrapf(candidatesErr, "%v. An error during select forwawrder %v --> %v", i, candidate.Name, err.Error())
}

return next.Server(ctx).Request(clienturlctx.WithClientURL(ctx, u), request)
return nil, candidatesErr
}

func (d *discoverForwarderServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) {
Expand Down

0 comments on commit 6fa2f68

Please sign in to comment.