diff --git a/pkg/networkservice/common/monitor/filter.go b/pkg/networkservice/common/monitor/filter.go index 68eb40350..df26c8405 100644 --- a/pkg/networkservice/common/monitor/filter.go +++ b/pkg/networkservice/common/monitor/filter.go @@ -1,5 +1,7 @@ // Copyright (c) 2020 Cisco Systems, Inc. // +// Copyright (c) 2021 Doc.ai and/or its affiliates. +// // SPDX-License-Identifier: Apache-2.0 // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -17,11 +19,14 @@ package monitor import ( + "github.com/edwarnicke/serialize" "github.com/networkservicemesh/api/pkg/api/networkservice" ) type monitorFilter struct { selector *networkservice.MonitorScopeSelector + executor serialize.Executor + networkservice.MonitorConnection_MonitorConnectionsServer } diff --git a/pkg/networkservice/common/monitor/server.go b/pkg/networkservice/common/monitor/server.go index 508cb6d30..16f957067 100644 --- a/pkg/networkservice/common/monitor/server.go +++ b/pkg/networkservice/common/monitor/server.go @@ -24,6 +24,7 @@ import ( "context" "github.com/golang/protobuf/ptypes/empty" + "github.com/google/uuid" "github.com/edwarnicke/serialize" @@ -34,10 +35,10 @@ import ( ) type monitorServer struct { + ctx context.Context connections map[string]*networkservice.Connection - monitors []networkservice.MonitorConnection_MonitorConnectionsServer + filters map[string]*monitorFilter executor serialize.Executor - ctx context.Context } // NewServer - creates a NetworkServiceServer chain element that will properly update a MonitorConnectionServer @@ -53,27 +54,35 @@ func NewServer(ctx context.Context, monitorServerPtr *networkservice.MonitorConn rv := &monitorServer{ ctx: ctx, connections: make(map[string]*networkservice.Connection), - monitors: nil, // Intentionally nil + filters: make(map[string]*monitorFilter), } + *monitorServerPtr = rv + return rv } func (m *monitorServer) MonitorConnections(selector *networkservice.MonitorScopeSelector, srv networkservice.MonitorConnection_MonitorConnectionsServer) error { m.executor.AsyncExec(func() { - monitor := newMonitorFilter(selector, srv) - m.monitors = append(m.monitors, monitor) + filter := newMonitorFilter(selector, srv) + m.filters[uuid.New().String()] = filter + connections := networkservice.FilterMapOnManagerScopeSelector(m.connections, selector) + // Send initial transfer of all data available - _ = monitor.Send(&networkservice.ConnectionEvent{ - Type: networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER, - Connections: connections, + filter.executor.AsyncExec(func() { + _ = filter.Send(&networkservice.ConnectionEvent{ + Type: networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER, + Connections: connections, + }) }) }) + select { case <-srv.Context().Done(): case <-m.ctx.Done(): } + return nil } @@ -83,53 +92,53 @@ func (m *monitorServer) Request(ctx context.Context, request *networkservice.Net eventConn := conn.Clone() m.executor.AsyncExec(func() { m.connections[eventConn.GetId()] = eventConn - - // Send update event - event := &networkservice.ConnectionEvent{ + // Send UPDATE + m.send(ctx, &networkservice.ConnectionEvent{ Type: networkservice.ConnectionEventType_UPDATE, Connections: map[string]*networkservice.Connection{eventConn.GetId(): eventConn}, - } - if sendErr := m.send(ctx, event); sendErr != nil { - log.FromContext(ctx).Errorf("Error during sending event: %v", sendErr) - } + }) }) } return conn, err } func (m *monitorServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) { - _, closeErr := next.Server(ctx).Close(ctx, conn) + rv, err := next.Server(ctx).Close(ctx, conn) // Remove connection object we have and send DELETE eventConn := conn.Clone() m.executor.AsyncExec(func() { delete(m.connections, eventConn.GetId()) - - event := &networkservice.ConnectionEvent{ + m.send(ctx, &networkservice.ConnectionEvent{ Type: networkservice.ConnectionEventType_DELETE, Connections: map[string]*networkservice.Connection{eventConn.GetId(): eventConn}, - } - if err := m.send(ctx, event); err != nil { - log.FromContext(ctx).Errorf("Error during sending event: %v", err) - } + }) }) - return &empty.Empty{}, closeErr + + return rv, err } -// send - perform a send to clients. -func (m *monitorServer) send(ctx context.Context, event *networkservice.ConnectionEvent) (err error) { - newMonitors := []networkservice.MonitorConnection_MonitorConnectionsServer{} - for _, filter := range m.monitors { - select { - case <-filter.Context().Done(): - default: - if err = filter.Send(event.Clone()); err != nil { - log.FromContext(ctx).Errorf("Error sending event: %+v: %+v", event, err) +func (m *monitorServer) send(ctx context.Context, event *networkservice.ConnectionEvent) { + logger := log.FromContext(ctx).WithField("monitorServer", "send") + for id, filter := range m.filters { + id, filter := id, filter + e := event.Clone() + filter.executor.AsyncExec(func() { + var err error + select { + case <-filter.Context().Done(): + err = filter.Context().Err() + default: + err = filter.Send(e) + } + if err == nil { + return } - newMonitors = append(newMonitors, filter) - } - } - m.monitors = newMonitors - return err + logger.Errorf("error sending event: %+v %s", e, err.Error()) + m.executor.AsyncExec(func() { + delete(m.filters, id) + }) + }) + } } diff --git a/pkg/networkservice/common/monitor/server_test.go b/pkg/networkservice/common/monitor/server_test.go index df3395d7a..af10f408b 100644 --- a/pkg/networkservice/common/monitor/server_test.go +++ b/pkg/networkservice/common/monitor/server_test.go @@ -19,22 +19,26 @@ package monitor_test import ( "context" "testing" + "time" - "github.com/networkservicemesh/api/pkg/api/networkservice" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/goleak" + "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/monitor" "github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters" ) -func TestMonitor(t *testing.T) { +func TestMonitorServer(t *testing.T) { t.Cleanup(func() { goleak.VerifyNone(t) }) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + // Specify pathSegments to test segmentNames := []string{"local-nsm", "remote-nsm"} - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + // Create monitorServer, monitorClient, and server. var monitorServer networkservice.MonitorConnectionServer server := monitor.NewServer(ctx, &monitorServer) @@ -44,26 +48,32 @@ func TestMonitor(t *testing.T) { connections := make(map[string]*networkservice.Connection) receivers := make(map[string]networkservice.MonitorConnection_MonitorConnectionsClient) + // Create non-reading monitor client for all connections + _, monitorErr := monitorClient.MonitorConnections(ctx, new(networkservice.MonitorScopeSelector)) + require.NoError(t, monitorErr) + // Get Empty initial state transfers for _, segmentName := range segmentNames { - var err error - ctx, cancelFunc := context.WithCancel(context.Background()) - defer cancelFunc() - receivers[segmentName], err = monitorClient.MonitorConnections(ctx, &networkservice.MonitorScopeSelector{ + monitorCtx, cancelMonitor := context.WithCancel(ctx) + defer cancelMonitor() + + receivers[segmentName], monitorErr = monitorClient.MonitorConnections(monitorCtx, &networkservice.MonitorScopeSelector{ PathSegments: []*networkservice.PathSegment{{Name: segmentName}}, }) - assert.Nil(t, err) + require.NoError(t, monitorErr) + event, err := receivers[segmentName].Recv() - assert.Nil(t, err) + require.NoError(t, err) + require.NotNil(t, event) - assert.Equal(t, networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER, event.GetType()) - require.Equal(t, len(event.GetConnections()[segmentName].GetPath().GetPathSegments()), 0) + require.Equal(t, networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER, event.GetType()) + require.Empty(t, event.GetConnections()[segmentName].GetPath().GetPathSegments()) } // Send requests for _, segmentName := range segmentNames { var err error - connections[segmentName], err = server.Request(context.Background(), &networkservice.NetworkServiceRequest{ + connections[segmentName], err = server.Request(ctx, &networkservice.NetworkServiceRequest{ Connection: &networkservice.Connection{ Id: segmentName, Path: &networkservice.Path{ @@ -76,32 +86,34 @@ func TestMonitor(t *testing.T) { }, }, }) - assert.Nil(t, err) + require.NoError(t, err) } // Get Updates and insure we've properly filtered by segmentName for _, segmentName := range segmentNames { event, err := receivers[segmentName].Recv() - assert.Nil(t, err) + require.NoError(t, err) + require.NotNil(t, event) - assert.Equal(t, networkservice.ConnectionEventType_UPDATE, event.GetType()) - require.Equal(t, len(event.GetConnections()[segmentName].GetPath().GetPathSegments()), 1) - assert.Equal(t, segmentName, event.GetConnections()[segmentName].GetPath().GetPathSegments()[0].GetName()) + require.Equal(t, networkservice.ConnectionEventType_UPDATE, event.GetType()) + require.Len(t, event.GetConnections()[segmentName].GetPath().GetPathSegments(), 1) + require.Equal(t, segmentName, event.GetConnections()[segmentName].GetPath().GetPathSegments()[0].GetName()) } // Close Connections for _, conn := range connections { - _, err := server.Close(context.Background(), conn) - assert.Nil(t, err) + _, err := server.Close(ctx, conn) + require.NoError(t, err) } // Get Delete Events and insure we've properly filtered by segmentName for _, segmentName := range segmentNames { event, err := receivers[segmentName].Recv() - assert.Nil(t, err) + require.NoError(t, err) + require.NotNil(t, event) - assert.Equal(t, networkservice.ConnectionEventType_DELETE, event.GetType()) - require.Equal(t, len(event.GetConnections()[segmentName].GetPath().GetPathSegments()), 1) - assert.Equal(t, segmentName, event.GetConnections()[segmentName].GetPath().GetPathSegments()[0].GetName()) + require.Equal(t, networkservice.ConnectionEventType_DELETE, event.GetType()) + require.Len(t, event.GetConnections()[segmentName].GetPath().GetPathSegments(), 1) + require.Equal(t, segmentName, event.GetConnections()[segmentName].GetPath().GetPathSegments()[0].GetName()) } } diff --git a/pkg/networkservice/core/adapters/monitor_server_to_client.go b/pkg/networkservice/core/adapters/monitor_server_to_client.go index d78af8670..960153545 100644 --- a/pkg/networkservice/core/adapters/monitor_server_to_client.go +++ b/pkg/networkservice/core/adapters/monitor_server_to_client.go @@ -38,8 +38,8 @@ func NewMonitorServerToClient(server networkservice.MonitorConnectionServer) net return &monitorServerToClient{server: server} } -func (m *monitorServerToClient) MonitorConnections(ctx context.Context, selector *networkservice.MonitorScopeSelector, opts ...grpc.CallOption) (networkservice.MonitorConnection_MonitorConnectionsClient, error) { - eventCh := make(chan *networkservice.ConnectionEvent, 100) +func (m *monitorServerToClient) MonitorConnections(ctx context.Context, selector *networkservice.MonitorScopeSelector, _ ...grpc.CallOption) (networkservice.MonitorConnection_MonitorConnectionsClient, error) { + eventCh := make(chan *networkservice.ConnectionEvent, 1) srv := eventchannel.NewMonitorConnectionMonitorConnectionsServer(ctx, eventCh) go func() { _ = m.server.MonitorConnections(selector, srv) diff --git a/pkg/networkservice/core/eventchannel/monitor_connection_server.go b/pkg/networkservice/core/eventchannel/monitor_connection_server.go index 21c5778be..88e6e382a 100644 --- a/pkg/networkservice/core/eventchannel/monitor_connection_server.go +++ b/pkg/networkservice/core/eventchannel/monitor_connection_server.go @@ -1,5 +1,7 @@ // Copyright (c) 2020 Cisco and/or its affiliates. // +// Copyright (c) 2021 Doc.ai and/or its affiliates. +// // SPDX-License-Identifier: Apache-2.0 // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -41,11 +43,12 @@ func NewMonitorConnectionMonitorConnectionsServer(ctx context.Context, eventCh c } func (m *monitorConnectionMonitorConnectionsServer) Send(event *networkservice.ConnectionEvent) error { - if err := m.ctx.Err(); err != nil { - return errors.Wrap(err, "Can no longer Send") + select { + case <-m.ctx.Done(): + return m.ctx.Err() + case m.eventCh <- event: + return nil } - m.eventCh <- event - return nil } func (m *monitorConnectionMonitorConnectionsServer) Context() context.Context {