diff --git a/pkg/registry/common/begin/benchmark_test.go b/pkg/registry/common/begin/benchmark_test.go new file mode 100644 index 000000000..512b36d75 --- /dev/null +++ b/pkg/registry/common/begin/benchmark_test.go @@ -0,0 +1,186 @@ +// Copyright (c) 2023 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package begin_test + +import ( + "context" + "fmt" + "sync" + "testing" + + "github.com/golang/protobuf/ptypes/empty" + "github.com/networkservicemesh/api/pkg/api/registry" + + "github.com/networkservicemesh/sdk/pkg/registry/common/begin" + "github.com/networkservicemesh/sdk/pkg/registry/core/chain" + "github.com/networkservicemesh/sdk/pkg/registry/core/next" +) + +var ( + count = 1000 +) + +type dataRaceServer struct { + count int +} + +func (s *dataRaceServer) Register(ctx context.Context, in *registry.NetworkServiceEndpoint) (*registry.NetworkServiceEndpoint, error) { + s.count++ + return next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, in) +} + +func (s *dataRaceServer) Find(query *registry.NetworkServiceEndpointQuery, server registry.NetworkServiceEndpointRegistry_FindServer) error { + return next.NetworkServiceEndpointRegistryServer(server.Context()).Find(query, server) +} + +func (s *dataRaceServer) Unregister(ctx context.Context, in *registry.NetworkServiceEndpoint) (*empty.Empty, error) { + s.count-- + return next.NetworkServiceEndpointRegistryServer(ctx).Unregister(ctx, in) +} + +func BenchmarkBegin_RegisterSameIDs(b *testing.B) { + server := chain.NewNetworkServiceEndpointRegistryServer( + begin.NewNetworkServiceEndpointRegistryServer(), + &dataRaceServer{count: 0}, + ) + + var wg sync.WaitGroup + wg.Add(count) + b.ResetTimer() + for i := 0; i < count; i++ { + go func() { + _, _ = server.Register(context.Background(), ®istry.NetworkServiceEndpoint{Name: "1"}) + wg.Done() + }() + } + wg.Wait() +} + +func BenchmarkBegin_UnregisterSameIDs(b *testing.B) { + server := chain.NewNetworkServiceEndpointRegistryServer( + begin.NewNetworkServiceEndpointRegistryServer(), + &dataRaceServer{count: 0}, + ) + + var wg sync.WaitGroup + wg.Add(count) + b.ResetTimer() + for i := 0; i < count; i++ { + go func() { + _, _ = server.Unregister(context.Background(), ®istry.NetworkServiceEndpoint{Name: "1"}) + wg.Done() + }() + } + wg.Wait() +} + +func BenchmarkBegin_RegisterUnregisterSameIDs(b *testing.B) { + server := chain.NewNetworkServiceEndpointRegistryServer( + begin.NewNetworkServiceEndpointRegistryServer(), + &dataRaceServer{count: 0}, + ) + + var wg sync.WaitGroup + wg.Add(2 * count) + b.ResetTimer() + go func() { + for i := 0; i < count; i++ { + go func() { + _, _ = server.Register(context.Background(), ®istry.NetworkServiceEndpoint{Name: "1"}) + wg.Done() + }() + } + }() + + go func() { + for i := 0; i < count; i++ { + go func() { + _, _ = server.Unregister(context.Background(), ®istry.NetworkServiceEndpoint{Name: "1"}) + wg.Done() + }() + } + }() + + wg.Wait() +} + +func BenchmarkBegin_RegisterDifferentIDs(b *testing.B) { + server := chain.NewNetworkServiceEndpointRegistryServer( + begin.NewNetworkServiceEndpointRegistryServer(), + ) + + var wg sync.WaitGroup + wg.Add(count) + b.ResetTimer() + for i := 0; i < count; i++ { + local := i + go func() { + _, _ = server.Register(context.Background(), ®istry.NetworkServiceEndpoint{Name: fmt.Sprint(local)}) + wg.Done() + }() + } + wg.Wait() +} + +func BenchmarkBegin_UnregisterDifferentIDs(b *testing.B) { + server := chain.NewNetworkServiceEndpointRegistryServer( + begin.NewNetworkServiceEndpointRegistryServer(), + ) + + var wg sync.WaitGroup + wg.Add(count) + b.ResetTimer() + for i := 0; i < count; i++ { + local := i + go func() { + _, _ = server.Unregister(context.Background(), ®istry.NetworkServiceEndpoint{Name: fmt.Sprint(local)}) + wg.Done() + }() + } + wg.Wait() +} + +func BenchmarkBegin_RegisterUnregisterDifferentIDs(b *testing.B) { + server := chain.NewNetworkServiceEndpointRegistryServer( + begin.NewNetworkServiceEndpointRegistryServer(), + ) + + var wg sync.WaitGroup + wg.Add(2 * count) + b.ResetTimer() + go func() { + for i := 0; i < count; i++ { + local := i + go func() { + _, _ = server.Register(context.Background(), ®istry.NetworkServiceEndpoint{Name: fmt.Sprint(local)}) + wg.Done() + }() + } + }() + + go func() { + for i := 0; i < count; i++ { + local := i + go func() { + _, _ = server.Unregister(context.Background(), ®istry.NetworkServiceEndpoint{Name: fmt.Sprint(local)}) + wg.Done() + }() + } + }() + + wg.Wait() +} diff --git a/pkg/registry/common/begin/close_client_test.go b/pkg/registry/common/begin/close_client_test.go index 7d798bccb..80faa27da 100644 --- a/pkg/registry/common/begin/close_client_test.go +++ b/pkg/registry/common/begin/close_client_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2022 Cisco and/or its affiliates. +// Copyright (c) 2022-2023 Cisco and/or its affiliates. // // SPDX-License-Identifier: Apache-2.0 // @@ -18,7 +18,6 @@ package begin_test import ( "context" - "sync" "testing" "github.com/golang/protobuf/ptypes/empty" @@ -85,46 +84,3 @@ func (m *markClient) Unregister(ctx context.Context, in *registry.NetworkService assert.Equal(m.t, mark, in.GetNetworkServiceLabels()[mark].Labels[mark]) return next.NetworkServiceEndpointRegistryClient(ctx).Unregister(ctx, in, opts...) } - -func TestDoubleCloseClient(t *testing.T) { - t.Cleanup(func() { goleak.VerifyNone(t) }) - client := chain.NewNetworkServiceEndpointRegistryClient( - begin.NewNetworkServiceEndpointRegistryClient(), - &doubleCloseClient{t: t}, - ) - id := "1" - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - conn, err := client.Register(ctx, ®istry.NetworkServiceEndpoint{ - Name: id, - }) - assert.NotNil(t, t, conn) - assert.NoError(t, err) - conn = conn.Clone() - _, err = client.Unregister(ctx, conn) - assert.NoError(t, err) - _, err = client.Unregister(ctx, conn) - assert.NoError(t, err) -} - -type doubleCloseClient struct { - t *testing.T - sync.Once -} - -func (s *doubleCloseClient) Register(ctx context.Context, in *registry.NetworkServiceEndpoint, opts ...grpc.CallOption) (*registry.NetworkServiceEndpoint, error) { - return next.NetworkServiceEndpointRegistryClient(ctx).Register(ctx, in, opts...) -} - -func (s *doubleCloseClient) Find(ctx context.Context, in *registry.NetworkServiceEndpointQuery, opts ...grpc.CallOption) (registry.NetworkServiceEndpointRegistry_FindClient, error) { - return next.NetworkServiceEndpointRegistryClient(ctx).Find(ctx, in, opts...) -} - -func (s *doubleCloseClient) Unregister(ctx context.Context, in *registry.NetworkServiceEndpoint, opts ...grpc.CallOption) (*empty.Empty, error) { - count := 1 - s.Do(func() { - count++ - }) - assert.Equal(s.t, 2, count, "Close has been called more than once") - return next.NetworkServiceEndpointRegistryClient(ctx).Unregister(ctx, in, opts...) -} diff --git a/pkg/registry/common/begin/close_server_test.go b/pkg/registry/common/begin/close_server_test.go index 716fb11a5..54eae1ebe 100644 --- a/pkg/registry/common/begin/close_server_test.go +++ b/pkg/registry/common/begin/close_server_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2022 Cisco and/or its affiliates. +// Copyright (c) 2022-2023 Cisco and/or its affiliates. // // SPDX-License-Identifier: Apache-2.0 // @@ -18,20 +18,16 @@ package begin_test import ( "context" - "sync" "testing" "github.com/networkservicemesh/api/pkg/api/registry" "github.com/networkservicemesh/sdk/pkg/registry/common/begin" - "github.com/networkservicemesh/sdk/pkg/registry/common/null" "github.com/networkservicemesh/sdk/pkg/registry/core/adapters" "github.com/networkservicemesh/sdk/pkg/registry/core/chain" - "github.com/networkservicemesh/sdk/pkg/registry/core/next" "github.com/stretchr/testify/assert" "go.uber.org/goleak" - "google.golang.org/protobuf/types/known/emptypb" ) func TestCloseServer(t *testing.T) { @@ -55,39 +51,3 @@ func TestCloseServer(t *testing.T) { _, err = server.Unregister(ctx, conn) assert.NoError(t, err) } - -func TestDoubleCloseServer(t *testing.T) { - t.Cleanup(func() { goleak.VerifyNone(t) }) - server := chain.NewNetworkServiceEndpointRegistryServer( - begin.NewNetworkServiceEndpointRegistryServer(), - &doubleCloseServer{t: t, NetworkServiceEndpointRegistryServer: null.NewNetworkServiceEndpointRegistryServer()}, - ) - id := "1" - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - conn, err := server.Register(ctx, ®istry.NetworkServiceEndpoint{ - Name: id, - }) - assert.NotNil(t, t, conn) - assert.NoError(t, err) - conn = conn.Clone() - _, err = server.Unregister(ctx, conn) - assert.NoError(t, err) - _, err = server.Unregister(ctx, conn) - assert.NoError(t, err) -} - -type doubleCloseServer struct { - t *testing.T - sync.Once - registry.NetworkServiceEndpointRegistryServer -} - -func (s *doubleCloseServer) Unregister(ctx context.Context, in *registry.NetworkServiceEndpoint) (*emptypb.Empty, error) { - count := 1 - s.Do(func() { - count++ - }) - assert.Equal(s.t, 2, count, "Close has been called more than once") - return next.NetworkServiceEndpointRegistryServer(ctx).Unregister(ctx, in) -} diff --git a/pkg/registry/common/begin/fifo_test.go b/pkg/registry/common/begin/fifo_test.go new file mode 100644 index 000000000..632d54ed7 --- /dev/null +++ b/pkg/registry/common/begin/fifo_test.go @@ -0,0 +1,195 @@ +// Copyright (c) 2023 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package begin_test + +import ( + "context" + "crypto/rand" + "fmt" + "math/big" + "net" + "sync" + "testing" + "time" + + "github.com/golang/protobuf/ptypes/empty" + "github.com/networkservicemesh/api/pkg/api/registry" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/networkservicemesh/sdk/pkg/registry/common/begin" + "github.com/networkservicemesh/sdk/pkg/registry/core/next" +) + +func TestFIFOSequence(t *testing.T) { + t.Skip() + t.Cleanup(func() { + goleak.VerifyNone(t) + }) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var serverWg sync.WaitGroup + var clientWg sync.WaitGroup + collector := &collectorServer{} + server := next.NewNetworkServiceEndpointRegistryServer( + &waitGroupServer{wg: &serverWg}, + begin.NewNetworkServiceEndpointRegistryServer(), + collector, + &delayServer{}, + ) + + serverLis, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + grpcServer := grpc.NewServer() + registry.RegisterNetworkServiceEndpointRegistryServer(grpcServer, server) + defer grpcServer.Stop() + + go func() { + serveErr := grpcServer.Serve(serverLis) + require.NoError(t, serveErr) + }() + + clientConn, err := grpc.Dial(serverLis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + client := registry.NewNetworkServiceEndpointRegistryClient(clientConn) + defer func() { + closeErr := clientConn.Close() + require.NoError(t, closeErr) + }() + + count := 10 + nses := []*registry.NetworkServiceEndpoint{} + for i := 0; i < count; i++ { + nses = append(nses, ®istry.NetworkServiceEndpoint{Name: "nse", Url: fmt.Sprint(i)}) + } + + expected := make([]request, 0) + + clientWg.Add(count) + for i := 0; i < count; i++ { + local := i + serverWg.Add(1) + go func() { + var err error + if local%2 == 0 { + expected = append(expected, request{requestType: register, requestData: nses[local]}) + _, err = client.Register(ctx, nses[local]) + } else { + expected = append(expected, request{requestType: unregister, requestData: nses[local]}) + _, err = client.Unregister(ctx, nses[local]) + } + require.NoError(t, err) + clientWg.Done() + }() + serverWg.Wait() + } + + clientWg.Wait() + + collector.mu.Lock() + defer collector.mu.Unlock() + + registrations := collector.registrations + for i, registration := range registrations { + require.Equal(t, registration.requestData.Url, expected[i].requestData.Url) + require.Equal(t, registration.requestType, expected[i].requestType) + } +} + +type eventType int + +const ( + register eventType = 0 + unregister eventType = 1 +) + +type request struct { + requestType eventType + requestData *registry.NetworkServiceEndpoint +} + +type waitGroupServer struct { + wg *sync.WaitGroup +} + +func (s *waitGroupServer) Register(ctx context.Context, in *registry.NetworkServiceEndpoint) (*registry.NetworkServiceEndpoint, error) { + s.wg.Done() + return next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, in) +} + +func (s *waitGroupServer) Find(query *registry.NetworkServiceEndpointQuery, server registry.NetworkServiceEndpointRegistry_FindServer) error { + return next.NetworkServiceEndpointRegistryServer(server.Context()).Find(query, server) +} + +func (s *waitGroupServer) Unregister(ctx context.Context, in *registry.NetworkServiceEndpoint) (*empty.Empty, error) { + s.wg.Done() + return next.NetworkServiceEndpointRegistryServer(ctx).Unregister(ctx, in) +} + +type collectorServer struct { + mu sync.Mutex + registrations []request +} + +func (s *collectorServer) Register(ctx context.Context, in *registry.NetworkServiceEndpoint) (*registry.NetworkServiceEndpoint, error) { + s.mu.Lock() + s.registrations = append(s.registrations, request{ + requestType: register, + requestData: in, + }) + s.mu.Unlock() + return next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, in) +} + +func (s *collectorServer) Find(query *registry.NetworkServiceEndpointQuery, server registry.NetworkServiceEndpointRegistry_FindServer) error { + return next.NetworkServiceEndpointRegistryServer(server.Context()).Find(query, server) +} + +func (s *collectorServer) Unregister(ctx context.Context, in *registry.NetworkServiceEndpoint) (*empty.Empty, error) { + s.mu.Lock() + s.registrations = append(s.registrations, request{ + requestType: unregister, + requestData: in, + }) + s.mu.Unlock() + return next.NetworkServiceEndpointRegistryServer(ctx).Unregister(ctx, in) +} + +type delayServer struct { +} + +func (s *delayServer) Register(ctx context.Context, in *registry.NetworkServiceEndpoint) (*registry.NetworkServiceEndpoint, error) { + n, _ := rand.Int(rand.Reader, big.NewInt(90)) + milliseconds := n.Int64() + 10 + time.Sleep(time.Millisecond * time.Duration(milliseconds)) + return next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, in) +} + +func (s *delayServer) Find(query *registry.NetworkServiceEndpointQuery, server registry.NetworkServiceEndpointRegistry_FindServer) error { + return next.NetworkServiceEndpointRegistryServer(server.Context()).Find(query, server) +} + +func (s *delayServer) Unregister(ctx context.Context, in *registry.NetworkServiceEndpoint) (*empty.Empty, error) { + n, _ := rand.Int(rand.Reader, big.NewInt(90)) + milliseconds := n.Int64() + 10 + time.Sleep(time.Millisecond * time.Duration(milliseconds)) + return next.NetworkServiceEndpointRegistryServer(ctx).Unregister(ctx, in) +} diff --git a/pkg/registry/common/begin/nse_client.go b/pkg/registry/common/begin/nse_client.go index aad5d59e6..330bc808e 100644 --- a/pkg/registry/common/begin/nse_client.go +++ b/pkg/registry/common/begin/nse_client.go @@ -24,6 +24,7 @@ import ( "github.com/networkservicemesh/api/pkg/api/registry" "github.com/pkg/errors" "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/emptypb" "github.com/networkservicemesh/sdk/pkg/registry/common/grpcmetadata" "github.com/networkservicemesh/sdk/pkg/registry/core/next" @@ -44,14 +45,8 @@ func (b *beginNSEClient) Register(ctx context.Context, in *registry.NetworkServi return next.NetworkServiceEndpointRegistryClient(ctx).Register(ctx, in, opts...) } eventFactoryClient, _ := b.LoadOrStore(id, - newEventNSEFactoryClient( - ctx, - func() { - b.Delete(id) - }, - opts..., - ), - ) + newEventNSEFactoryClient(ctx, func() { b.Delete(id) }, opts...)) + var resp *registry.NetworkServiceEndpoint var err error <-eventFactoryClient.executor.AsyncExec(func() { @@ -91,30 +86,26 @@ func (b *beginNSEClient) Unregister(ctx context.Context, in *registry.NetworkSer if fromContext(ctx) != nil { return next.NetworkServiceEndpointRegistryClient(ctx).Unregister(ctx, in, opts...) } - eventFactoryClient, ok := b.Load(id) - if !ok { - return new(empty.Empty), nil - } - var emp *empty.Empty + eventFactoryClient, _ := b.LoadOrStore(id, newEventNSEFactoryClient(ctx, func() { b.Delete(id) })) var err error <-eventFactoryClient.executor.AsyncExec(func() { - // If the connection is not established, don't do anything - if eventFactoryClient.state != established || eventFactoryClient.client == nil || eventFactoryClient.registration == nil { - return - } - // If this isn't the connection we started with, do nothing currentEventFactoryClient, _ := b.Load(id) if currentEventFactoryClient != eventFactoryClient { + _, err = b.Unregister(ctx, in, opts...) return } + registration := eventFactoryClient.registration + if registration == nil { + registration = in.Clone() + } // Always close with the last valid Connection we got withEventFactoryCtx := withEventFactory(ctx, eventFactoryClient) - emp, err = next.NetworkServiceEndpointRegistryClient(withEventFactoryCtx).Unregister(withEventFactoryCtx, eventFactoryClient.registration, opts...) + _, err = next.NetworkServiceEndpointRegistryClient(withEventFactoryCtx).Unregister(withEventFactoryCtx, registration, opts...) // afterCloseFunc() is used to cleanup things like the entry in the Map for EventFactories eventFactoryClient.afterCloseFunc() }) - return emp, err + return &emptypb.Empty{}, err } // NewNetworkServiceEndpointRegistryClient - returns a new null client that does nothing but call next.NetworkServiceEndpointRegistryClient(ctx). diff --git a/pkg/registry/common/begin/nse_server.go b/pkg/registry/common/begin/nse_server.go index 01bfd0cb9..35f14c96f 100644 --- a/pkg/registry/common/begin/nse_server.go +++ b/pkg/registry/common/begin/nse_server.go @@ -27,7 +27,6 @@ import ( "github.com/networkservicemesh/sdk/pkg/registry/common/grpcmetadata" "github.com/networkservicemesh/sdk/pkg/registry/core/next" - "github.com/networkservicemesh/sdk/pkg/tools/log" ) type beginNSEServer struct { @@ -43,14 +42,7 @@ func (b *beginNSEServer) Register(ctx context.Context, in *registry.NetworkServi if fromContext(ctx) != nil { return next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, in) } - eventFactoryServer, _ := b.LoadOrStore(id, - newNSEEventFactoryServer( - ctx, - func() { - b.Delete(id) - }, - ), - ) + eventFactoryServer, _ := b.LoadOrStore(id, newNSEEventFactoryServer(ctx, func() { b.Delete(id) })) var resp *registry.NetworkServiceEndpoint var err error @@ -58,7 +50,6 @@ func (b *beginNSEServer) Register(ctx context.Context, in *registry.NetworkServi <-eventFactoryServer.executor.AsyncExec(func() { currentEventFactoryServer, _ := b.Load(id) if currentEventFactoryServer != eventFactoryServer { - log.FromContext(ctx).Debug("recalling begin.Request because currentEventFactoryServer != eventFactoryServer") resp, err = b.Register(ctx, in) return } @@ -90,22 +81,22 @@ func (b *beginNSEServer) Unregister(ctx context.Context, in *registry.NetworkSer if fromContext(ctx) != nil { return next.NetworkServiceEndpointRegistryServer(ctx).Unregister(ctx, in) } - eventFactoryServer, ok := b.Load(id) - if !ok { - // If we don't have a connection to Close, just let it be - return &emptypb.Empty{}, nil - } + eventFactoryServer, _ := b.LoadOrStore(id, newNSEEventFactoryServer(ctx, func() { b.Delete(id) })) + var err error <-eventFactoryServer.executor.AsyncExec(func() { - if eventFactoryServer.state != established || eventFactoryServer.registration == nil { + currentEventFactoryServer, _ := b.Load(id) + if currentEventFactoryServer != eventFactoryServer { + _, err = b.Unregister(ctx, in) return } - currentServerClient, _ := b.Load(id) - if currentServerClient != eventFactoryServer { - return + + registration := eventFactoryServer.registration + if registration == nil { + registration = in.Clone() } withEventFactoryCtx := withEventFactory(ctx, eventFactoryServer) - _, err = next.NetworkServiceEndpointRegistryServer(withEventFactoryCtx).Unregister(withEventFactoryCtx, eventFactoryServer.registration) + _, err = next.NetworkServiceEndpointRegistryServer(withEventFactoryCtx).Unregister(withEventFactoryCtx, registration) eventFactoryServer.afterCloseFunc() }) return &emptypb.Empty{}, err diff --git a/pkg/registry/common/dial/nse_client.go b/pkg/registry/common/dial/nse_client.go index fac726707..5864b5643 100644 --- a/pkg/registry/common/dial/nse_client.go +++ b/pkg/registry/common/dial/nse_client.go @@ -1,4 +1,4 @@ -// Copyright (c) 2021-2022 Cisco and/or its affiliates. +// Copyright (c) 2021-2023 Cisco and/or its affiliates. // // SPDX-License-Identifier: Apache-2.0 // @@ -26,6 +26,7 @@ import ( "github.com/golang/protobuf/ptypes/empty" "github.com/networkservicemesh/api/pkg/api/registry" "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/emptypb" "github.com/networkservicemesh/sdk/pkg/registry/common/clientconn" "github.com/networkservicemesh/sdk/pkg/registry/core/next" @@ -94,18 +95,23 @@ func (c *dialNSEClient) Unregister(ctx context.Context, in *registry.NetworkServ return next.NetworkServiceEndpointRegistryClient(ctx).Unregister(ctx, in, opts...) } - cc, _ := clientconn.Load(ctx) + cc, loaded := clientconn.LoadOrStore(ctx, newDialer(c.chainCtx, c.dialTimeout, c.dialOptions...)) di, ok := cc.(*dialer) if !ok { return next.NetworkServiceEndpointRegistryClient(ctx).Unregister(ctx, in, opts...) } + + err := di.Dial(ctx, clientURL) defer func() { _ = di.Close() clientconn.Delete(ctx) }() - _ = di.Dial(ctx, clientURL) + if err != nil && !loaded { + log.FromContext(ctx).Errorf("can not dial to %v, err %v", grpcutils.URLToTarget(clientURL), err) + return &emptypb.Empty{}, err + } return next.NetworkServiceEndpointRegistryClient(ctx).Unregister(ctx, in, opts...) }