Skip to content

Commit

Permalink
[sdk#1111] Fix sendfd issue on URL change (#1112)
Browse files Browse the repository at this point in the history
* Set dial client before additional functionality in client chain

Signed-off-by: Vladimir Popov <vladimir.popov@xored.com>

* Fix recvfd/senfd clients

Signed-off-by: Vladimir Popov <vladimir.popov@xored.com>

* Clone request in remote iteration points

Signed-off-by: Vladimir Popov <vladimir.popov@xored.com>

* Add multiforwarder sendfd test

Signed-off-by: Vladimir Popov <vladimir.popov@xored.com>

* Revert "Fix recvfd/senfd clients"

This reverts commit d14a71d.

Signed-off-by: Vladimir Popov <vladimir.popov@xored.com>

* Fix naming

Signed-off-by: Vladimir Popov <vladimir.popov@xored.com>
  • Loading branch information
Vladimir Popov authored Oct 20, 2021
1 parent f264fec commit a94971d
Show file tree
Hide file tree
Showing 7 changed files with 240 additions and 61 deletions.
6 changes: 3 additions & 3 deletions pkg/networkservice/chains/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ func NewClient(ctx context.Context, clientOpts ...Option) networkservice.Network
opts.refreshClient,
clienturl.NewClient(opts.clientURL),
clientconn.NewClient(opts.cc),
},
append(
opts.additionalFunctionality,
dial.NewClient(ctx,
dial.WithDialOptions(opts.dialOptions...),
dial.WithDialTimeout(opts.dialTimeout),
),
},
append(
opts.additionalFunctionality,
opts.authorizeClient,
connect.NewClient(),
)...,
Expand Down
55 changes: 0 additions & 55 deletions pkg/networkservice/chains/nsmgr/single_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"io/ioutil"
"os"
"path/filepath"
"runtime"
"testing"
"time"

Expand All @@ -32,7 +31,6 @@ import (

"github.com/networkservicemesh/sdk/pkg/networkservice/chains/nsmgr"
"github.com/networkservicemesh/sdk/pkg/networkservice/connectioncontext/dnscontext"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/count"
"github.com/networkservicemesh/sdk/pkg/tools/clientinfo"
"github.com/networkservicemesh/sdk/pkg/tools/sandbox"
)
Expand Down Expand Up @@ -191,59 +189,6 @@ func Test_ShouldCorrectlyAddEndpointsWithSameNames(t *testing.T) {
}
}

func Test_Local_NoURLUsecase(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("Unix sockets are not supported under windows, skipping")
return
}
t.Cleanup(func() { goleak.VerifyNone(t) })

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

domain := sandbox.NewBuilder(ctx, t).
SetNodesCount(1).
UseUnixSockets().
SetNSMgrProxySupplier(nil).
SetRegistryProxySupplier(nil).
SetRegistrySupplier(nil).
Build()

nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)

nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService())
require.NoError(t, err)

nseReg := defaultRegistryEndpoint(nsReg.Name)
request := defaultRequest(nsReg.Name)
counter := new(count.Server)

domain.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter)

nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken)

conn, err := nsc.Request(ctx, request.Clone())
require.NoError(t, err)
require.NotNil(t, conn)
require.Equal(t, 1, counter.Requests())
require.Equal(t, 5, len(conn.Path.PathSegments))

// Simulate refresh from client
refreshRequest := request.Clone()
refreshRequest.Connection = conn.Clone()

conn2, err := nsc.Request(ctx, refreshRequest)
require.NoError(t, err)
require.NotNil(t, conn2)
require.Equal(t, 5, len(conn2.Path.PathSegments))
require.Equal(t, 2, counter.Requests())

// Close
_, err = nsc.Close(ctx, conn)
require.NoError(t, err)
require.Equal(t, 1, counter.Closes())
}

func Test_ShouldParseNetworkServiceLabelsTemplate(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

Expand Down
146 changes: 146 additions & 0 deletions pkg/networkservice/chains/nsmgr/unix_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Copyright (c) 2020-2021 Doc.ai and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//+build !windows

package nsmgr_test

import (
"context"
"runtime"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/goleak"

"github.com/networkservicemesh/api/pkg/api/registry"

"github.com/networkservicemesh/sdk/pkg/networkservice/chains/nsmgr"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/kernel"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/recvfd"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/sendfd"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/count"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/inject/injecterror"
"github.com/networkservicemesh/sdk/pkg/tools/sandbox"
)

func Test_Local_NoURLUsecase(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

domain := sandbox.NewBuilder(ctx, t).
UseUnixSockets().
Build()

nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)

nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService())
require.NoError(t, err)

nseReg := defaultRegistryEndpoint(nsReg.Name)
request := defaultRequest(nsReg.Name)
counter := new(count.Server)

domain.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter)

nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken)

conn, err := nsc.Request(ctx, request.Clone())
require.NoError(t, err)
require.NotNil(t, conn)
require.Equal(t, 1, counter.Requests())
require.Equal(t, 5, len(conn.Path.PathSegments))

// Simulate refresh from client
refreshRequest := request.Clone()
refreshRequest.Connection = conn.Clone()

conn2, err := nsc.Request(ctx, refreshRequest)
require.NoError(t, err)
require.NotNil(t, conn2)
require.Equal(t, 5, len(conn2.Path.PathSegments))
require.Equal(t, 2, counter.Requests())

