Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sdk#1042] Fix monitor blocking sending events on non-reading client #1066

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/networkservice/common/monitor/filter.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Copyright (c) 2020 Cisco Systems, Inc.
//
// Copyright (c) 2021 Doc.ai and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -17,11 +19,14 @@
package monitor

import (
"github.com/edwarnicke/serialize"
"github.com/networkservicemesh/api/pkg/api/networkservice"
)

type monitorFilter struct {
selector *networkservice.MonitorScopeSelector
executor serialize.Executor

networkservice.MonitorConnection_MonitorConnectionsServer
}

Expand Down
83 changes: 46 additions & 37 deletions pkg/networkservice/common/monitor/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"context"

"github.com/golang/protobuf/ptypes/empty"
"github.com/google/uuid"

"github.com/edwarnicke/serialize"

Expand All @@ -34,10 +35,10 @@ import (
)

type monitorServer struct {
ctx context.Context
connections map[string]*networkservice.Connection
monitors []networkservice.MonitorConnection_MonitorConnectionsServer
filters map[string]*monitorFilter
executor serialize.Executor
ctx context.Context
}

// NewServer - creates a NetworkServiceServer chain element that will properly update a MonitorConnectionServer
Expand All @@ -53,27 +54,35 @@ func NewServer(ctx context.Context, monitorServerPtr *networkservice.MonitorConn
rv := &monitorServer{
ctx: ctx,
connections: make(map[string]*networkservice.Connection),
monitors: nil, // Intentionally nil
filters: make(map[string]*monitorFilter),
}

*monitorServerPtr = rv

return rv
}

