From 6274111aeb39a99dd19ba760b5f223229cd78f8d Mon Sep 17 00:00:00 2001 From: Ed Warnicke Date: Sat, 21 Aug 2021 10:43:04 -0500 Subject: [PATCH 1/9] Introducing the begin chain element Please read pkg/networkservice/common/begin/doc.go for extensive documentation. Signed-off-by: Ed Warnicke --- pkg/networkservice/chains/client/client.go | 11 +- pkg/networkservice/chains/endpoint/server.go | 13 +- pkg/networkservice/common/begin/client.go | 112 +++++++++ .../client_map.gen.go} | 38 ++- .../common/begin/close_client_test.go | 119 ++++++++++ .../common/begin/close_server_test.go | 114 +++++++++ pkg/networkservice/common/begin/context.go | 48 ++++ pkg/networkservice/common/begin/doc.go | 101 ++++++++ .../common/begin/event_factory.go | 219 ++++++++++++++++++ .../common/{refresh/const.go => begin/gen.go} | 21 +- .../{refresh/gen.go => begin/options.go} | 20 +- .../common/begin/serialize_both_test.go | 63 +++++ .../common/begin/serialize_client_test.go | 96 ++++++++ .../serialize_server_test.go} | 45 +--- pkg/networkservice/common/begin/server.go | 102 ++++++++ .../common/begin/server_map.gen.go | 73 ++++++ pkg/networkservice/common/refresh/client.go | 107 ++++----- .../common/refresh/client_test.go | 25 +- pkg/networkservice/common/refresh/metadata.go | 70 ++++++ pkg/networkservice/common/serialize/README.md | 38 --- pkg/networkservice/common/serialize/client.go | 55 ----- pkg/networkservice/common/serialize/server.go | 55 ----- pkg/networkservice/common/timeout/metadata.go | 70 ++++++ pkg/networkservice/common/timeout/server.go | 77 +++--- .../common/timeout/server_test.go | 54 ++--- .../utils/inject/injectclock/client.go | 50 ++++ 26 files changed, 1409 insertions(+), 387 deletions(-) create mode 100644 pkg/networkservice/common/begin/client.go rename pkg/networkservice/common/{refresh/timer_map.gen.go => begin/client_map.gen.go} (56%) create mode 100644 pkg/networkservice/common/begin/close_client_test.go create mode 100644 pkg/networkservice/common/begin/close_server_test.go create mode 100644 pkg/networkservice/common/begin/context.go create mode 100644 pkg/networkservice/common/begin/doc.go create mode 100644 pkg/networkservice/common/begin/event_factory.go rename pkg/networkservice/common/{refresh/const.go => begin/gen.go} (56%) rename pkg/networkservice/common/{refresh/gen.go => begin/options.go} (61%) create mode 100644 pkg/networkservice/common/begin/serialize_both_test.go create mode 100644 pkg/networkservice/common/begin/serialize_client_test.go rename pkg/networkservice/common/{serialize/server_test.go => begin/serialize_server_test.go} (70%) create mode 100644 pkg/networkservice/common/begin/server.go create mode 100644 pkg/networkservice/common/begin/server_map.gen.go create mode 100644 pkg/networkservice/common/refresh/metadata.go delete mode 100644 pkg/networkservice/common/serialize/README.md delete mode 100644 pkg/networkservice/common/serialize/client.go delete mode 100644 pkg/networkservice/common/serialize/server.go create mode 100644 pkg/networkservice/common/timeout/metadata.go create mode 100644 pkg/networkservice/utils/inject/injectclock/client.go diff --git a/pkg/networkservice/chains/client/client.go b/pkg/networkservice/chains/client/client.go index a57527cb6..9e3cc56f4 100644 --- a/pkg/networkservice/chains/client/client.go +++ b/pkg/networkservice/chains/client/client.go @@ -27,6 +27,8 @@ import ( "github.com/google/uuid" "google.golang.org/grpc" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/begin" + "github.com/networkservicemesh/api/pkg/api/networkservice" "github.com/networkservicemesh/sdk/pkg/networkservice/common/clienturl" @@ -34,7 +36,6 @@ import ( "github.com/networkservicemesh/sdk/pkg/networkservice/common/heal" "github.com/networkservicemesh/sdk/pkg/networkservice/common/null" "github.com/networkservicemesh/sdk/pkg/networkservice/common/refresh" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/serialize" "github.com/networkservicemesh/sdk/pkg/networkservice/common/updatepath" "github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters" "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" @@ -106,9 +107,9 @@ func NewClient(ctx context.Context, connectTo *url.URL, clientOpts ...Option) ne *rv = chain.NewNetworkServiceClient( updatepath.NewClient(opts.name), - serialize.NewClient(), - refresh.NewClient(ctx), + begin.NewClient(), metadata.NewClient(), + refresh.NewClient(ctx), adapters.NewServerToClient( chain.NewNetworkServiceServer( heal.NewServer(ctx, @@ -149,9 +150,9 @@ func NewClientFactory(clientOpts ...Option) connect.ClientFactory { append( append([]networkservice.NetworkServiceClient{ updatepath.NewClient(opts.name), - serialize.NewClient(), - refresh.NewClient(ctx), + begin.NewClient(), metadata.NewClient(), + refresh.NewClient(ctx), // TODO: move back to the end of the chain when `begin` chain element will be ready heal.NewClient(ctx, networkservice.NewMonitorConnectionClient(cc)), }, opts.additionalFunctionality...), diff --git a/pkg/networkservice/chains/endpoint/server.go b/pkg/networkservice/chains/endpoint/server.go index f7d228016..8ff21d96f 100644 --- a/pkg/networkservice/chains/endpoint/server.go +++ b/pkg/networkservice/chains/endpoint/server.go @@ -25,13 +25,15 @@ import ( "google.golang.org/grpc" + "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" + + "github.com/networkservicemesh/sdk/pkg/networkservice/common/begin" + "github.com/google/uuid" "github.com/networkservicemesh/api/pkg/api/networkservice" "github.com/networkservicemesh/sdk/pkg/networkservice/common/authorize" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/serialize" "github.com/networkservicemesh/sdk/pkg/networkservice/common/updatetoken" - "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" "github.com/networkservicemesh/sdk/pkg/tools/grpcutils" "github.com/networkservicemesh/sdk/pkg/networkservice/common/monitor" @@ -104,14 +106,11 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options opts.name, append([]networkservice.NetworkServiceServer{ updatepath.NewServer(opts.name), - serialize.NewServer(), + begin.NewServer(), updatetoken.NewServer(tokenGenerator), opts.authorizeServer, - // `timeout` uses ctx as a context for the timeout Close and it closes only the subsequent chain, so - // chain elements before the `timeout` in chain shouldn't make any updates to the Close context and - // shouldn't be closed on Connection Close. - timeout.NewServer(ctx), metadata.NewServer(), + timeout.NewServer(ctx), monitor.NewServer(ctx, &rv.MonitorConnectionServer), }, opts.additionalFunctionality...)...) return rv diff --git a/pkg/networkservice/common/begin/client.go b/pkg/networkservice/common/begin/client.go new file mode 100644 index 000000000..8f90790b7 --- /dev/null +++ b/pkg/networkservice/common/begin/client.go @@ -0,0 +1,112 @@ +// Copyright (c) 2021 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package begin + +import ( + "context" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/pkg/errors" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" +) + +type beginClient struct { + clientMap +} + +// NewClient - creates a new begin chain element +func NewClient() networkservice.NetworkServiceClient { + return &beginClient{} +} + +func (b *beginClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (conn *networkservice.Connection, err error) { + // No connection.ID, no service + if request.GetConnection().GetId() == "" { + return nil, errors.New("request.EventFactory.Id must not be zero valued") + } + // If some other EventFactory is already in the ctx... we are already running in an executor, and can just execute normally + if fromContext(ctx) != nil { + return next.Client(ctx).Request(ctx, request, opts...) + } + eventFactoryClient, _ := b.LoadOrStore(request.GetConnection().GetId(), + newEventFactoryClient( + ctx, + func() { + b.Delete(request.GetRequestConnection().GetId()) + }, + opts..., + ), + ) + <-eventFactoryClient.executor.AsyncExec(func() { + // If the eventFactory has changed, usually because the connection has been Closed and re-established + // go back to the beginning and try again. + currentConnClient, _ := b.LoadOrStore(request.GetConnection().GetId(), eventFactoryClient) + if currentConnClient != eventFactoryClient { + conn, err = b.Request(ctx, request) + return + } + + ctx = withEventFactory(ctx, eventFactoryClient) + conn, err = next.Client(ctx).Request(ctx, request, opts...) + if err != nil { + if eventFactoryClient.state != established { + eventFactoryClient.state = closed + b.Delete(request.GetConnection().GetId()) + } + return + } + eventFactoryClient.request = request.Clone() + eventFactoryClient.request.Connection = conn.Clone() + eventFactoryClient.opts = opts + eventFactoryClient.state = established + }) + return conn, err +} + +func (b *beginClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (emp *emptypb.Empty, err error) { + // If some other EventFactory is already in the ctx... we are already running in an executor, and can just execute normally + if fromContext(ctx) != nil { + return next.Client(ctx).Close(ctx, conn, opts...) + } + eventFactoryClient, ok := b.Load(conn.GetId()) + if !ok { + // If we don't have a connection to Close, just let it be + return + } + <-eventFactoryClient.executor.AsyncExec(func() { + // If the connection is not established, don't do anything + if eventFactoryClient.state != established || eventFactoryClient.client == nil || eventFactoryClient.request == nil { + return + } + + // If this isn't the connection we started with, do nothing + currentConnClient, _ := b.LoadOrStore(conn.GetId(), eventFactoryClient) + if currentConnClient != eventFactoryClient { + return + } + // Always close with the last valid Connection we got + conn = eventFactoryClient.request.Connection + ctx = withEventFactory(ctx, eventFactoryClient) + emp, err = next.Client(ctx).Close(ctx, conn, opts...) + // afterClose() is used to cleanup things like the entry in the Map for EventFactories + eventFactoryClient.afterClose() + }) + return emp, err +} diff --git a/pkg/networkservice/common/refresh/timer_map.gen.go b/pkg/networkservice/common/begin/client_map.gen.go similarity index 56% rename from pkg/networkservice/common/refresh/timer_map.gen.go rename to pkg/networkservice/common/begin/client_map.gen.go index 51308aadd..0a036c54e 100644 --- a/pkg/networkservice/common/refresh/timer_map.gen.go +++ b/pkg/networkservice/common/begin/client_map.gen.go @@ -1,60 +1,58 @@ -// Code generated by "-output timer_map.gen.go -type timerMap -output timer_map.gen.go -type timerMap"; DO NOT EDIT. -package refresh +// Code generated by "-output client_map.gen.go -type clientMap -output client_map.gen.go -type clientMap"; DO NOT EDIT. +package begin import ( "sync" // Used by sync.Map. - - "github.com/networkservicemesh/sdk/pkg/tools/clock" ) // Generate code that will fail if the constants change value. func _() { - // An "cannot convert timerMap literal (type timerMap) to type sync.Map" compiler error signifies that the base type have changed. + // An "cannot convert clientMap literal (type clientMap) to type sync.Map" compiler error signifies that the base type have changed. // Re-run the go-syncmap command to generate them again. - _ = (sync.Map)(timerMap{}) + _ = (sync.Map)(clientMap{}) } -var _nil_timerMap_clock_Timer_value = func() (val clock.Timer) { return }() +var _nil_clientMap_eventFactoryClient_value = func() (val *eventFactoryClient) { return }() // Load returns the value stored in the map for a key, or nil if no // value is present. // The ok result indicates whether value was found in the map. -func (m *timerMap) Load(key string) (clock.Timer, bool) { +func (m *clientMap) Load(key string) (*eventFactoryClient, bool) { value, ok := (*sync.Map)(m).Load(key) if value == nil { - return _nil_timerMap_clock_Timer_value, ok + return _nil_clientMap_eventFactoryClient_value, ok } - return value.(clock.Timer), ok + return value.(*eventFactoryClient), ok } // Store sets the value for a key. -func (m *timerMap) Store(key string, value clock.Timer) { +func (m *clientMap) Store(key string, value *eventFactoryClient) { (*sync.Map)(m).Store(key, value) } // LoadOrStore returns the existing value for the key if present. // Otherwise, it stores and returns the given value. // The loaded result is true if the value was loaded, false if stored. -func (m *timerMap) LoadOrStore(key string, value clock.Timer) (clock.Timer, bool) { +func (m *clientMap) LoadOrStore(key string, value *eventFactoryClient) (*eventFactoryClient, bool) { actual, loaded := (*sync.Map)(m).LoadOrStore(key, value) if actual == nil { - return _nil_timerMap_clock_Timer_value, loaded + return _nil_clientMap_eventFactoryClient_value, loaded } - return actual.(clock.Timer), loaded + return actual.(*eventFactoryClient), loaded } // LoadAndDelete deletes the value for a key, returning the previous value if any. // The loaded result reports whether the key was present. -func (m *timerMap) LoadAndDelete(key string) (value clock.Timer, loaded bool) { +func (m *clientMap) LoadAndDelete(key string) (value *eventFactoryClient, loaded bool) { actual, loaded := (*sync.Map)(m).LoadAndDelete(key) if actual == nil { - return _nil_timerMap_clock_Timer_value, loaded + return _nil_clientMap_eventFactoryClient_value, loaded } - return actual.(clock.Timer), loaded + return actual.(*eventFactoryClient), loaded } // Delete deletes the value for a key. -func (m *timerMap) Delete(key string) { +func (m *clientMap) Delete(key string) { (*sync.Map)(m).Delete(key) } @@ -68,8 +66,8 @@ func (m *timerMap) Delete(key string) { // // Range may be O(N) with the number of elements in the map even if f returns // false after a constant number of calls. -func (m *timerMap) Range(f func(key string, value clock.Timer) bool) { +func (m *clientMap) Range(f func(key string, value *eventFactoryClient) bool) { (*sync.Map)(m).Range(func(key, value interface{}) bool { - return f(key.(string), value.(clock.Timer)) + return f(key.(string), value.(*eventFactoryClient)) }) } diff --git a/pkg/networkservice/common/begin/close_client_test.go b/pkg/networkservice/common/begin/close_client_test.go new file mode 100644 index 000000000..0b981d0d2 --- /dev/null +++ b/pkg/networkservice/common/begin/close_client_test.go @@ -0,0 +1,119 @@ +// Copyright (c) 2021 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package begin_test + +import ( + "context" + "sync" + "testing" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/stretchr/testify/assert" + "go.uber.org/goleak" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/networkservicemesh/sdk/pkg/networkservice/common/begin" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" +) + +const ( + mark = "mark" +) + +func TestCloseClient(t *testing.T) { + t.Cleanup(func() { goleak.VerifyNone(t) }) + client := chain.NewNetworkServiceClient( + begin.NewClient(), + &markClient{t: t}, + ) + id := "1" + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + conn, err := client.Request(ctx, testRequest(id)) + assert.NotNil(t, t, conn) + assert.NoError(t, err) + assert.Equal(t, conn.GetContext().GetExtraContext()[mark], mark) + conn = conn.Clone() + delete(conn.GetContext().GetExtraContext(), mark) + assert.Zero(t, conn.GetContext().GetExtraContext()[mark]) + _, err = client.Close(ctx, conn) + assert.NoError(t, err) +} + +type markClient struct { + t *testing.T +} + +func (m *markClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { + if request.GetConnection().GetContext() == nil { + request.GetConnection().Context = &networkservice.ConnectionContext{} + } + if request.GetConnection().GetContext().GetExtraContext() == nil { + request.GetConnection().GetContext().ExtraContext = make(map[string]string) + } + request.GetConnection().GetContext().GetExtraContext()[mark] = mark + return next.Client(ctx).Request(ctx, request, opts...) +} + +func (m *markClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*emptypb.Empty, error) { + assert.NotNil(m.t, conn.GetContext().GetExtraContext()) + assert.Equal(m.t, mark, conn.GetContext().GetExtraContext()[mark]) + return next.Client(ctx).Close(ctx, conn, opts...) +} + +var _ networkservice.NetworkServiceClient = &markClient{} + +func TestDoubleCloseClient(t *testing.T) { + t.Cleanup(func() { goleak.VerifyNone(t) }) + client := chain.NewNetworkServiceClient( + begin.NewClient(), + &doubleCloseClient{t: t}, + ) + id := "1" + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + conn, err := client.Request(ctx, testRequest(id)) + assert.NotNil(t, t, conn) + assert.NoError(t, err) + conn = conn.Clone() + _, err = client.Close(ctx, conn) + assert.NoError(t, err) + _, err = client.Close(ctx, conn) + assert.NoError(t, err) +} + +type doubleCloseClient struct { + t *testing.T + sync.Once +} + +func (s *doubleCloseClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { + return next.Client(ctx).Request(ctx, request, opts...) +} + +func (s *doubleCloseClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*emptypb.Empty, error) { + count := 1 + s.Do(func() { + count++ + }) + assert.Equal(s.t, 2, count, "Close has been called more than once") + return next.Client(ctx).Close(ctx, conn, opts...) +} + +var _ networkservice.NetworkServiceClient = &doubleCloseClient{} diff --git a/pkg/networkservice/common/begin/close_server_test.go b/pkg/networkservice/common/begin/close_server_test.go new file mode 100644 index 000000000..57d5bb4e9 --- /dev/null +++ b/pkg/networkservice/common/begin/close_server_test.go @@ -0,0 +1,114 @@ +// Copyright (c) 2021 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package begin_test + +import ( + "context" + "sync" + "testing" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/stretchr/testify/assert" + "go.uber.org/goleak" + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/networkservicemesh/sdk/pkg/networkservice/common/begin" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" +) + +func TestCloseServer(t *testing.T) { + t.Cleanup(func() { goleak.VerifyNone(t) }) + server := chain.NewNetworkServiceServer( + begin.NewServer(), + &markServer{t: t}, + ) + id := "1" + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + conn, err := server.Request(ctx, testRequest(id)) + assert.NotNil(t, t, conn) + assert.NoError(t, err) + assert.Equal(t, conn.GetContext().GetExtraContext()[mark], mark) + conn = conn.Clone() + delete(conn.GetContext().GetExtraContext(), mark) + assert.Zero(t, conn.GetContext().GetExtraContext()[mark]) + _, err = server.Close(ctx, conn) + assert.NoError(t, err) +} + +type markServer struct { + t *testing.T +} + +func (m *markServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { + if request.GetConnection().GetContext() == nil { + request.GetConnection().Context = &networkservice.ConnectionContext{} + } + if request.GetConnection().GetContext().GetExtraContext() == nil { + request.GetConnection().GetContext().ExtraContext = make(map[string]string) + } + request.GetConnection().GetContext().GetExtraContext()[mark] = mark + return next.Server(ctx).Request(ctx, request) +} + +func (m *markServer) Close(ctx context.Context, conn *networkservice.Connection) (*emptypb.Empty, error) { + assert.NotNil(m.t, conn.GetContext().GetExtraContext()) + assert.Equal(m.t, mark, conn.GetContext().GetExtraContext()[mark]) + return next.Server(ctx).Close(ctx, conn) +} + +var _ networkservice.NetworkServiceServer = &markServer{} + +func TestDoubleCloseServer(t *testing.T) { + t.Cleanup(func() { goleak.VerifyNone(t) }) + server := chain.NewNetworkServiceServer( + begin.NewServer(), + &doubleCloseServer{t: t}, + ) + id := "1" + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + conn, err := server.Request(ctx, testRequest(id)) + assert.NotNil(t, t, conn) + assert.NoError(t, err) + conn = conn.Clone() + _, err = server.Close(ctx, conn) + assert.NoError(t, err) + _, err = server.Close(ctx, conn) + assert.NoError(t, err) +} + +type doubleCloseServer struct { + t *testing.T + sync.Once +} + +func (s *doubleCloseServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { + return next.Server(ctx).Request(ctx, request) +} + +func (s *doubleCloseServer) Close(ctx context.Context, conn *networkservice.Connection) (*emptypb.Empty, error) { + count := 1 + s.Do(func() { + count++ + }) + assert.Equal(s.t, 2, count, "Close has been called more than once") + return next.Server(ctx).Close(ctx, conn) +} + +var _ networkservice.NetworkServiceClient = &doubleCloseClient{} diff --git a/pkg/networkservice/common/begin/context.go b/pkg/networkservice/common/begin/context.go new file mode 100644 index 000000000..0a0ab42ff --- /dev/null +++ b/pkg/networkservice/common/begin/context.go @@ -0,0 +1,48 @@ +// Copyright (c) 2021 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package begin + +import ( + "context" +) + +type key struct{} + +func withEventFactory(parent context.Context, eventFactory EventFactory) context.Context { + if parent.Value(key{}) != nil { + return parent + } + ctx := context.WithValue(parent, key{}, eventFactory) + return ctx +} + +// FromContext - returns EventFactory from context +func FromContext(ctx context.Context) EventFactory { + value := fromContext(ctx) + if value == nil { + panic("EventFactory not found please add begin chain element to your chain") + } + return value +} + +func fromContext(ctx context.Context) EventFactory { + value, ok := ctx.Value(key{}).(EventFactory) + if ok { + return value + } + return nil +} diff --git a/pkg/networkservice/common/begin/doc.go b/pkg/networkservice/common/begin/doc.go new file mode 100644 index 000000000..fde651f1e --- /dev/null +++ b/pkg/networkservice/common/begin/doc.go @@ -0,0 +1,101 @@ +// Copyright (c) 2021 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* +Package begin provides a chain element that can be put at the beginning of the chain, after Connection.Id has been set +but before any chain elements that would mutate the Connection on the return path. +the begin.New{Client,Server}() guarantee: + +Scope + +All Request() or Close() events are scoped to a particular Connection, uniquely identified by its Connection.Id + +Exclusivity + +Only one event is processed for a Connection.Id at a time + +Order + +Events for a given Connection.Id are processed in the order in which they are received + +Close Correctness + +When a Close(Connection) event is received, begin will replace the Connection provided with the last Connection +successfully returned from the chain for Connection.Id + +Midchain Originated Events + +A midchain element may originate a Request() or Close() event to be processed +from the beginning of the chain (Timeout, Refresh,Heal): + + errCh := begin.FromContext(ctx).Request() + errCh := begin.FromContext(ctx).Close() + +errCh will receive any error from the firing of the event, and will be closed after the event has fully +processed. + +Note: if a chain is a server chain continued by a client chain, the beginning of the chain is at the beginning of +the server chain, even if there is a subsequent begin.NewClient() in the client chain. + +Optionally you may use the CancelContext(context.Context) option: + + begin.FromContext(ctx).Request(CancelContext(cancelContext)) + begin.FromContext(ctx).Close(CancelContext(cancelContext)) + +If cancelContext is canceled prior to the processing of the event, the event processing will be skipped, +and the errCh returned simply closed. + +Midchain Originated Request Event + +Example: + + begin.FromContext(ctx).Request() + +will use the networkservice.NetworkServiceRequest from the chain's last successfully completed Request() event +with networkservice.NetworkServiceRequest.Connection replaced with the Connection returned by the chain's last +successfully completed Request() event + +Chain Placement + +begin.New{Server/Client} should always proceed any chain element which: +- Maintains state +- Mutates the Connection object along the return path of processing a Request() event. + +Reasoning + +networkservice.NetworkService{Client,Server} processes two kinds of events: + - Request() + - Close() +Each Request() or Close() event is scoped to a networkservice.Connection, which can be uniquely identified by its Connection.Id + +For a given Connection.Id, at most one event can be processed at a time (exclusivity). +For a given Connection.Id, events must be processed in the order they were received (order). +For Close(), the Connection passed to it must be identical to the last one returned by the chain to insure all state +is correctly cleared (close correctness). + +Typically, a chain element receives a Request() or Close() event from the element before it in the chain +and sends a Request() or Close() and either terminates processing returning an error, or sends a Request() or Close() +event to the next element in the chain. + +There are some circumstances in which a Request() or Close() event needs to be originated by a chain element +in the middle of the chain, but processed from the beginning of the chain. Examples include (but are not limited to): + - A server timing out an expired Connection + - A client refreshing a Connection so that it does not expire + - A client healing from a lost Connection +In all of these cases, the Request() or Close() event should be processed starting at the beginning of the chain, to ensure +that all of the proper side effects occur within the chain. +*/ +package begin diff --git a/pkg/networkservice/common/begin/event_factory.go b/pkg/networkservice/common/begin/event_factory.go new file mode 100644 index 000000000..31e826650 --- /dev/null +++ b/pkg/networkservice/common/begin/event_factory.go @@ -0,0 +1,219 @@ +// Copyright (c) 2021 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package begin + +import ( + "context" + "time" + + "github.com/edwarnicke/serialize" + "github.com/networkservicemesh/api/pkg/api/networkservice" + "google.golang.org/grpc" + + "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" +) + +type connectionState int + +const ( + zero connectionState = iota + 1 + established + closed +) + +var _ connectionState = zero + +// EventFactory - allows firing off a Request or Close event from midchain +type EventFactory interface { + Request(opts ...Option) <-chan error + Close(opts ...Option) <-chan error +} + +type eventFactoryClient struct { + state connectionState + executor serialize.Executor + ctxFunc func() (context.Context, context.CancelFunc) + request *networkservice.NetworkServiceRequest + opts []grpc.CallOption + client networkservice.NetworkServiceClient + afterClose func() +} + +func newEventFactoryClient(ctx context.Context, afterClose func(), opts ...grpc.CallOption) *eventFactoryClient { + f := &eventFactoryClient{ + client: next.Client(ctx), + opts: opts, + } + f.ctxFunc = cxtFuncFromContext(ctx, f) + + f.afterClose = func() { + f.state = closed + if afterClose != nil { + afterClose() + } + } + return f +} + +func (f *eventFactoryClient) Request(opts ...Option) <-chan error { + o := &option{ + cancelCtx: context.Background(), + } + for _, opt := range opts { + opt(o) + } + ch := make(chan error, 1) + f.executor.AsyncExec(func() { + defer close(ch) + if f.state != established || f.client == nil || f.request == nil { + return + } + select { + case <-o.cancelCtx.Done(): + default: + ctx, cancel := f.ctxFunc() + defer cancel() + conn, err := f.client.Request(ctx, f.request, f.opts...) + if err == nil && f.request != nil { + f.request.Connection = conn + } + ch <- err + } + }) + return ch +} + +func (f *eventFactoryClient) Close(opts ...Option) <-chan error { + o := &option{ + cancelCtx: context.Background(), + } + for _, opt := range opts { + opt(o) + } + ch := make(chan error, 1) + f.executor.AsyncExec(func() { + defer close(ch) + if f.state != established || f.client == nil || f.request == nil { + return + } + select { + case <-o.cancelCtx.Done(): + default: + ctx, cancel := f.ctxFunc() + defer cancel() + _, err := f.client.Close(ctx, f.request.GetConnection(), f.opts...) + f.afterClose() + ch <- err + } + }) + return ch +} + +var _ EventFactory = &eventFactoryClient{} + +type eventFactoryServer struct { + state connectionState + executor serialize.Executor + ctxFunc func() (context.Context, context.CancelFunc) + request *networkservice.NetworkServiceRequest + afterClose func() + server networkservice.NetworkServiceServer +} + +func newEventFactoryServer(ctx context.Context, afterClose func()) *eventFactoryServer { + f := &eventFactoryServer{ + server: next.Server(ctx), + } + f.ctxFunc = cxtFuncFromContext(ctx, f) + + f.afterClose = func() { + f.state = closed + afterClose() + } + return f +} + +func (f *eventFactoryServer) Request(opts ...Option) <-chan error { + o := &option{ + cancelCtx: context.Background(), + } + for _, opt := range opts { + opt(o) + } + ch := make(chan error, 1) + f.executor.AsyncExec(func() { + defer close(ch) + if f.state != established || f.server == nil || f.request == nil { + return + } + select { + case <-o.cancelCtx.Done(): + default: + ctx, cancel := f.ctxFunc() + defer cancel() + conn, err := f.server.Request(ctx, f.request) + if err == nil && f.request != nil { + f.request.Connection = conn + } + ch <- err + } + }) + return ch +} + +func (f *eventFactoryServer) Close(opts ...Option) <-chan error { + o := &option{ + cancelCtx: context.Background(), + } + for _, opt := range opts { + opt(o) + } + ch := make(chan error, 1) + <-f.executor.AsyncExec(func() { + defer close(ch) + if f.state != established || f.server == nil || f.request == nil { + return + } + select { + case <-o.cancelCtx.Done(): + default: + ctx, cancel := f.ctxFunc() + defer cancel() + _, err := f.server.Close(ctx, f.request.GetConnection()) + ch <- err + } + }) + return ch +} + +var _ EventFactory = &eventFactoryServer{} + +func cxtFuncFromContext(ctx context.Context, eventFactory EventFactory) func() (context.Context, context.CancelFunc) { + ctxFunc := func() (context.Context, context.CancelFunc) { + eventCtx, cancel := context.WithCancel(context.Background()) + return withEventFactory(eventCtx, eventFactory), cancel + } + if deadline, ok := ctx.Deadline(); ok { + duration := time.Until(deadline) + ctxFunc = func() (context.Context, context.CancelFunc) { + eventContext, cancel := context.WithTimeout(context.Background(), duration) + eventContext = withEventFactory(eventContext, eventFactory) + return eventContext, cancel + } + } + return ctxFunc +} diff --git a/pkg/networkservice/common/refresh/const.go b/pkg/networkservice/common/begin/gen.go similarity index 56% rename from pkg/networkservice/common/refresh/const.go rename to pkg/networkservice/common/begin/gen.go index 410906687..cc32d7669 100644 --- a/pkg/networkservice/common/refresh/const.go +++ b/pkg/networkservice/common/begin/gen.go @@ -1,4 +1,4 @@ -// Copyright (c) 2021 Doc.ai and/or its affiliates. +// Copyright (c) 2021 Cisco and/or its affiliates. // // SPDX-License-Identifier: Apache-2.0 // @@ -14,11 +14,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -package refresh +package begin -import "time" - -const ( - // defaultRefreshRequestTimeout - default timeout for refresh request - defaultRefreshRequestTimeout = time.Minute +import ( + "sync" ) + +//go:generate go-syncmap -output client_map.gen.go -type clientMap + +// clientMap - sync.Map with key == string and value == *eventFactoryClient +type clientMap sync.Map + +//go:generate go-syncmap -output server_map.gen.go -type serverMap + +// serverMap - sync.Map with key == string and value == *eventFactoryServer +type serverMap sync.Map diff --git a/pkg/networkservice/common/refresh/gen.go b/pkg/networkservice/common/begin/options.go similarity index 61% rename from pkg/networkservice/common/refresh/gen.go rename to pkg/networkservice/common/begin/options.go index dbdaf2e49..d4a8d7a7a 100644 --- a/pkg/networkservice/common/refresh/gen.go +++ b/pkg/networkservice/common/begin/options.go @@ -1,4 +1,4 @@ -// Copyright (c) 2021 Doc.ai and/or its affiliates. +// Copyright (c) 2021 Cisco and/or its affiliates. // // SPDX-License-Identifier: Apache-2.0 // @@ -14,12 +14,22 @@ // See the License for the specific language governing permissions and // limitations under the License. -package refresh +package begin import ( - "sync" + "context" ) -//go:generate go-syncmap -output timer_map.gen.go -type timerMap +type option struct { + cancelCtx context.Context +} -type timerMap sync.Map +// Option - event option +type Option func(*option) + +// CancelContext - optionally provide a context that, when canceled will preclude the event from running +func CancelContext(cancelCtx context.Context) Option { + return func(o *option) { + o.cancelCtx = cancelCtx + } +} diff --git a/pkg/networkservice/common/begin/serialize_both_test.go b/pkg/networkservice/common/begin/serialize_both_test.go new file mode 100644 index 000000000..2850d2054 --- /dev/null +++ b/pkg/networkservice/common/begin/serialize_both_test.go @@ -0,0 +1,63 @@ +// Copyright (c) 2021 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package begin_test + +import ( + "context" + "fmt" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "go.uber.org/goleak" + + "github.com/networkservicemesh/sdk/pkg/networkservice/common/begin" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" +) + +func TestSerializeBoth_StressTest(t *testing.T) { + t.Cleanup(func() { goleak.VerifyNone(t) }) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + server := chain.NewNetworkServiceServer( + begin.NewServer(), + newParallelServer(t), + adapters.NewClientToServer(chain.NewNetworkServiceClient( + begin.NewClient(), + newParallelClient(t), + ), + ), + ) + + wg := new(sync.WaitGroup) + wg.Add(parallelCount) + for i := 0; i < parallelCount; i++ { + go func(id string) { + defer wg.Done() + + conn, err := server.Request(ctx, testRequest(id)) + assert.NoError(t, err) + + _, err = server.Close(ctx, conn) + assert.NoError(t, err) + }(fmt.Sprint(i % 20)) + } + wg.Wait() +} diff --git a/pkg/networkservice/common/begin/serialize_client_test.go b/pkg/networkservice/common/begin/serialize_client_test.go new file mode 100644 index 000000000..8e16d8d4d --- /dev/null +++ b/pkg/networkservice/common/begin/serialize_client_test.go @@ -0,0 +1,96 @@ +// Copyright (c) 2021 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package begin_test + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "testing" + + "github.com/golang/protobuf/ptypes/empty" + "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/stretchr/testify/assert" + "go.uber.org/goleak" + "google.golang.org/grpc" + + "github.com/networkservicemesh/sdk/pkg/networkservice/common/begin" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" +) + +func TestSerializeClient_StressTest(t *testing.T) { + t.Cleanup(func() { goleak.VerifyNone(t) }) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + client := chain.NewNetworkServiceClient( + begin.NewClient(), + newParallelClient(t), + ) + + wg := new(sync.WaitGroup) + wg.Add(parallelCount) + for i := 0; i < parallelCount; i++ { + go func(id string) { + defer wg.Done() + + conn, err := client.Request(ctx, testRequest(id)) + assert.NoError(t, err) + + _, err = client.Close(ctx, conn) + assert.NoError(t, err) + }(fmt.Sprint(i % 20)) + } + wg.Wait() +} + +type parallelClient struct { + t *testing.T + states sync.Map +} + +func newParallelClient(t *testing.T) *parallelClient { + return ¶llelClient{ + t: t, + } +} + +func (s *parallelClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { + raw, _ := s.states.LoadOrStore(request.Connection.Id, new(int32)) + statePtr := raw.(*int32) + + state := atomic.LoadInt32(statePtr) + if !atomic.CompareAndSwapInt32(statePtr, state, state+1) { + assert.Failf(s.t, "", "state has been changed for connection %s expected %d actual %d", request.GetConnection().GetId(), state, atomic.LoadInt32(statePtr)) + } + + return next.Client(ctx).Request(ctx, request, opts...) +} + +func (s *parallelClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*empty.Empty, error) { + raw, _ := s.states.LoadOrStore(conn.Id, new(int32)) + statePtr := raw.(*int32) + + state := atomic.LoadInt32(statePtr) + if !atomic.CompareAndSwapInt32(statePtr, state, state+1) { + assert.Failf(s.t, "", "state has been changed for connection %s expected %d actual %d", conn.GetId(), state, atomic.LoadInt32(statePtr)) + } + return next.Client(ctx).Close(ctx, conn, opts...) +} diff --git a/pkg/networkservice/common/serialize/server_test.go b/pkg/networkservice/common/begin/serialize_server_test.go similarity index 70% rename from pkg/networkservice/common/serialize/server_test.go rename to pkg/networkservice/common/begin/serialize_server_test.go index 7a59242f7..693e83af8 100644 --- a/pkg/networkservice/common/serialize/server_test.go +++ b/pkg/networkservice/common/begin/serialize_server_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2021 Doc.ai and/or its affiliates. +// Copyright (c) 2021 Cisco and/or its affiliates. // // SPDX-License-Identifier: Apache-2.0 // @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package serialize_test +package begin_test import ( "context" @@ -24,15 +24,13 @@ import ( "testing" "github.com/golang/protobuf/ptypes/empty" + "github.com/networkservicemesh/api/pkg/api/networkservice" "github.com/stretchr/testify/assert" "go.uber.org/goleak" - "github.com/networkservicemesh/api/pkg/api/networkservice" - - "github.com/networkservicemesh/sdk/pkg/networkservice/common/serialize" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/begin" "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" - "github.com/networkservicemesh/sdk/pkg/tools/serializectx" ) const ( @@ -54,8 +52,7 @@ func TestSerializeServer_StressTest(t *testing.T) { defer cancel() server := chain.NewNetworkServiceServer( - serialize.NewServer(), - new(eventServer), + begin.NewServer(), newParallelServer(t), ) @@ -75,34 +72,6 @@ func TestSerializeServer_StressTest(t *testing.T) { wg.Wait() } -type eventServer struct{} - -func (s *eventServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { - executor := serializectx.GetExecutor(ctx, request.GetConnection().GetId()) - go func() { - executor.AsyncExec(func() { - _, _ = next.Server(ctx).Request(serializectx.WithExecutor(context.TODO(), executor), request) - }) - }() - - conn, err := next.Server(ctx).Request(ctx, request) - if err != nil { - return nil, err - } - - go func() { - executor.AsyncExec(func() { - _, _ = next.Server(ctx).Close(context.TODO(), conn) - }) - }() - - return conn, nil -} - -func (s *eventServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) { - return next.Server(ctx).Close(ctx, conn) -} - type parallelServer struct { t *testing.T states sync.Map @@ -119,7 +88,7 @@ func (s *parallelServer) Request(ctx context.Context, request *networkservice.Ne statePtr := raw.(*int32) state := atomic.LoadInt32(statePtr) - assert.True(s.t, atomic.CompareAndSwapInt32(statePtr, state, state+1), "state has been changed") + assert.True(s.t, atomic.CompareAndSwapInt32(statePtr, state, state+1), "state has been changed for connection %s expected %d actual %d", request.GetConnection().GetId(), state, atomic.LoadInt32(statePtr)) return next.Server(ctx).Request(ctx, request) } @@ -129,7 +98,7 @@ func (s *parallelServer) Close(ctx context.Context, conn *networkservice.Connect statePtr := raw.(*int32) state := atomic.LoadInt32(statePtr) - assert.True(s.t, atomic.CompareAndSwapInt32(statePtr, state, state+1), "state has been changed") + assert.True(s.t, atomic.CompareAndSwapInt32(statePtr, state, state+1), "state has been changed for connection %s expected %d actual %d", conn.GetId(), state, atomic.LoadInt32(statePtr)) return next.Server(ctx).Close(ctx, conn) } diff --git a/pkg/networkservice/common/begin/server.go b/pkg/networkservice/common/begin/server.go new file mode 100644 index 000000000..181a3433f --- /dev/null +++ b/pkg/networkservice/common/begin/server.go @@ -0,0 +1,102 @@ +// Copyright (c) 2021 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package begin + +import ( + "context" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/pkg/errors" + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" +) + +type beginServer struct { + serverMap +} + +// NewServer - creates a new begin chain element +func NewServer() networkservice.NetworkServiceServer { + return &beginServer{} +} + +func (b *beginServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (conn *networkservice.Connection, err error) { + // No connection.ID, no service + if request.GetConnection().GetId() == "" { + return nil, errors.New("request.EventFactory.Id must not be zero valued") + } + // If some other EventFactory is already in the ctx... we are already running in an executor, and can just execute normally + if fromContext(ctx) != nil { + return next.Server(ctx).Request(ctx, request) + } + eventFactoryServer, _ := b.LoadOrStore(request.GetConnection().GetId(), + newEventFactoryServer( + ctx, + func() { + b.Delete(request.GetRequestConnection().GetId()) + }, + ), + ) + <-eventFactoryServer.executor.AsyncExec(func() { + currentServerClient, _ := b.LoadOrStore(request.GetConnection().GetId(), eventFactoryServer) + if currentServerClient != eventFactoryServer { + conn, err = b.Request(ctx, request) + return + } + ctx = withEventFactory(ctx, eventFactoryServer) + conn, err = next.Server(ctx).Request(ctx, request) + if err != nil { + if eventFactoryServer.state != established { + eventFactoryServer.state = closed + b.Delete(request.GetConnection().GetId()) + } + return + } + eventFactoryServer.request = request.Clone() + eventFactoryServer.request.Connection = conn.Clone() + eventFactoryServer.state = established + }) + return conn, err +} + +func (b *beginServer) Close(ctx context.Context, conn *networkservice.Connection) (emp *emptypb.Empty, err error) { + // If some other EventFactory is already in the ctx... we are already running in an executor, and can just execute normally + if fromContext(ctx) != nil { + return next.Server(ctx).Close(ctx, conn) + } + eventFactoryServer, ok := b.Load(conn.GetId()) + if !ok { + // If we don't have a connection to Close, just let it be + return + } + <-eventFactoryServer.executor.AsyncExec(func() { + if eventFactoryServer.state != established || eventFactoryServer.request == nil { + return + } + currentServerClient, _ := b.LoadOrStore(conn.GetId(), eventFactoryServer) + if currentServerClient != eventFactoryServer { + return + } + // Always close with the last valid EventFactory we got + conn = eventFactoryServer.request.Connection + ctx = withEventFactory(ctx, eventFactoryServer) + emp, err = next.Server(ctx).Close(ctx, conn) + eventFactoryServer.afterClose() + }) + return emp, err +} diff --git a/pkg/networkservice/common/begin/server_map.gen.go b/pkg/networkservice/common/begin/server_map.gen.go new file mode 100644 index 000000000..3eea2f62b --- /dev/null +++ b/pkg/networkservice/common/begin/server_map.gen.go @@ -0,0 +1,73 @@ +// Code generated by "-output server_map.gen.go -type serverMap -output server_map.gen.go -type serverMap"; DO NOT EDIT. +package begin + +import ( + "sync" // Used by sync.Map. +) + +// Generate code that will fail if the constants change value. +func _() { + // An "cannot convert serverMap literal (type serverMap) to type sync.Map" compiler error signifies that the base type have changed. + // Re-run the go-syncmap command to generate them again. + _ = (sync.Map)(serverMap{}) +} + +var _nil_serverMap_eventFactoryServer_value = func() (val *eventFactoryServer) { return }() + +// Load returns the value stored in the map for a key, or nil if no +// value is present. +// The ok result indicates whether value was found in the map. +func (m *serverMap) Load(key string) (*eventFactoryServer, bool) { + value, ok := (*sync.Map)(m).Load(key) + if value == nil { + return _nil_serverMap_eventFactoryServer_value, ok + } + return value.(*eventFactoryServer), ok +} + +// Store sets the value for a key. +func (m *serverMap) Store(key string, value *eventFactoryServer) { + (*sync.Map)(m).Store(key, value) +} + +// LoadOrStore returns the existing value for the key if present. +// Otherwise, it stores and returns the given value. +// The loaded result is true if the value was loaded, false if stored. +func (m *serverMap) LoadOrStore(key string, value *eventFactoryServer) (*eventFactoryServer, bool) { + actual, loaded := (*sync.Map)(m).LoadOrStore(key, value) + if actual == nil { + return _nil_serverMap_eventFactoryServer_value, loaded + } + return actual.(*eventFactoryServer), loaded +} + +// LoadAndDelete deletes the value for a key, returning the previous value if any. +// The loaded result reports whether the key was present. +func (m *serverMap) LoadAndDelete(key string) (value *eventFactoryServer, loaded bool) { + actual, loaded := (*sync.Map)(m).LoadAndDelete(key) + if actual == nil { + return _nil_serverMap_eventFactoryServer_value, loaded + } + return actual.(*eventFactoryServer), loaded +} + +// Delete deletes the value for a key. +func (m *serverMap) Delete(key string) { + (*sync.Map)(m).Delete(key) +} + +// Range calls f sequentially for each key and value present in the map. +// If f returns false, range stops the iteration. +// +// Range does not necessarily correspond to any consistent snapshot of the Map's +// contents: no key will be visited more than once, but if the value for any key +// is stored or deleted concurrently, Range may reflect any mapping for that key +// from any point during the Range call. +// +// Range may be O(N) with the number of elements in the map even if f returns +// false after a constant number of calls. +func (m *serverMap) Range(f func(key string, value *eventFactoryServer) bool) { + (*sync.Map)(m).Range(func(key, value interface{}) bool { + return f(key.(string), value.(*eventFactoryServer)) + }) +} diff --git a/pkg/networkservice/common/refresh/client.go b/pkg/networkservice/common/refresh/client.go index 78ba07d09..47bb7b9c0 100644 --- a/pkg/networkservice/common/refresh/client.go +++ b/pkg/networkservice/common/refresh/client.go @@ -29,17 +29,18 @@ import ( "github.com/pkg/errors" "google.golang.org/grpc" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/begin" + "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" + "github.com/networkservicemesh/sdk/pkg/tools/log" + "github.com/networkservicemesh/api/pkg/api/networkservice" "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" "github.com/networkservicemesh/sdk/pkg/tools/clock" - "github.com/networkservicemesh/sdk/pkg/tools/extend" - "github.com/networkservicemesh/sdk/pkg/tools/serializectx" ) type refreshClient struct { chainCtx context.Context - timers timerMap } // NewClient - creates new NetworkServiceClient chain element for refreshing @@ -51,92 +52,72 @@ func NewClient(ctx context.Context) networkservice.NetworkServiceClient { } func (t *refreshClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { - connectionID := request.Connection.Id - t.stopTimer(connectionID) - conn, err := next.Client(ctx).Request(ctx, request, opts...) if err != nil { return nil, err } - executor := serializectx.GetExecutor(ctx, connectionID) - if executor == nil { - return nil, errors.New("no executor provided") + // Compute refreshAfter + refreshAfter, err := after(ctx, conn) + if err != nil { + // If we can't refresh, we should close down chain + _, _ = t.Close(ctx, conn) + return nil, err + } + + // Create a cancel context. + cancelCtx, cancel := context.WithCancel(t.chainCtx) + for { + // Call the old cancel to cancel any existing refreshes hanging out waiting to go + if oldCancel, loaded := LoadAndDelete(ctx, metadata.IsClient(t)); loaded { + oldCancel() + } + // Store the cancel context and break out of the loop + if _, loaded := LoadOrStore(ctx, metadata.IsClient(t), cancel); !loaded { + break + } } - request.Connection = conn.Clone() - t.startTimer(ctx, connectionID, request, opts) + eventFactory := begin.FromContext(ctx) + timeClock := clock.FromContext(ctx) + // Create the afterCh *outside* the go routine. This must be done to avoid picking up a later 'now' + // from mockClock in testing + afterCh := timeClock.After(refreshAfter) + go func(cancelCtx context.Context, afterCh <-chan time.Time) { + select { + case <-cancelCtx.Done(): + case <-afterCh: + eventFactory.Request(begin.CancelContext(cancelCtx)) + } + }(cancelCtx, afterCh) return conn, nil } func (t *refreshClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (e *empty.Empty, err error) { - t.stopTimer(conn.Id) - return next.Client(ctx).Close(ctx, conn, opts...) -} - -func (t *refreshClient) stopTimer(connectionID string) { - value, loaded := t.timers.LoadAndDelete(connectionID) - if loaded { - value.Stop() + if oldCancel, loaded := LoadAndDelete(ctx, metadata.IsClient(t)); loaded { + oldCancel() } + return next.Client(ctx).Close(ctx, conn, opts...) } -func (t *refreshClient) startTimer(ctx context.Context, connectionID string, request *networkservice.NetworkServiceRequest, opts []grpc.CallOption) { +func after(ctx context.Context, conn *networkservice.Connection) (time.Duration, error) { clockTime := clock.FromContext(ctx) - - nextClient := next.Client(ctx) - expireTime, err := ptypes.Timestamp(request.GetConnection().GetCurrentPathSegment().GetExpires()) + expireTime, err := ptypes.Timestamp(conn.GetCurrentPathSegment().GetExpires()) if err != nil { - return + return 0, errors.WithStack(err) } + log.FromContext(ctx).Infof("expireTime %s now %s", expireTime, clockTime.Now().UTC()) // A heuristic to reduce the number of redundant requests in a chain // made of refreshing clients with the same expiration time: let outer // chain elements refresh slightly faster than inner ones. // Update interval is within 0.2*expirationTime .. 0.4*expirationTime scale := 1. / 3. - path := request.GetConnection().GetPath() + path := conn.GetPath() if len(path.PathSegments) > 1 { scale = 0.2 + 0.2*float64(path.Index)/float64(len(path.PathSegments)) } duration := time.Duration(float64(clockTime.Until(expireTime)) * scale) - req := request.Clone() - exec := serializectx.GetExecutor(ctx, connectionID) - - var timer clock.Timer - timer = clockTime.AfterFunc(duration, func() { - exec.AsyncExec(func() { - oldTimer, ok := t.timers.Load(connectionID) - if !ok || oldTimer != timer { - return - } - - t.timers.Delete(connectionID) - - // Context is canceled or deadlined. - if t.chainCtx.Err() != nil { - return - } - - timeout := defaultRefreshRequestTimeout - if timeout > duration { - timeout = duration - } - - refreshCtx, cancel := clockTime.WithTimeout(extend.WithValuesFromContext(t.chainCtx, ctx), timeout) - defer cancel() - - conn, err := nextClient.Request(refreshCtx, req, opts...) - if err != nil { - return - } - - req.Connection = conn.Clone() - t.startTimer(ctx, connectionID, req, opts) - }) - }) - - t.stopTimer(connectionID) - t.timers.Store(connectionID, timer) + return duration, nil } diff --git a/pkg/networkservice/common/refresh/client_test.go b/pkg/networkservice/common/refresh/client_test.go index a4a8b0e65..b6d1efe37 100644 --- a/pkg/networkservice/common/refresh/client_test.go +++ b/pkg/networkservice/common/refresh/client_test.go @@ -29,14 +29,16 @@ import ( "github.com/networkservicemesh/api/pkg/api/networkservice" "github.com/networkservicemesh/api/pkg/api/registry" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/begin" "github.com/networkservicemesh/sdk/pkg/networkservice/common/refresh" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/serialize" "github.com/networkservicemesh/sdk/pkg/networkservice/common/updatepath" "github.com/networkservicemesh/sdk/pkg/networkservice/common/updatetoken" "github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters" "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" + "github.com/networkservicemesh/sdk/pkg/networkservice/utils/inject/injectclock" "github.com/networkservicemesh/sdk/pkg/networkservice/utils/inject/injecterror" + "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" "github.com/networkservicemesh/sdk/pkg/tools/clock" "github.com/networkservicemesh/sdk/pkg/tools/clockmock" "github.com/networkservicemesh/sdk/pkg/tools/sandbox" @@ -63,12 +65,15 @@ func testTokenFunc(clockTime clock.Clock) token.GeneratorFunc { func testClient( ctx context.Context, tokenGenerator token.GeneratorFunc, + clk clock.Clock, additionalFunctionality ...networkservice.NetworkServiceClient, ) networkservice.NetworkServiceClient { return next.NewNetworkServiceClient( append([]networkservice.NetworkServiceClient{ - serialize.NewClient(), updatepath.NewClient("refresh"), + begin.NewClient(), + metadata.NewClient(), + injectclock.NewClient(clk), refresh.NewClient(ctx), adapters.NewServerToClient( updatetoken.NewServer(tokenGenerator), @@ -85,12 +90,11 @@ func TestRefreshClient_ValidRefresh(t *testing.T) { defer cancel() clockMock := clockmock.New(ctx) - ctx = clock.WithClock(ctx, clockMock) cloneClient := &countClient{ t: t, } - client := testClient(ctx, testTokenFunc(clockMock), cloneClient) + client := testClient(ctx, testTokenFunc(clockMock), clockMock, cloneClient) conn, err := client.Request(ctx, &networkservice.NetworkServiceRequest{ Connection: &networkservice.Connection{ @@ -123,13 +127,14 @@ func TestRefreshClient_StopRefreshAtClose(t *testing.T) { defer cancel() clockMock := clockmock.New(ctx) - ctx = clock.WithClock(ctx, clockMock) cloneClient := &countClient{ t: t, } client := chain.NewNetworkServiceClient( - serialize.NewClient(), + begin.NewClient(), + metadata.NewClient(), + injectclock.NewClient(clockMock), updatepath.NewClient("refresh"), refresh.NewClient(ctx), adapters.NewServerToClient(updatetoken.NewServer(testTokenFunc(clockMock))), @@ -168,7 +173,7 @@ func TestRefreshClient_RestartsRefreshAtAnotherRequest(t *testing.T) { cloneClient := &countClient{ t: t, } - client := testClient(ctx, testTokenFunc(clockMock), cloneClient) + client := testClient(ctx, testTokenFunc(clockMock), clockMock, cloneClient) conn, err := client.Request(ctx, &networkservice.NetworkServiceRequest{ Connection: &networkservice.Connection{ @@ -220,7 +225,7 @@ func TestRefreshClient_CheckRaceConditions(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - client := testClient(ctx, sandbox.GenerateExpiringToken(conf.expireTimeout), adapters.NewServerToClient(refreshTester)) + client := testClient(ctx, sandbox.GenerateExpiringToken(conf.expireTimeout), clock.FromContext(ctx), adapters.NewServerToClient(refreshTester)) generateRequests(t, client, refreshTester, conf.iterations, conf.tickDuration) } @@ -277,12 +282,12 @@ func TestRefreshClient_NoRefreshOnFailure(t *testing.T) { defer cancel() clockMock := clockmock.New(ctx) - ctx = clock.WithClock(ctx, clockMock) cloneClient := &countClient{ t: t, } client := testClient(ctx, testTokenFunc(clockMock), + clockMock, cloneClient, injecterror.NewClient(), ) @@ -304,12 +309,12 @@ func TestRefreshClient_NoRefreshOnRefreshFailure(t *testing.T) { defer cancel() clockMock := clockmock.New(ctx) - ctx = clock.WithClock(ctx, clockMock) cloneClient := &countClient{ t: t, } client := testClient(ctx, testTokenFunc(clockMock), + clockMock, cloneClient, injecterror.NewClient(injecterror.WithRequestErrorTimes(1, -1)), ) diff --git a/pkg/networkservice/common/refresh/metadata.go b/pkg/networkservice/common/refresh/metadata.go new file mode 100644 index 000000000..2ec5693a8 --- /dev/null +++ b/pkg/networkservice/common/refresh/metadata.go @@ -0,0 +1,70 @@ +// Copyright (c) 2021 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package refresh + +import ( + "context" + + "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" +) + +type key struct{} + +// Store sets the context.CancelFunc stored in per Connection.Id metadata. +func Store(ctx context.Context, isClient bool, cancel context.CancelFunc) { + metadata.Map(ctx, isClient).Store(key{}, cancel) +} + +// Delete deletes the context.CancelFunc stored in per Connection.Id metadata +func Delete(ctx context.Context, isClient bool) { + metadata.Map(ctx, isClient).Delete(key{}) +} + +// Load returns the context.CancelFunc stored in per Connection.Id metadata, or nil if no +// value is present. +// The ok result indicates whether value was found in the per Connection.Id metadata. +func Load(ctx context.Context, isClient bool) (value context.CancelFunc, ok bool) { + rawValue, ok := metadata.Map(ctx, isClient).Load(key{}) + if !ok { + return + } + value, ok = rawValue.(context.CancelFunc) + return value, ok +} + +// LoadOrStore returns the existing context.CancelFunc stored in per Connection.Id metadata if present. +// Otherwise, it stores and returns the given nterface_types.InterfaceIndex. +// The loaded result is true if the value was loaded, false if stored. +func LoadOrStore(ctx context.Context, isClient bool, cancel context.CancelFunc) (value context.CancelFunc, ok bool) { + rawValue, ok := metadata.Map(ctx, isClient).LoadOrStore(key{}, cancel) + if !ok { + return + } + value, ok = rawValue.(context.CancelFunc) + return value, ok +} + +// LoadAndDelete deletes the context.CancelFunc stored in per Connection.Id metadata, +// returning the previous value if any. The loaded result reports whether the key was present. +func LoadAndDelete(ctx context.Context, isClient bool) (value context.CancelFunc, ok bool) { + rawValue, ok := metadata.Map(ctx, isClient).LoadAndDelete(key{}) + if !ok { + return + } + value, ok = rawValue.(context.CancelFunc) + return value, ok +} diff --git a/pkg/networkservice/common/serialize/README.md b/pkg/networkservice/common/serialize/README.md deleted file mode 100644 index 5591ff437..000000000 --- a/pkg/networkservice/common/serialize/README.md +++ /dev/null @@ -1,38 +0,0 @@ -# Functional requirements - -`Request`, `Close` events for the same `Connection.ID` should be executed in network service chain serially. - -# Implementation - -## serializeServer, serializeClient - -Serialize chain elements uses [multi executor](https://github.com/networkservicemesh/sdk/blob/master/pkg/tools/multiexecutor/multi_executor.go) -and stores per-id executor in the `Request` context.\ -**NOTE:** we don't pass executor to the `Close` context because it is very strange to plan events on already closed -connection. Please don't do it. - -Correct event firing chain element example: -```go -func (s *eventServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { - executor := serializectx.Executor(ctx, request.GetConnection().GetId()) - go func() { - executor.AsyncExec(func() { - _, _ = next.Server(ctx).Request(serializectx.WithExecutor(context.TODO(), executor), request) - }) - }() - - conn, err := next.Server(ctx).Request(ctx, request) - if err != nil { - return nil, err - } - - go func() { - executor.AsyncExec(func() { - // We don't pass executor to the Close context. Please don't do it. - _, _ = next.Server(ctx).Close(context.TODO(), conn) - }) - }() - - return conn, nil -} -``` diff --git a/pkg/networkservice/common/serialize/client.go b/pkg/networkservice/common/serialize/client.go deleted file mode 100644 index dd7e6d778..000000000 --- a/pkg/networkservice/common/serialize/client.go +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright (c) 2020-2021 Doc.ai and/or its affiliates. -// -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package serialize - -import ( - "context" - - "github.com/golang/protobuf/ptypes/empty" - "google.golang.org/grpc" - - "github.com/networkservicemesh/api/pkg/api/networkservice" - - "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" - "github.com/networkservicemesh/sdk/pkg/tools/multiexecutor" - "github.com/networkservicemesh/sdk/pkg/tools/serializectx" -) - -type serializeClient struct { - executor multiexecutor.MultiExecutor -} - -// NewClient returns a new serialize client chain element -func NewClient() networkservice.NetworkServiceClient { - return new(serializeClient) -} - -func (c *serializeClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (conn *networkservice.Connection, err error) { - connID := request.GetConnection().GetId() - <-c.executor.AsyncExec(connID, func() { - requestCtx := serializectx.WithMultiExecutor(ctx, &c.executor) - conn, err = next.Client(ctx).Request(requestCtx, request, opts...) - }) - return conn, err -} - -func (c *serializeClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (_ *empty.Empty, err error) { - <-c.executor.AsyncExec(conn.GetId(), func() { - _, err = next.Client(ctx).Close(ctx, conn, opts...) - }) - return new(empty.Empty), err -} diff --git a/pkg/networkservice/common/serialize/server.go b/pkg/networkservice/common/serialize/server.go deleted file mode 100644 index d1bec5479..000000000 --- a/pkg/networkservice/common/serialize/server.go +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright (c) 2020-2021 Doc.ai and/or its affiliates. -// -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package serialize provides chain elements for serial Request, Close event processing -package serialize - -import ( - "context" - - "github.com/golang/protobuf/ptypes/empty" - - "github.com/networkservicemesh/api/pkg/api/networkservice" - - "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" - "github.com/networkservicemesh/sdk/pkg/tools/multiexecutor" - "github.com/networkservicemesh/sdk/pkg/tools/serializectx" -) - -type serializeServer struct { - executor multiexecutor.MultiExecutor -} - -// NewServer returns a new serialize server chain element -func NewServer() networkservice.NetworkServiceServer { - return new(serializeServer) -} - -func (s *serializeServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (conn *networkservice.Connection, err error) { - connID := request.GetConnection().GetId() - <-s.executor.AsyncExec(connID, func() { - requestCtx := serializectx.WithMultiExecutor(ctx, &s.executor) - conn, err = next.Server(ctx).Request(requestCtx, request) - }) - return conn, err -} - -func (s *serializeServer) Close(ctx context.Context, conn *networkservice.Connection) (_ *empty.Empty, err error) { - <-s.executor.AsyncExec(conn.GetId(), func() { - _, err = next.Server(ctx).Close(ctx, conn) - }) - return new(empty.Empty), err -} diff --git a/pkg/networkservice/common/timeout/metadata.go b/pkg/networkservice/common/timeout/metadata.go new file mode 100644 index 000000000..0f602be41 --- /dev/null +++ b/pkg/networkservice/common/timeout/metadata.go @@ -0,0 +1,70 @@ +// Copyright (c) 2021 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package timeout + +import ( + "context" + + "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" +) + +type key struct{} + +// Store sets the context.CancelFunc stored in per Connection.Id metadata. +func Store(ctx context.Context, isClient bool, cancel context.CancelFunc) { + metadata.Map(ctx, isClient).Store(key{}, cancel) +} + +// Delete deletes the context.CancelFunc stored in per Connection.Id metadata +func Delete(ctx context.Context, isClient bool) { + metadata.Map(ctx, isClient).Delete(key{}) +} + +// Load returns the context.CancelFunc stored in per Connection.Id metadata, or nil if no +// value is present. +// The ok result indicates whether value was found in the per Connection.Id metadata. +func Load(ctx context.Context, isClient bool) (value context.CancelFunc, ok bool) { + rawValue, ok := metadata.Map(ctx, isClient).Load(key{}) + if !ok { + return + } + value, ok = rawValue.(context.CancelFunc) + return value, ok +} + +// LoadOrStore returns the existing context.CancelFunc stored in per Connection.Id metadata if present. +// Otherwise, it stores and returns the given nterface_types.InterfaceIndex. +// The loaded result is true if the value was loaded, false if stored. +func LoadOrStore(ctx context.Context, isClient bool, cancel context.CancelFunc) (value context.CancelFunc, ok bool) { + rawValue, ok := metadata.Map(ctx, isClient).LoadOrStore(key{}, cancel) + if !ok { + return + } + value, ok = rawValue.(context.CancelFunc) + return value, ok +} + +// LoadAndDelete deletes the context.CancelFunc stored in per Connection.Id metadata, +// returning the previous value if any. The loaded result reports whether the key was present. +func LoadAndDelete(ctx context.Context, isClient bool) (value context.CancelFunc, ok bool) { + rawValue, ok := metadata.Map(ctx, isClient).LoadAndDelete(key{}) + if !ok { + return + } + value, ok = rawValue.(context.CancelFunc) + return value, ok +} diff --git a/pkg/networkservice/common/timeout/server.go b/pkg/networkservice/common/timeout/server.go index 52d716f1c..70a65f84c 100644 --- a/pkg/networkservice/common/timeout/server.go +++ b/pkg/networkservice/common/timeout/server.go @@ -21,80 +21,69 @@ package timeout import ( "context" + "time" "github.com/golang/protobuf/ptypes/empty" "github.com/pkg/errors" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/begin" + "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" + "github.com/networkservicemesh/sdk/pkg/tools/clock" + "github.com/networkservicemesh/api/pkg/api/networkservice" "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" - "github.com/networkservicemesh/sdk/pkg/tools/expire" - "github.com/networkservicemesh/sdk/pkg/tools/log" - "github.com/networkservicemesh/sdk/pkg/tools/serializectx" ) type timeoutServer struct { - expireManager *expire.Manager + chainCtx context.Context } // NewServer - creates a new NetworkServiceServer chain element that implements timeout of expired connections // for the subsequent chain elements. func NewServer(ctx context.Context) networkservice.NetworkServiceServer { return &timeoutServer{ - expireManager: expire.NewManager(ctx), + chainCtx: ctx, } } func (s *timeoutServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (conn *networkservice.Connection, err error) { - logger := log.FromContext(ctx).WithField("timeoutServer", "Request") - - conn = request.GetConnection() - connID := conn.GetId() - - expirationTimestamp := conn.GetPrevPathSegment().GetExpires() - if expirationTimestamp == nil { - return nil, errors.Errorf("expiration for the previous path segment cannot be nil: %+v", conn) - } - - s.expireManager.Stop(connID) - conn, err = next.Server(ctx).Request(ctx, request) if err != nil { - s.expireManager.Start(connID) return nil, err } - closeConn := conn.Clone() - if closeConn.GetId() != connID { - s.expireManager.Delete(connID) + expirationTimestamp := conn.GetPrevPathSegment().GetExpires() + if expirationTimestamp == nil { + return nil, errors.Errorf("expiration for the previous path segment cannot be nil: %+v", conn) } - - s.expireManager.New( - serializectx.GetExecutor(ctx, closeConn.GetId()), - closeConn.GetId(), - expirationTimestamp.AsTime().Local(), - func(closeCtx context.Context) { - if _, closeErr := next.Server(ctx).Close(closeCtx, closeConn); closeErr != nil { - logger.Errorf("failed to close timed out connection: %s %s", closeConn.GetId(), closeErr.Error()) - } - }, - ) + expirationTime := expirationTimestamp.AsTime() + cancelCtx, cancel := context.WithCancel(s.chainCtx) + for { + if oldCancel, loaded := LoadAndDelete(ctx, metadata.IsClient(s)); loaded { + oldCancel() + } + if _, loaded := LoadOrStore(ctx, metadata.IsClient(s), cancel); !loaded { + break + } + } + eventFactory := begin.FromContext(ctx) + timeClock := clock.FromContext(ctx) + afterCh := timeClock.After(timeClock.Until(expirationTime)) + go func(cancelCtx context.Context, afterCh <-chan time.Time) { + select { + case <-cancelCtx.Done(): + case <-afterCh: + eventFactory.Close(begin.CancelContext(cancelCtx)) + } + }(cancelCtx, afterCh) return conn, nil } func (s *timeoutServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) { - logger := log.FromContext(ctx).WithField("timeoutServer", "Close") - - if s.expireManager.Stop(conn.GetId()) { - if _, err := next.Server(ctx).Close(ctx, conn); err != nil { - s.expireManager.Start(conn.GetId()) - return nil, err - } - s.expireManager.Delete(conn.GetId()) - } else { - logger.Warnf("connection has been already closed: %s", conn.GetId()) + if oldCancel, loaded := LoadAndDelete(ctx, metadata.IsClient(s)); loaded { + oldCancel() } - - return new(empty.Empty), nil + return next.Server(ctx).Close(ctx, conn) } diff --git a/pkg/networkservice/common/timeout/server_test.go b/pkg/networkservice/common/timeout/server_test.go index d317383c3..3d738419c 100644 --- a/pkg/networkservice/common/timeout/server_test.go +++ b/pkg/networkservice/common/timeout/server_test.go @@ -31,17 +31,19 @@ import ( "github.com/networkservicemesh/api/pkg/api/networkservice" kernelmech "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/begin" "github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms" "github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/kernel" "github.com/networkservicemesh/sdk/pkg/networkservice/common/null" "github.com/networkservicemesh/sdk/pkg/networkservice/common/refresh" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/serialize" "github.com/networkservicemesh/sdk/pkg/networkservice/common/timeout" "github.com/networkservicemesh/sdk/pkg/networkservice/common/updatepath" "github.com/networkservicemesh/sdk/pkg/networkservice/common/updatetoken" "github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters" "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" + "github.com/networkservicemesh/sdk/pkg/networkservice/utils/inject/injectclock" "github.com/networkservicemesh/sdk/pkg/networkservice/utils/inject/injecterror" + "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" "github.com/networkservicemesh/sdk/pkg/tools/clock" "github.com/networkservicemesh/sdk/pkg/tools/clockmock" ) @@ -59,19 +61,23 @@ func testClient( client networkservice.NetworkServiceClient, server networkservice.NetworkServiceServer, duration time.Duration, + clk clock.Clock, ) networkservice.NetworkServiceClient { return next.NewNetworkServiceClient( updatepath.NewClient(clientName), - serialize.NewClient(), + begin.NewClient(), + metadata.NewClient(), + injectclock.NewClient(clk), client, adapters.NewServerToClient( next.NewNetworkServiceServer( updatetoken.NewServer(func(_ credentials.AuthInfo) (string, time.Time, error) { return "token", clock.FromContext(ctx).Now().Add(duration), nil }), + begin.NewServer(), + metadata.NewServer(), new(remoteServer), // <-- GRPC invocation updatepath.NewServer(serverName), - serialize.NewServer(), timeout.NewServer(ctx), server, ), @@ -94,6 +100,7 @@ func TestTimeoutServer_Request(t *testing.T) { kernelmech.MECHANISM: connServer, }), tokenTimeout, + clockMock, ) _, err := client.Request(ctx, &networkservice.NetworkServiceRequest{}) @@ -123,6 +130,7 @@ func TestTimeoutServer_CloseBeforeTimeout(t *testing.T) { kernelmech.MECHANISM: connServer, }), tokenTimeout, + clockMock, ) conn, err := client.Request(ctx, &networkservice.NetworkServiceRequest{}) @@ -155,6 +163,7 @@ func TestTimeoutServer_CloseAfterTimeout(t *testing.T) { kernelmech.MECHANISM: connServer, }), tokenTimeout, + clockMock, ) conn, err := client.Request(ctx, &networkservice.NetworkServiceRequest{}) @@ -194,7 +203,7 @@ func TestTimeoutServer_RaceTest(t *testing.T) { connServer := newConnectionsServer(t) - client := testClient(ctx, null.NewClient(), connServer, 0) + client := testClient(ctx, null.NewClient(), connServer, 0, clock.FromContext(ctx)) var wg sync.WaitGroup for i := 0; i < 1000; i++ { @@ -233,6 +242,7 @@ func TestTimeoutServer_RefreshFailure(t *testing.T) { connServer, ), tokenTimeout, + clockMock, ) conn, err := client.Request(ctx, &networkservice.NetworkServiceRequest{}) @@ -247,41 +257,6 @@ func TestTimeoutServer_RefreshFailure(t *testing.T) { require.Condition(t, connServer.validator(0, 1)) } -func TestTimeoutServer_CloseFailure(t *testing.T) { - t.Cleanup(func() { goleak.VerifyNone(t) }) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - clockMock := clockmock.New(ctx) - ctx = clock.WithClock(ctx, clockMock) - - connServer := newConnectionsServer(t) - - client := testClient( - ctx, - null.NewClient(), - next.NewNetworkServiceServer( - injecterror.NewServer( - injecterror.WithRequestErrorTimes(), - injecterror.WithCloseErrorTimes(0)), - connServer, - ), - tokenTimeout, - ) - - conn, err := client.Request(ctx, &networkservice.NetworkServiceRequest{}) - require.NoError(t, err) - require.Condition(t, connServer.validator(1, 0)) - - _, err = client.Close(ctx, conn) - require.Error(t, err) - require.Condition(t, connServer.validator(1, 0)) - - clockMock.Add(tokenTimeout) - require.Eventually(t, connServer.validator(0, 1), testWait, testTick) -} - type remoteServer struct{} func (s *remoteServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { @@ -322,7 +297,6 @@ func (s *connectionsServer) validator(open, closed int) func() bool { if connsOpen != open || connsClosed != closed { return false } - return true } } diff --git a/pkg/networkservice/utils/inject/injectclock/client.go b/pkg/networkservice/utils/inject/injectclock/client.go new file mode 100644 index 000000000..610634b61 --- /dev/null +++ b/pkg/networkservice/utils/inject/injectclock/client.go @@ -0,0 +1,50 @@ +// Copyright (c) 2021 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package injectclock can be used in testing to inject a mockClock into the context of the chain +package injectclock + +import ( + "context" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" + "github.com/networkservicemesh/sdk/pkg/tools/clock" +) + +type injectClockClient struct { + clock.Clock +} + +// NewClient - client that injects clk into the context as it passes through the chain element +func NewClient(clk clock.Clock) networkservice.NetworkServiceClient { + return &injectClockClient{ + Clock: clk, + } +} + +func (i *injectClockClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { + ctx = clock.WithClock(ctx, i.Clock) + return next.Client(ctx).Request(ctx, request) +} + +func (i *injectClockClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*emptypb.Empty, error) { + ctx = clock.WithClock(ctx, i.Clock) + return next.Client(ctx).Close(ctx, conn) +} From 34234261e4269578264904bd5554d6b29c12b14d Mon Sep 17 00:00:00 2001 From: Ed Warnicke Date: Thu, 26 Aug 2021 23:19:04 -0500 Subject: [PATCH 2/9] Update pkg/networkservice/common/begin/event_factory.go Signed-off-by: Ed Warnicke Co-authored-by: Vladimir Popov Signed-off-by: Ed Warnicke --- .../common/begin/event_factory.go | 35 +++++++--------- pkg/networkservice/common/timeout/server.go | 11 +++-- .../common/timeout/server_test.go | 42 ++++++++++++++++++- 3 files changed, 63 insertions(+), 25 deletions(-) diff --git a/pkg/networkservice/common/begin/event_factory.go b/pkg/networkservice/common/begin/event_factory.go index 31e826650..d4a6e8b39 100644 --- a/pkg/networkservice/common/begin/event_factory.go +++ b/pkg/networkservice/common/begin/event_factory.go @@ -18,12 +18,13 @@ package begin import ( "context" - "time" "github.com/edwarnicke/serialize" "github.com/networkservicemesh/api/pkg/api/networkservice" "google.golang.org/grpc" + "github.com/networkservicemesh/sdk/pkg/tools/postpone" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" ) @@ -58,7 +59,11 @@ func newEventFactoryClient(ctx context.Context, afterClose func(), opts ...grpc. client: next.Client(ctx), opts: opts, } - f.ctxFunc = cxtFuncFromContext(ctx, f) + ctxFunc := postpone.Context(ctx) + f.ctxFunc = func() (context.Context, context.CancelFunc) { + eventCtx, cancel := ctxFunc() + return withEventFactory(eventCtx, f), cancel + } f.afterClose = func() { f.state = closed @@ -107,7 +112,7 @@ func (f *eventFactoryClient) Close(opts ...Option) <-chan error { ch := make(chan error, 1) f.executor.AsyncExec(func() { defer close(ch) - if f.state != established || f.client == nil || f.request == nil { + if f.client == nil || f.request == nil { return } select { @@ -138,7 +143,11 @@ func newEventFactoryServer(ctx context.Context, afterClose func()) *eventFactory f := &eventFactoryServer{ server: next.Server(ctx), } - f.ctxFunc = cxtFuncFromContext(ctx, f) + ctxFunc := postpone.Context(ctx) + f.ctxFunc = func() (context.Context, context.CancelFunc) { + eventCtx, cancel := ctxFunc() + return withEventFactory(eventCtx, f), cancel + } f.afterClose = func() { f.state = closed @@ -185,7 +194,7 @@ func (f *eventFactoryServer) Close(opts ...Option) <-chan error { ch := make(chan error, 1) <-f.executor.AsyncExec(func() { defer close(ch) - if f.state != established || f.server == nil || f.request == nil { + if f.server == nil || f.request == nil { return } select { @@ -201,19 +210,3 @@ func (f *eventFactoryServer) Close(opts ...Option) <-chan error { } var _ EventFactory = &eventFactoryServer{} - -func cxtFuncFromContext(ctx context.Context, eventFactory EventFactory) func() (context.Context, context.CancelFunc) { - ctxFunc := func() (context.Context, context.CancelFunc) { - eventCtx, cancel := context.WithCancel(context.Background()) - return withEventFactory(eventCtx, eventFactory), cancel - } - if deadline, ok := ctx.Deadline(); ok { - duration := time.Until(deadline) - ctxFunc = func() (context.Context, context.CancelFunc) { - eventContext, cancel := context.WithTimeout(context.Background(), duration) - eventContext = withEventFactory(eventContext, eventFactory) - return eventContext, cancel - } - } - return ctxFunc -} diff --git a/pkg/networkservice/common/timeout/server.go b/pkg/networkservice/common/timeout/server.go index 70a65f84c..c3c48d54d 100644 --- a/pkg/networkservice/common/timeout/server.go +++ b/pkg/networkservice/common/timeout/server.go @@ -23,6 +23,8 @@ import ( "context" "time" + iserror "errors" + "github.com/golang/protobuf/ptypes/empty" "github.com/pkg/errors" @@ -82,8 +84,11 @@ func (s *timeoutServer) Request(ctx context.Context, request *networkservice.Net } func (s *timeoutServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) { - if oldCancel, loaded := LoadAndDelete(ctx, metadata.IsClient(s)); loaded { - oldCancel() + _, err := next.Server(ctx).Close(ctx, conn) + if !(iserror.Is(err, context.DeadlineExceeded) || iserror.Is(err, context.Canceled)) { + if oldCancel, loaded := LoadAndDelete(ctx, metadata.IsClient(s)); loaded { + oldCancel() + } } - return next.Server(ctx).Close(ctx, conn) + return &empty.Empty{}, err } diff --git a/pkg/networkservice/common/timeout/server_test.go b/pkg/networkservice/common/timeout/server_test.go index 3d738419c..d1ef59db1 100644 --- a/pkg/networkservice/common/timeout/server_test.go +++ b/pkg/networkservice/common/timeout/server_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/golang/protobuf/ptypes/empty" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/goleak" @@ -238,7 +239,8 @@ func TestTimeoutServer_RefreshFailure(t *testing.T) { next.NewNetworkServiceServer( injecterror.NewServer( injecterror.WithRequestErrorTimes(1, -1), - injecterror.WithCloseErrorTimes()), + injecterror.WithCloseErrorTimes(), + ), connServer, ), tokenTimeout, @@ -257,6 +259,44 @@ func TestTimeoutServer_RefreshFailure(t *testing.T) { require.Condition(t, connServer.validator(0, 1)) } +func TestTimeoutServer_CloseFailure(t *testing.T) { + t.Cleanup(func() { goleak.VerifyNone(t) }) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + clockMock := clockmock.New(ctx) + ctx = clock.WithClock(ctx, clockMock) + + connServer := newConnectionsServer(t) + + client := testClient( + ctx, + null.NewClient(), + next.NewNetworkServiceServer( + injecterror.NewServer( + injecterror.WithError(errors.WithStack(context.DeadlineExceeded)), + injecterror.WithRequestErrorTimes(), + injecterror.WithCloseErrorTimes(0)), + + connServer, + ), + tokenTimeout, + clockMock, + ) + + conn, err := client.Request(ctx, &networkservice.NetworkServiceRequest{}) + require.NoError(t, err) + require.Condition(t, connServer.validator(1, 0)) + + _, err = client.Close(ctx, conn) + require.Error(t, err) + require.Condition(t, connServer.validator(1, 0)) + + clockMock.Add(tokenTimeout) + require.Eventually(t, connServer.validator(0, 1), testWait, testTick) +} + type remoteServer struct{} func (s *remoteServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { From ccf2e88a3b78a974d05a985720dd66a0ee9f9355 Mon Sep 17 00:00:00 2001 From: Ed Warnicke Date: Sat, 28 Aug 2021 15:14:54 -0500 Subject: [PATCH 3/9] Update pkg/networkservice/common/begin/event_factory.go Signed-off-by: Ed Warnicke Co-authored-by: Vladimir Popov --- pkg/networkservice/common/begin/event_factory.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/networkservice/common/begin/event_factory.go b/pkg/networkservice/common/begin/event_factory.go index d4a6e8b39..f942736ba 100644 --- a/pkg/networkservice/common/begin/event_factory.go +++ b/pkg/networkservice/common/begin/event_factory.go @@ -192,7 +192,7 @@ func (f *eventFactoryServer) Close(opts ...Option) <-chan error { opt(o) } ch := make(chan error, 1) - <-f.executor.AsyncExec(func() { + f.executor.AsyncExec(func() { defer close(ch) if f.server == nil || f.request == nil { return From 100965ac7e1f3b4a45fcb6964f5f8c629bb87c2e Mon Sep 17 00:00:00 2001 From: Ed Warnicke Date: Sat, 28 Aug 2021 15:21:14 -0500 Subject: [PATCH 4/9] Make metadata access functions private for refresh and timeout Signed-off-by: Ed Warnicke --- pkg/networkservice/common/refresh/client.go | 8 ++--- pkg/networkservice/common/refresh/metadata.go | 30 +++---------------- pkg/networkservice/common/timeout/metadata.go | 30 +++---------------- pkg/networkservice/common/timeout/server.go | 6 ++-- 4 files changed, 15 insertions(+), 59 deletions(-) diff --git a/pkg/networkservice/common/refresh/client.go b/pkg/networkservice/common/refresh/client.go index 47bb7b9c0..b3dcce773 100644 --- a/pkg/networkservice/common/refresh/client.go +++ b/pkg/networkservice/common/refresh/client.go @@ -69,11 +69,11 @@ func (t *refreshClient) Request(ctx context.Context, request *networkservice.Net cancelCtx, cancel := context.WithCancel(t.chainCtx) for { // Call the old cancel to cancel any existing refreshes hanging out waiting to go - if oldCancel, loaded := LoadAndDelete(ctx, metadata.IsClient(t)); loaded { + if oldCancel, loaded := loadAndDelete(ctx, metadata.IsClient(t)); loaded { oldCancel() } - // Store the cancel context and break out of the loop - if _, loaded := LoadOrStore(ctx, metadata.IsClient(t), cancel); !loaded { + // store the cancel context and break out of the loop + if _, loaded := loadOrStore(ctx, metadata.IsClient(t), cancel); !loaded { break } } @@ -95,7 +95,7 @@ func (t *refreshClient) Request(ctx context.Context, request *networkservice.Net } func (t *refreshClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (e *empty.Empty, err error) { - if oldCancel, loaded := LoadAndDelete(ctx, metadata.IsClient(t)); loaded { + if oldCancel, loaded := loadAndDelete(ctx, metadata.IsClient(t)); loaded { oldCancel() } return next.Client(ctx).Close(ctx, conn, opts...) diff --git a/pkg/networkservice/common/refresh/metadata.go b/pkg/networkservice/common/refresh/metadata.go index 2ec5693a8..2e14c6eea 100644 --- a/pkg/networkservice/common/refresh/metadata.go +++ b/pkg/networkservice/common/refresh/metadata.go @@ -24,32 +24,10 @@ import ( type key struct{} -// Store sets the context.CancelFunc stored in per Connection.Id metadata. -func Store(ctx context.Context, isClient bool, cancel context.CancelFunc) { - metadata.Map(ctx, isClient).Store(key{}, cancel) -} - -// Delete deletes the context.CancelFunc stored in per Connection.Id metadata -func Delete(ctx context.Context, isClient bool) { - metadata.Map(ctx, isClient).Delete(key{}) -} - -// Load returns the context.CancelFunc stored in per Connection.Id metadata, or nil if no -// value is present. -// The ok result indicates whether value was found in the per Connection.Id metadata. -func Load(ctx context.Context, isClient bool) (value context.CancelFunc, ok bool) { - rawValue, ok := metadata.Map(ctx, isClient).Load(key{}) - if !ok { - return - } - value, ok = rawValue.(context.CancelFunc) - return value, ok -} - -// LoadOrStore returns the existing context.CancelFunc stored in per Connection.Id metadata if present. +// loadOrStore returns the existing context.CancelFunc stored in per Connection.Id metadata if present. // Otherwise, it stores and returns the given nterface_types.InterfaceIndex. // The loaded result is true if the value was loaded, false if stored. -func LoadOrStore(ctx context.Context, isClient bool, cancel context.CancelFunc) (value context.CancelFunc, ok bool) { +func loadOrStore(ctx context.Context, isClient bool, cancel context.CancelFunc) (value context.CancelFunc, ok bool) { rawValue, ok := metadata.Map(ctx, isClient).LoadOrStore(key{}, cancel) if !ok { return @@ -58,9 +36,9 @@ func LoadOrStore(ctx context.Context, isClient bool, cancel context.CancelFunc) return value, ok } -// LoadAndDelete deletes the context.CancelFunc stored in per Connection.Id metadata, +// loadAndDelete deletes the context.CancelFunc stored in per Connection.Id metadata, // returning the previous value if any. The loaded result reports whether the key was present. -func LoadAndDelete(ctx context.Context, isClient bool) (value context.CancelFunc, ok bool) { +func loadAndDelete(ctx context.Context, isClient bool) (value context.CancelFunc, ok bool) { rawValue, ok := metadata.Map(ctx, isClient).LoadAndDelete(key{}) if !ok { return diff --git a/pkg/networkservice/common/timeout/metadata.go b/pkg/networkservice/common/timeout/metadata.go index 0f602be41..ae9657430 100644 --- a/pkg/networkservice/common/timeout/metadata.go +++ b/pkg/networkservice/common/timeout/metadata.go @@ -24,32 +24,10 @@ import ( type key struct{} -// Store sets the context.CancelFunc stored in per Connection.Id metadata. -func Store(ctx context.Context, isClient bool, cancel context.CancelFunc) { - metadata.Map(ctx, isClient).Store(key{}, cancel) -} - -// Delete deletes the context.CancelFunc stored in per Connection.Id metadata -func Delete(ctx context.Context, isClient bool) { - metadata.Map(ctx, isClient).Delete(key{}) -} - -// Load returns the context.CancelFunc stored in per Connection.Id metadata, or nil if no -// value is present. -// The ok result indicates whether value was found in the per Connection.Id metadata. -func Load(ctx context.Context, isClient bool) (value context.CancelFunc, ok bool) { - rawValue, ok := metadata.Map(ctx, isClient).Load(key{}) - if !ok { - return - } - value, ok = rawValue.(context.CancelFunc) - return value, ok -} - -// LoadOrStore returns the existing context.CancelFunc stored in per Connection.Id metadata if present. +// loadOrStore returns the existing context.CancelFunc stored in per Connection.Id metadata if present. // Otherwise, it stores and returns the given nterface_types.InterfaceIndex. // The loaded result is true if the value was loaded, false if stored. -func LoadOrStore(ctx context.Context, isClient bool, cancel context.CancelFunc) (value context.CancelFunc, ok bool) { +func loadOrStore(ctx context.Context, isClient bool, cancel context.CancelFunc) (value context.CancelFunc, ok bool) { rawValue, ok := metadata.Map(ctx, isClient).LoadOrStore(key{}, cancel) if !ok { return @@ -58,9 +36,9 @@ func LoadOrStore(ctx context.Context, isClient bool, cancel context.CancelFunc) return value, ok } -// LoadAndDelete deletes the context.CancelFunc stored in per Connection.Id metadata, +// loadAndDelete deletes the context.CancelFunc stored in per Connection.Id metadata, // returning the previous value if any. The loaded result reports whether the key was present. -func LoadAndDelete(ctx context.Context, isClient bool) (value context.CancelFunc, ok bool) { +func loadAndDelete(ctx context.Context, isClient bool) (value context.CancelFunc, ok bool) { rawValue, ok := metadata.Map(ctx, isClient).LoadAndDelete(key{}) if !ok { return diff --git a/pkg/networkservice/common/timeout/server.go b/pkg/networkservice/common/timeout/server.go index c3c48d54d..e5f963d88 100644 --- a/pkg/networkservice/common/timeout/server.go +++ b/pkg/networkservice/common/timeout/server.go @@ -62,10 +62,10 @@ func (s *timeoutServer) Request(ctx context.Context, request *networkservice.Net expirationTime := expirationTimestamp.AsTime() cancelCtx, cancel := context.WithCancel(s.chainCtx) for { - if oldCancel, loaded := LoadAndDelete(ctx, metadata.IsClient(s)); loaded { + if oldCancel, loaded := loadAndDelete(ctx, metadata.IsClient(s)); loaded { oldCancel() } - if _, loaded := LoadOrStore(ctx, metadata.IsClient(s), cancel); !loaded { + if _, loaded := loadOrStore(ctx, metadata.IsClient(s), cancel); !loaded { break } } @@ -86,7 +86,7 @@ func (s *timeoutServer) Request(ctx context.Context, request *networkservice.Net func (s *timeoutServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) { _, err := next.Server(ctx).Close(ctx, conn) if !(iserror.Is(err, context.DeadlineExceeded) || iserror.Is(err, context.Canceled)) { - if oldCancel, loaded := LoadAndDelete(ctx, metadata.IsClient(s)); loaded { + if oldCancel, loaded := loadAndDelete(ctx, metadata.IsClient(s)); loaded { oldCancel() } } From ad4366b248f1fd95b5d4416bf8cffccb8f897d31 Mon Sep 17 00:00:00 2001 From: Ed Warnicke Date: Sat, 28 Aug 2021 15:30:47 -0500 Subject: [PATCH 5/9] Simplify refresh/timeout cancel handling In response to comment: https://github.com/networkservicemesh/sdk/pull/1072#discussion_r697185038 Signed-off-by: Ed Warnicke --- pkg/networkservice/common/refresh/client.go | 13 ++++--------- pkg/networkservice/common/refresh/metadata.go | 13 +++---------- pkg/networkservice/common/timeout/metadata.go | 13 +++---------- pkg/networkservice/common/timeout/server.go | 10 +++------- 4 files changed, 13 insertions(+), 36 deletions(-) diff --git a/pkg/networkservice/common/refresh/client.go b/pkg/networkservice/common/refresh/client.go index b3dcce773..ec227337b 100644 --- a/pkg/networkservice/common/refresh/client.go +++ b/pkg/networkservice/common/refresh/client.go @@ -67,16 +67,11 @@ func (t *refreshClient) Request(ctx context.Context, request *networkservice.Net // Create a cancel context. cancelCtx, cancel := context.WithCancel(t.chainCtx) - for { - // Call the old cancel to cancel any existing refreshes hanging out waiting to go - if oldCancel, loaded := loadAndDelete(ctx, metadata.IsClient(t)); loaded { - oldCancel() - } - // store the cancel context and break out of the loop - if _, loaded := loadOrStore(ctx, metadata.IsClient(t), cancel); !loaded { - break - } + + if oldCancel, loaded := loadAndDelete(ctx, metadata.IsClient(t)); loaded { + oldCancel() } + store(ctx, metadata.IsClient(t), cancel) eventFactory := begin.FromContext(ctx) timeClock := clock.FromContext(ctx) diff --git a/pkg/networkservice/common/refresh/metadata.go b/pkg/networkservice/common/refresh/metadata.go index 2e14c6eea..34624ed7d 100644 --- a/pkg/networkservice/common/refresh/metadata.go +++ b/pkg/networkservice/common/refresh/metadata.go @@ -24,16 +24,9 @@ import ( type key struct{} -// loadOrStore returns the existing context.CancelFunc stored in per Connection.Id metadata if present. -// Otherwise, it stores and returns the given nterface_types.InterfaceIndex. -// The loaded result is true if the value was loaded, false if stored. -func loadOrStore(ctx context.Context, isClient bool, cancel context.CancelFunc) (value context.CancelFunc, ok bool) { - rawValue, ok := metadata.Map(ctx, isClient).LoadOrStore(key{}, cancel) - if !ok { - return - } - value, ok = rawValue.(context.CancelFunc) - return value, ok +// store sets the context.CancelFunc stored in per Connection.Id metadata. +func store(ctx context.Context, isClient bool, cancel context.CancelFunc) { + metadata.Map(ctx, isClient).Store(key{}, cancel) } // loadAndDelete deletes the context.CancelFunc stored in per Connection.Id metadata, diff --git a/pkg/networkservice/common/timeout/metadata.go b/pkg/networkservice/common/timeout/metadata.go index ae9657430..e59d0fb6e 100644 --- a/pkg/networkservice/common/timeout/metadata.go +++ b/pkg/networkservice/common/timeout/metadata.go @@ -24,16 +24,9 @@ import ( type key struct{} -// loadOrStore returns the existing context.CancelFunc stored in per Connection.Id metadata if present. -// Otherwise, it stores and returns the given nterface_types.InterfaceIndex. -// The loaded result is true if the value was loaded, false if stored. -func loadOrStore(ctx context.Context, isClient bool, cancel context.CancelFunc) (value context.CancelFunc, ok bool) { - rawValue, ok := metadata.Map(ctx, isClient).LoadOrStore(key{}, cancel) - if !ok { - return - } - value, ok = rawValue.(context.CancelFunc) - return value, ok +// store sets the context.CancelFunc stored in per Connection.Id metadata. +func store(ctx context.Context, isClient bool, cancel context.CancelFunc) { + metadata.Map(ctx, isClient).Store(key{}, cancel) } // loadAndDelete deletes the context.CancelFunc stored in per Connection.Id metadata, diff --git a/pkg/networkservice/common/timeout/server.go b/pkg/networkservice/common/timeout/server.go index e5f963d88..084ff3780 100644 --- a/pkg/networkservice/common/timeout/server.go +++ b/pkg/networkservice/common/timeout/server.go @@ -61,14 +61,10 @@ func (s *timeoutServer) Request(ctx context.Context, request *networkservice.Net } expirationTime := expirationTimestamp.AsTime() cancelCtx, cancel := context.WithCancel(s.chainCtx) - for { - if oldCancel, loaded := loadAndDelete(ctx, metadata.IsClient(s)); loaded { - oldCancel() - } - if _, loaded := loadOrStore(ctx, metadata.IsClient(s), cancel); !loaded { - break - } + if oldCancel, loaded := loadAndDelete(ctx, metadata.IsClient(s)); loaded { + oldCancel() } + store(ctx, metadata.IsClient(s), cancel) eventFactory := begin.FromContext(ctx) timeClock := clock.FromContext(ctx) afterCh := timeClock.After(timeClock.Until(expirationTime)) From abc74101fbdd6206a4d522ed6370f8b094e074e8 Mon Sep 17 00:00:00 2001 From: Ed Warnicke Date: Sat, 28 Aug 2021 15:34:49 -0500 Subject: [PATCH 6/9] Simplification in response to https://github.com/networkservicemesh/sdk/pull/1072#discussion_r697166412 Signed-off-by: Ed Warnicke --- pkg/networkservice/common/begin/event_factory.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/networkservice/common/begin/event_factory.go b/pkg/networkservice/common/begin/event_factory.go index f942736ba..6f2e43efd 100644 --- a/pkg/networkservice/common/begin/event_factory.go +++ b/pkg/networkservice/common/begin/event_factory.go @@ -112,7 +112,7 @@ func (f *eventFactoryClient) Close(opts ...Option) <-chan error { ch := make(chan error, 1) f.executor.AsyncExec(func() { defer close(ch) - if f.client == nil || f.request == nil { + if f.request == nil { return } select { @@ -194,7 +194,7 @@ func (f *eventFactoryServer) Close(opts ...Option) <-chan error { ch := make(chan error, 1) f.executor.AsyncExec(func() { defer close(ch) - if f.server == nil || f.request == nil { + if f.request == nil { return } select { From f34db0227fefa321b39f858cb1a4e0332eb06043 Mon Sep 17 00:00:00 2001 From: Ed Warnicke Date: Mon, 30 Aug 2021 11:06:16 -0500 Subject: [PATCH 7/9] Change in response to comment https://github.com/networkservicemesh/sdk/pull/1072#discussion_r698539750 Signed-off-by: Ed Warnicke --- pkg/networkservice/common/begin/event_factory.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/networkservice/common/begin/event_factory.go b/pkg/networkservice/common/begin/event_factory.go index 6f2e43efd..13b76dbd7 100644 --- a/pkg/networkservice/common/begin/event_factory.go +++ b/pkg/networkservice/common/begin/event_factory.go @@ -84,7 +84,7 @@ func (f *eventFactoryClient) Request(opts ...Option) <-chan error { ch := make(chan error, 1) f.executor.AsyncExec(func() { defer close(ch) - if f.state != established || f.client == nil || f.request == nil { + if f.state != established { return } select { @@ -166,7 +166,7 @@ func (f *eventFactoryServer) Request(opts ...Option) <-chan error { ch := make(chan error, 1) f.executor.AsyncExec(func() { defer close(ch) - if f.state != established || f.server == nil || f.request == nil { + if f.state != established { return } select { From a41c6c2d09933404bcb3eca69a85cfaaca2b86ca Mon Sep 17 00:00:00 2001 From: Ed Warnicke Date: Mon, 30 Aug 2021 15:03:40 -0500 Subject: [PATCH 8/9] Update pkg/networkservice/common/begin/event_factory.go Signed-off-by: Ed Warnicke Co-authored-by: Denis Tingaikin <49399980+denis-tingaikin@users.noreply.github.com> Signed-off-by: Ed Warnicke --- pkg/networkservice/common/begin/client.go | 4 +-- .../common/begin/event_factory.go | 32 +++++++++---------- pkg/networkservice/common/begin/server.go | 2 +- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/pkg/networkservice/common/begin/client.go b/pkg/networkservice/common/begin/client.go index 8f90790b7..0bb222904 100644 --- a/pkg/networkservice/common/begin/client.go +++ b/pkg/networkservice/common/begin/client.go @@ -105,8 +105,8 @@ func (b *beginClient) Close(ctx context.Context, conn *networkservice.Connection conn = eventFactoryClient.request.Connection ctx = withEventFactory(ctx, eventFactoryClient) emp, err = next.Client(ctx).Close(ctx, conn, opts...) - // afterClose() is used to cleanup things like the entry in the Map for EventFactories - eventFactoryClient.afterClose() + // afterCloseFunc() is used to cleanup things like the entry in the Map for EventFactories + eventFactoryClient.afterCloseFunc() }) return emp, err } diff --git a/pkg/networkservice/common/begin/event_factory.go b/pkg/networkservice/common/begin/event_factory.go index 13b76dbd7..e55b0abde 100644 --- a/pkg/networkservice/common/begin/event_factory.go +++ b/pkg/networkservice/common/begin/event_factory.go @@ -45,13 +45,13 @@ type EventFactory interface { } type eventFactoryClient struct { - state connectionState - executor serialize.Executor - ctxFunc func() (context.Context, context.CancelFunc) - request *networkservice.NetworkServiceRequest - opts []grpc.CallOption - client networkservice.NetworkServiceClient - afterClose func() + state connectionState + executor serialize.Executor + ctxFunc func() (context.Context, context.CancelFunc) + request *networkservice.NetworkServiceRequest + opts []grpc.CallOption + client networkservice.NetworkServiceClient + afterCloseFunc func() } func newEventFactoryClient(ctx context.Context, afterClose func(), opts ...grpc.CallOption) *eventFactoryClient { @@ -65,7 +65,7 @@ func newEventFactoryClient(ctx context.Context, afterClose func(), opts ...grpc. return withEventFactory(eventCtx, f), cancel } - f.afterClose = func() { + f.afterCloseFunc = func() { f.state = closed if afterClose != nil { afterClose() @@ -121,7 +121,7 @@ func (f *eventFactoryClient) Close(opts ...Option) <-chan error { ctx, cancel := f.ctxFunc() defer cancel() _, err := f.client.Close(ctx, f.request.GetConnection(), f.opts...) - f.afterClose() + f.afterCloseFunc() ch <- err } }) @@ -131,12 +131,12 @@ func (f *eventFactoryClient) Close(opts ...Option) <-chan error { var _ EventFactory = &eventFactoryClient{} type eventFactoryServer struct { - state connectionState - executor serialize.Executor - ctxFunc func() (context.Context, context.CancelFunc) - request *networkservice.NetworkServiceRequest - afterClose func() - server networkservice.NetworkServiceServer + state connectionState + executor serialize.Executor + ctxFunc func() (context.Context, context.CancelFunc) + request *networkservice.NetworkServiceRequest + afterCloseFunc func() + server networkservice.NetworkServiceServer } func newEventFactoryServer(ctx context.Context, afterClose func()) *eventFactoryServer { @@ -149,7 +149,7 @@ func newEventFactoryServer(ctx context.Context, afterClose func()) *eventFactory return withEventFactory(eventCtx, f), cancel } - f.afterClose = func() { + f.afterCloseFunc = func() { f.state = closed afterClose() } diff --git a/pkg/networkservice/common/begin/server.go b/pkg/networkservice/common/begin/server.go index 181a3433f..4aa1b17c3 100644 --- a/pkg/networkservice/common/begin/server.go +++ b/pkg/networkservice/common/begin/server.go @@ -96,7 +96,7 @@ func (b *beginServer) Close(ctx context.Context, conn *networkservice.Connection conn = eventFactoryServer.request.Connection ctx = withEventFactory(ctx, eventFactoryServer) emp, err = next.Server(ctx).Close(ctx, conn) - eventFactoryServer.afterClose() + eventFactoryServer.afterCloseFunc() }) return emp, err } From 78d9292694f27fbe7c5f0affe78b6193f646ae24 Mon Sep 17 00:00:00 2001 From: Ed Warnicke Date: Mon, 30 Aug 2021 15:24:47 -0500 Subject: [PATCH 9/9] Improve logging in response to https://github.com/networkservicemesh/sdk/pull/1072#discussion_r698700801 Signed-off-by: Ed Warnicke --- pkg/networkservice/common/begin/client.go | 11 +++++++---- pkg/networkservice/common/begin/server.go | 7 +++++-- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/pkg/networkservice/common/begin/client.go b/pkg/networkservice/common/begin/client.go index 0bb222904..cdd3c1654 100644 --- a/pkg/networkservice/common/begin/client.go +++ b/pkg/networkservice/common/begin/client.go @@ -24,6 +24,8 @@ import ( "google.golang.org/grpc" "google.golang.org/protobuf/types/known/emptypb" + "github.com/networkservicemesh/sdk/pkg/tools/log" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" ) @@ -57,8 +59,9 @@ func (b *beginClient) Request(ctx context.Context, request *networkservice.Netwo <-eventFactoryClient.executor.AsyncExec(func() { // If the eventFactory has changed, usually because the connection has been Closed and re-established // go back to the beginning and try again. - currentConnClient, _ := b.LoadOrStore(request.GetConnection().GetId(), eventFactoryClient) - if currentConnClient != eventFactoryClient { + currentEventFactoryClient, _ := b.LoadOrStore(request.GetConnection().GetId(), eventFactoryClient) + if currentEventFactoryClient != eventFactoryClient { + log.FromContext(ctx).Debug("recalling begin.Request because currentEventFactoryClient != eventFactoryClient") conn, err = b.Request(ctx, request) return } @@ -97,8 +100,8 @@ func (b *beginClient) Close(ctx context.Context, conn *networkservice.Connection } // If this isn't the connection we started with, do nothing - currentConnClient, _ := b.LoadOrStore(conn.GetId(), eventFactoryClient) - if currentConnClient != eventFactoryClient { + currentEventFactoryClient, _ := b.LoadOrStore(conn.GetId(), eventFactoryClient) + if currentEventFactoryClient != eventFactoryClient { return } // Always close with the last valid Connection we got diff --git a/pkg/networkservice/common/begin/server.go b/pkg/networkservice/common/begin/server.go index 4aa1b17c3..4109ec2bf 100644 --- a/pkg/networkservice/common/begin/server.go +++ b/pkg/networkservice/common/begin/server.go @@ -23,6 +23,8 @@ import ( "github.com/pkg/errors" "google.golang.org/protobuf/types/known/emptypb" + "github.com/networkservicemesh/sdk/pkg/tools/log" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" ) @@ -53,8 +55,9 @@ func (b *beginServer) Request(ctx context.Context, request *networkservice.Netwo ), ) <-eventFactoryServer.executor.AsyncExec(func() { - currentServerClient, _ := b.LoadOrStore(request.GetConnection().GetId(), eventFactoryServer) - if currentServerClient != eventFactoryServer { + currentEventFactoryServer, _ := b.LoadOrStore(request.GetConnection().GetId(), eventFactoryServer) + if currentEventFactoryServer != eventFactoryServer { + log.FromContext(ctx).Debug("recalling begin.Request because currentEventFactoryServer != eventFactoryServer") conn, err = b.Request(ctx, request) return }