// Close
_, err = nsc.Close(ctx, conn)
require.NoError(t, err)
require.Equal(t, 1, counter.Closes())
}

func Test_MultiForwarderSendfd(t *testing.T) {
if runtime.GOOS != "linux" {
t.Skip("sendfd works only on linux")
}
t.Cleanup(func() { goleak.VerifyNone(t) })

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

errorServer := injecterror.NewServer(
injecterror.WithRequestErrorTimes(0),
injecterror.WithCloseErrorTimes(),
)
domain := sandbox.NewBuilder(ctx, t).
UseUnixSockets().
SetNodeSetup(func(ctx context.Context, node *sandbox.Node, _ int) {
node.NewNSMgr(ctx, "nsmgr", nil, sandbox.GenerateTestToken, nsmgr.NewServer)
node.NewForwarder(ctx, &registry.NetworkServiceEndpoint{
Name: "forwarder-1",
}, sandbox.GenerateTestToken, errorServer, recvfd.NewServer())
node.NewForwarder(ctx, &registry.NetworkServiceEndpoint{
Name: "forwarder-2",
}, sandbox.GenerateTestToken, errorServer, recvfd.NewServer())
}).
Build()

nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)

nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService())
require.NoError(t, err)

nseReg := defaultRegistryEndpoint(nsReg.Name)
counter := new(count.Server)

domain.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter)

nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken, kernel.NewClient(), sendfd.NewClient())

request := defaultRequest(nsReg.Name)

conn, err := nsc.Request(ctx, request.Clone())
require.NoError(t, err)
require.NotNil(t, conn)
require.Equal(t, 1, counter.Requests())
require.Equal(t, 5, len(conn.Path.PathSegments))

// Simulate refresh from client
refreshRequest := request.Clone()
refreshRequest.Connection = conn.Clone()

conn2, err := nsc.Request(ctx, refreshRequest)
require.NoError(t, err)
require.NotNil(t, conn2)
require.Equal(t, 5, len(conn2.Path.PathSegments))
require.Equal(t, 2, counter.Requests())

// Close
_, err = nsc.Close(ctx, conn)
require.NoError(t, err)
require.Equal(t, 1, counter.Closes())
}
2 changes: 1 addition & 1 deletion pkg/networkservice/common/discover/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (d *discoverCandidatesServer) Request(ctx context.Context, request *network

delay := defaultDiscoverDelay
for ctx.Err() == nil {
resp, err := next.Server(ctx).Request(WithCandidates(ctx, nses, ns), request)
resp, err := next.Server(ctx).Request(WithCandidates(ctx, nses, ns), request.Clone())
if err == nil {
return resp, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/networkservice/common/interpose/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (l *interposeServer) Request(ctx context.Context, request *networkservice.N
endpointURL: clientURL,
interposeNSEURL: crossNSEURL,
})
result, err = next.Server(crossCTX).Request(crossCTX, request)
result, err = next.Server(crossCTX).Request(crossCTX, request.Clone())
if err != nil {
logger.Errorf("failed to request cross NSE %v err: %v", crossNSEURL, err)
return true
Expand Down
2 changes: 1 addition & 1 deletion pkg/networkservice/common/roundrobin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (s *selectEndpointServer) Request(ctx context.Context, request *networkserv
}
ctx = clienturlctx.WithClientURL(ctx, u)
request.GetConnection().NetworkServiceEndpointName = endpoint.Name
resp, err := next.Server(ctx).Request(ctx, request)
resp, err := next.Server(ctx).Request(ctx, request.Clone())
if err == nil {
return resp, err
}
Expand Down
88 changes: 88 additions & 0 deletions pkg/networkservice/common/roundrobin/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (c) 2021 Doc.ai and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package roundrobin_test

import (
"context"
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/goleak"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/networkservicemesh/api/pkg/api/registry"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/discover"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/roundrobin"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/switchcase"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/checks/checkrequest"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/inject/injecterror"
)

const (
nse1 = "nse-1"
nse2 = "nse-2"
ns = "ns"
)

func TestSelectEndpointServer_CleanRequest(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

var hasBeenRequested bool
s := next.NewNetworkServiceServer(
roundrobin.NewServer(),
switchcase.NewServer(
&switchcase.ServerCase{
Condition: func(_ context.Context, conn *networkservice.Connection) bool {
return conn.NetworkServiceEndpointName == nse1
},
Server: next.NewNetworkServiceServer(
checkrequest.NewServer(t, func(_ *testing.T, request *networkservice.NetworkServiceRequest) {
hasBeenRequested = true
request.Connection.Labels[nse1] = nse1
}),
injecterror.NewServer(),
),
},
),
)

ctx := discover.WithCandidates(context.Background(), []*registry.NetworkServiceEndpoint{
{
Name: nse1,
Url: "unix://" + nse1,
NetworkServiceNames: []string{ns},
},
{
Name: nse2,
Url: "unix://" + nse2,
NetworkServiceNames: []string{ns},
},
}, &registry.NetworkService{Name: ns})

conn, err := s.Request(ctx, &networkservice.NetworkServiceRequest{
Connection: &networkservice.Connection{
Labels: map[string]string{nse2: nse2},
},
})
require.NoError(t, err)

require.True(t, hasBeenRequested)
require.Equal(t, nse2, conn.NetworkServiceEndpointName)
require.Equal(t, map[string]string{nse2: nse2}, conn.Labels)
}

0 comments on commit a94971d

Please sign in to comment.