func (m *monitorServer) MonitorConnections(selector *networkservice.MonitorScopeSelector, srv networkservice.MonitorConnection_MonitorConnectionsServer) error {
m.executor.AsyncExec(func() {
monitor := newMonitorFilter(selector, srv)
m.monitors = append(m.monitors, monitor)
filter := newMonitorFilter(selector, srv)
m.filters[uuid.New().String()] = filter

connections := networkservice.FilterMapOnManagerScopeSelector(m.connections, selector)

// Send initial transfer of all data available
_ = monitor.Send(&networkservice.ConnectionEvent{
Type: networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER,
Connections: connections,
filter.executor.AsyncExec(func() {
_ = filter.Send(&networkservice.ConnectionEvent{
Type: networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER,
Connections: connections,
})
})
})

select {
case <-srv.Context().Done():
case <-m.ctx.Done():
}

return nil
}

Expand All @@ -83,53 +92,53 @@ func (m *monitorServer) Request(ctx context.Context, request *networkservice.Net
eventConn := conn.Clone()
m.executor.AsyncExec(func() {
m.connections[eventConn.GetId()] = eventConn

// Send update event
event := &networkservice.ConnectionEvent{
// Send UPDATE
m.send(ctx, &networkservice.ConnectionEvent{
Type: networkservice.ConnectionEventType_UPDATE,
Connections: map[string]*networkservice.Connection{eventConn.GetId(): eventConn},
}
if sendErr := m.send(ctx, event); sendErr != nil {
log.FromContext(ctx).Errorf("Error during sending event: %v", sendErr)
}
})
})
}
return conn, err
}

func (m *monitorServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) {
_, closeErr := next.Server(ctx).Close(ctx, conn)
rv, err := next.Server(ctx).Close(ctx, conn)

// Remove connection object we have and send DELETE
eventConn := conn.Clone()
m.executor.AsyncExec(func() {
delete(m.connections, eventConn.GetId())

event := &networkservice.ConnectionEvent{
m.send(ctx, &networkservice.ConnectionEvent{
Type: networkservice.ConnectionEventType_DELETE,
Connections: map[string]*networkservice.Connection{eventConn.GetId(): eventConn},
}
if err := m.send(ctx, event); err != nil {
log.FromContext(ctx).Errorf("Error during sending event: %v", err)
}
})
})
return &empty.Empty{}, closeErr

return rv, err
}

// send - perform a send to clients.
func (m *monitorServer) send(ctx context.Context, event *networkservice.ConnectionEvent) (err error) {
newMonitors := []networkservice.MonitorConnection_MonitorConnectionsServer{}
for _, filter := range m.monitors {
select {
case <-filter.Context().Done():
default:
if err = filter.Send(event.Clone()); err != nil {
log.FromContext(ctx).Errorf("Error sending event: %+v: %+v", event, err)
func (m *monitorServer) send(ctx context.Context, event *networkservice.ConnectionEvent) {
logger := log.FromContext(ctx).WithField("monitorServer", "send")
for id, filter := range m.filters {
id, filter := id, filter
e := event.Clone()
filter.executor.AsyncExec(func() {
var err error
select {
case <-filter.Context().Done():
err = filter.Context().Err()
default:
err = filter.Send(e)
}
if err == nil {
return
}
newMonitors = append(newMonitors, filter)
}
}

m.monitors = newMonitors
return err
logger.Errorf("error sending event: %+v %s", e, err.Error())
m.executor.AsyncExec(func() {
delete(m.filters, id)
})
})
}
}
62 changes: 37 additions & 25 deletions pkg/networkservice/common/monitor/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,26 @@ package monitor_test
import (
"context"
"testing"
"time"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"

"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/monitor"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters"
)

func TestMonitor(t *testing.T) {
func TestMonitorServer(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

// Specify pathSegments to test
segmentNames := []string{"local-nsm", "remote-nsm"}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create monitorServer, monitorClient, and server.
var monitorServer networkservice.MonitorConnectionServer
server := monitor.NewServer(ctx, &monitorServer)
Expand All @@ -44,26 +48,32 @@ func TestMonitor(t *testing.T) {
connections := make(map[string]*networkservice.Connection)
receivers := make(map[string]networkservice.MonitorConnection_MonitorConnectionsClient)

// Create non-reading monitor client for all connections
_, monitorErr := monitorClient.MonitorConnections(ctx, new(networkservice.MonitorScopeSelector))
require.NoError(t, monitorErr)

// Get Empty initial state transfers
for _, segmentName := range segmentNames {
var err error
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
receivers[segmentName], err = monitorClient.MonitorConnections(ctx, &networkservice.MonitorScopeSelector{
monitorCtx, cancelMonitor := context.WithCancel(ctx)
defer cancelMonitor()

receivers[segmentName], monitorErr = monitorClient.MonitorConnections(monitorCtx, &networkservice.MonitorScopeSelector{
PathSegments: []*networkservice.PathSegment{{Name: segmentName}},
})
assert.Nil(t, err)
require.NoError(t, monitorErr)

event, err := receivers[segmentName].Recv()
assert.Nil(t, err)
require.NoError(t, err)

require.NotNil(t, event)
assert.Equal(t, networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER, event.GetType())
require.Equal(t, len(event.GetConnections()[segmentName].GetPath().GetPathSegments()), 0)
require.Equal(t, networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER, event.GetType())
require.Empty(t, event.GetConnections()[segmentName].GetPath().GetPathSegments())
}

// Send requests
for _, segmentName := range segmentNames {
var err error
connections[segmentName], err = server.Request(context.Background(), &networkservice.NetworkServiceRequest{
connections[segmentName], err = server.Request(ctx, &networkservice.NetworkServiceRequest{
Connection: &networkservice.Connection{
Id: segmentName,
Path: &networkservice.Path{
Expand All @@ -76,32 +86,34 @@ func TestMonitor(t *testing.T) {
},
},
})
assert.Nil(t, err)
require.NoError(t, err)
}

// Get Updates and insure we've properly filtered by segmentName
for _, segmentName := range segmentNames {
event, err := receivers[segmentName].Recv()
assert.Nil(t, err)
require.NoError(t, err)

require.NotNil(t, event)
assert.Equal(t, networkservice.ConnectionEventType_UPDATE, event.GetType())
require.Equal(t, len(event.GetConnections()[segmentName].GetPath().GetPathSegments()), 1)
assert.Equal(t, segmentName, event.GetConnections()[segmentName].GetPath().GetPathSegments()[0].GetName())
require.Equal(t, networkservice.ConnectionEventType_UPDATE, event.GetType())
require.Len(t, event.GetConnections()[segmentName].GetPath().GetPathSegments(), 1)
require.Equal(t, segmentName, event.GetConnections()[segmentName].GetPath().GetPathSegments()[0].GetName())
}

// Close Connections
for _, conn := range connections {
_, err := server.Close(context.Background(), conn)
assert.Nil(t, err)
_, err := server.Close(ctx, conn)
require.NoError(t, err)
}

// Get Delete Events and insure we've properly filtered by segmentName
for _, segmentName := range segmentNames {
event, err := receivers[segmentName].Recv()
assert.Nil(t, err)
require.NoError(t, err)

require.NotNil(t, event)
assert.Equal(t, networkservice.ConnectionEventType_DELETE, event.GetType())
require.Equal(t, len(event.GetConnections()[segmentName].GetPath().GetPathSegments()), 1)
assert.Equal(t, segmentName, event.GetConnections()[segmentName].GetPath().GetPathSegments()[0].GetName())
require.Equal(t, networkservice.ConnectionEventType_DELETE, event.GetType())
require.Len(t, event.GetConnections()[segmentName].GetPath().GetPathSegments(), 1)
require.Equal(t, segmentName, event.GetConnections()[segmentName].GetPath().GetPathSegments()[0].GetName())
}
}
4 changes: 2 additions & 2 deletions pkg/networkservice/core/adapters/monitor_server_to_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ func NewMonitorServerToClient(server networkservice.MonitorConnectionServer) net
return &monitorServerToClient{server: server}
}

func (m *monitorServerToClient) MonitorConnections(ctx context.Context, selector *networkservice.MonitorScopeSelector, opts ...grpc.CallOption) (networkservice.MonitorConnection_MonitorConnectionsClient, error) {
eventCh := make(chan *networkservice.ConnectionEvent, 100)
func (m *monitorServerToClient) MonitorConnections(ctx context.Context, selector *networkservice.MonitorScopeSelector, _ ...grpc.CallOption) (networkservice.MonitorConnection_MonitorConnectionsClient, error) {
eventCh := make(chan *networkservice.ConnectionEvent, 1)
srv := eventchannel.NewMonitorConnectionMonitorConnectionsServer(ctx, eventCh)
go func() {
_ = m.server.MonitorConnections(selector, srv)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Copyright (c) 2020 Cisco and/or its affiliates.
//
// Copyright (c) 2021 Doc.ai and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -41,11 +43,12 @@ func NewMonitorConnectionMonitorConnectionsServer(ctx context.Context, eventCh c
}

func (m *monitorConnectionMonitorConnectionsServer) Send(event *networkservice.ConnectionEvent) error {
if err := m.ctx.Err(); err != nil {
return errors.Wrap(err, "Can no longer Send")
select {
case <-m.ctx.Done():
return m.ctx.Err()
case m.eventCh <- event:
return nil
}
m.eventCh <- event
return nil
}

func (m *monitorConnectionMonitorConnectionsServer) Context() context.Context {
Expand Down