-
Notifications
You must be signed in to change notification settings - Fork 36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introducing the begin chain element #1072
Changes from 7 commits
6274111
3423426
ccf2e88
100965a
ad4366b
abc7410
f34db02
a41c6c2
78d9292
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
// Copyright (c) 2021 Cisco and/or its affiliates. | ||
// | ||
// SPDX-License-Identifier: Apache-2.0 | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at: | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package begin | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/networkservicemesh/api/pkg/api/networkservice" | ||
"github.com/pkg/errors" | ||
"google.golang.org/grpc" | ||
"google.golang.org/protobuf/types/known/emptypb" | ||
|
||
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next" | ||
) | ||
|
||
type beginClient struct { | ||
clientMap | ||
} | ||
|
||
// NewClient - creates a new begin chain element | ||
func NewClient() networkservice.NetworkServiceClient { | ||
return &beginClient{} | ||
} | ||
|
||
func (b *beginClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (conn *networkservice.Connection, err error) { | ||
// No connection.ID, no service | ||
if request.GetConnection().GetId() == "" { | ||
return nil, errors.New("request.EventFactory.Id must not be zero valued") | ||
} | ||
// If some other EventFactory is already in the ctx... we are already running in an executor, and can just execute normally | ||
if fromContext(ctx) != nil { | ||
return next.Client(ctx).Request(ctx, request, opts...) | ||
} | ||
eventFactoryClient, _ := b.LoadOrStore(request.GetConnection().GetId(), | ||
newEventFactoryClient( | ||
ctx, | ||
func() { | ||
b.Delete(request.GetRequestConnection().GetId()) | ||
}, | ||
opts..., | ||
), | ||
) | ||
<-eventFactoryClient.executor.AsyncExec(func() { | ||
// If the eventFactory has changed, usually because the connection has been Closed and re-established | ||
// go back to the beginning and try again. | ||
currentConnClient, _ := b.LoadOrStore(request.GetConnection().GetId(), eventFactoryClient) | ||
if currentConnClient != eventFactoryClient { | ||
conn, err = b.Request(ctx, request) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I feel it can be super useful if we add logging on this branch because |
||
return | ||
} | ||
|
||
ctx = withEventFactory(ctx, eventFactoryClient) | ||
conn, err = next.Client(ctx).Request(ctx, request, opts...) | ||
if err != nil { | ||
if eventFactoryClient.state != established { | ||
eventFactoryClient.state = closed | ||
b.Delete(request.GetConnection().GetId()) | ||
} | ||
return | ||
} | ||
eventFactoryClient.request = request.Clone() | ||
eventFactoryClient.request.Connection = conn.Clone() | ||
eventFactoryClient.opts = opts | ||
eventFactoryClient.state = established | ||
}) | ||
return conn, err | ||
} | ||
|
||
func (b *beginClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (emp *emptypb.Empty, err error) { | ||
// If some other EventFactory is already in the ctx... we are already running in an executor, and can just execute normally | ||
if fromContext(ctx) != nil { | ||
return next.Client(ctx).Close(ctx, conn, opts...) | ||
} | ||
eventFactoryClient, ok := b.Load(conn.GetId()) | ||
if !ok { | ||
// If we don't have a connection to Close, just let it be | ||
return | ||
} | ||
<-eventFactoryClient.executor.AsyncExec(func() { | ||
// If the connection is not established, don't do anything | ||
if eventFactoryClient.state != established || eventFactoryClient.client == nil || eventFactoryClient.request == nil { | ||
return | ||
} | ||
|
||
// If this isn't the connection we started with, do nothing | ||
currentConnClient, _ := b.LoadOrStore(conn.GetId(), eventFactoryClient) | ||
if currentConnClient != eventFactoryClient { | ||
return | ||
} | ||
// Always close with the last valid Connection we got | ||
conn = eventFactoryClient.request.Connection | ||
ctx = withEventFactory(ctx, eventFactoryClient) | ||
emp, err = next.Client(ctx).Close(ctx, conn, opts...) | ||
// afterClose() is used to cleanup things like the entry in the Map for EventFactories | ||
eventFactoryClient.afterClose() | ||
}) | ||
return emp, err | ||
} |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
// Copyright (c) 2021 Cisco and/or its affiliates. | ||
// | ||
// SPDX-License-Identifier: Apache-2.0 | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at: | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package begin_test | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
"testing" | ||
|
||
"github.com/networkservicemesh/api/pkg/api/networkservice" | ||
"github.com/stretchr/testify/assert" | ||
"go.uber.org/goleak" | ||
"google.golang.org/grpc" | ||
"google.golang.org/protobuf/types/known/emptypb" | ||
|
||
"github.com/networkservicemesh/sdk/pkg/networkservice/common/begin" | ||
"github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" | ||
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next" | ||
) | ||
|
||
const ( | ||
mark = "mark" | ||
) | ||
|
||
func TestCloseClient(t *testing.T) { | ||
t.Cleanup(func() { goleak.VerifyNone(t) }) | ||
client := chain.NewNetworkServiceClient( | ||
begin.NewClient(), | ||
&markClient{t: t}, | ||
) | ||
id := "1" | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
conn, err := client.Request(ctx, testRequest(id)) | ||
assert.NotNil(t, t, conn) | ||
assert.NoError(t, err) | ||
assert.Equal(t, conn.GetContext().GetExtraContext()[mark], mark) | ||
conn = conn.Clone() | ||
delete(conn.GetContext().GetExtraContext(), mark) | ||
assert.Zero(t, conn.GetContext().GetExtraContext()[mark]) | ||
_, err = client.Close(ctx, conn) | ||
assert.NoError(t, err) | ||
} | ||
|
||
type markClient struct { | ||
t *testing.T | ||
} | ||
|
||
func (m *markClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { | ||
if request.GetConnection().GetContext() == nil { | ||
request.GetConnection().Context = &networkservice.ConnectionContext{} | ||
} | ||
if request.GetConnection().GetContext().GetExtraContext() == nil { | ||
request.GetConnection().GetContext().ExtraContext = make(map[string]string) | ||
} | ||
request.GetConnection().GetContext().GetExtraContext()[mark] = mark | ||
return next.Client(ctx).Request(ctx, request, opts...) | ||
} | ||
|
||
func (m *markClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*emptypb.Empty, error) { | ||
assert.NotNil(m.t, conn.GetContext().GetExtraContext()) | ||
assert.Equal(m.t, mark, conn.GetContext().GetExtraContext()[mark]) | ||
return next.Client(ctx).Close(ctx, conn, opts...) | ||
} | ||
|
||
var _ networkservice.NetworkServiceClient = &markClient{} | ||
|
||
func TestDoubleCloseClient(t *testing.T) { | ||
t.Cleanup(func() { goleak.VerifyNone(t) }) | ||
client := chain.NewNetworkServiceClient( | ||
begin.NewClient(), | ||
&doubleCloseClient{t: t}, | ||
) | ||
id := "1" | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
conn, err := client.Request(ctx, testRequest(id)) | ||
assert.NotNil(t, t, conn) | ||
assert.NoError(t, err) | ||
conn = conn.Clone() | ||
_, err = client.Close(ctx, conn) | ||
assert.NoError(t, err) | ||
_, err = client.Close(ctx, conn) | ||
assert.NoError(t, err) | ||
} | ||
|
||
type doubleCloseClient struct { | ||
t *testing.T | ||
sync.Once | ||
} | ||
|
||
func (s *doubleCloseClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { | ||
return next.Client(ctx).Request(ctx, request, opts...) | ||
} | ||
|
||
func (s *doubleCloseClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*emptypb.Empty, error) { | ||
count := 1 | ||
s.Do(func() { | ||
count++ | ||
}) | ||
assert.Equal(s.t, 2, count, "Close has been called more than once") | ||
return next.Client(ctx).Close(ctx, conn, opts...) | ||
} | ||
|
||
var _ networkservice.NetworkServiceClient = &doubleCloseClient{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note... timeout's placement order in the chain no longer matters. As long as its after begin and metadata, it will work fine, because the Close() event it fires will start from begin.