diff --git a/pkg/networkservice/common/heal/client.go b/pkg/networkservice/common/heal/client.go index 59c6ba8fda..a7abeced65 100644 --- a/pkg/networkservice/common/heal/client.go +++ b/pkg/networkservice/common/heal/client.go @@ -19,24 +19,34 @@ package heal import ( "context" + "sync" "github.com/golang/protobuf/ptypes/empty" - "google.golang.org/grpc" - "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/pkg/errors" + "google.golang.org/grpc" "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" ) -// RegisterClientFunc - required to inform heal server about new client connection and assign it to the connection ID -type RegisterClientFunc func(context.Context, *networkservice.Connection, networkservice.MonitorConnectionClient) +type connectionInfo struct { + mut sync.Mutex + conn *networkservice.Connection + active bool +} type healClient struct { - ctx context.Context - cc networkservice.MonitorConnectionClient + ctx context.Context + cc networkservice.MonitorConnectionClient + initOnce sync.Once + initErr error + conns connectionInfoMap } -// NewClient - creates a new networkservice.NetworkServiceClient chain element that inform healServer about new client connection +// NewClient - creates a new networkservice.NetworkServiceClient chain element that monitors its connections' state +// and calls heal server in case connection breaks if heal server is present in the chain +// - ctx - context for the lifecycle of the *Client* itself. Cancel when discarding the client. +// - cc - MonitorConnectionClient that will be used to watch for connection confirmations and breakages. func NewClient(ctx context.Context, cc networkservice.MonitorConnectionClient) networkservice.NetworkServiceClient { return &healClient{ ctx: ctx, @@ -44,16 +54,118 @@ func NewClient(ctx context.Context, cc networkservice.MonitorConnectionClient) n } } +func (u *healClient) init(ctx context.Context, conn *networkservice.Connection) error { + monitorClient, err := u.cc.MonitorConnections(u.ctx, &networkservice.MonitorScopeSelector{ + PathSegments: []*networkservice.PathSegment{{Name: conn.GetCurrentPathSegment().Name}, {Name: ""}}, + }) + if err != nil { + return errors.Wrap(err, "MonitorConnections failed") + } + + healConnection := requestHealConnectionFunc(ctx) + if healConnection == nil { + healConnection = func(conn *networkservice.Connection) {} + } + restoreConnection := requestRestoreConnectionFunc(ctx) + if restoreConnection == nil { + restoreConnection = func(conn *networkservice.Connection) {} + } + + go u.listenToConnectionChanges(healConnection, restoreConnection, monitorClient) + + return nil +} + func (u *healClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { - conn, err := next.Client(ctx).Request(ctx, request, opts...) + conn := request.GetConnection() + u.initOnce.Do(func() { + u.initErr = u.init(ctx, conn) + }) + // if initialization failed, then we want for all subsequent calls to Request() on this object to also fail + if u.initErr != nil { + return nil, u.initErr + } - registerClient := registerClientFunc(ctx) - if err == nil && registerClient != nil { - registerClient(u.ctx, conn, u.cc) + connInfo, loaded := u.conns.LoadOrStore(conn.GetId(), &connectionInfo{ + conn: conn.Clone(), + mut: sync.Mutex{}, + }) + u.replaceConnectionPath(conn, connInfo) + + conn, err := next.Client(ctx).Request(ctx, request, opts...) + if err != nil { + if !loaded { + u.conns.Delete(request.GetConnection().GetId()) + } + return nil, err } - return conn, err + + connInfo.mut.Lock() + defer connInfo.mut.Unlock() + connInfo.conn = conn.Clone() + + return conn, nil } func (u *healClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*empty.Empty, error) { + connInfo, loaded := u.conns.LoadAndDelete(conn.GetId()) + if !loaded { + return &empty.Empty{}, nil + } + u.replaceConnectionPath(conn, connInfo) + return next.Client(ctx).Close(ctx, conn, opts...) } + +// listenToConnectionChanges - loops on events from MonitorConnectionClient while the monitor client is alive. +// Updates connection cache and broadcasts events of successful connections. +// Calls heal when something breaks. +func (u *healClient) listenToConnectionChanges(healConnection, restoreConnection requestHealFuncType, monitorClient networkservice.MonitorConnection_MonitorConnectionsClient) { + for { + event, err := monitorClient.Recv() + if err != nil { + u.conns.Range(func(id string, info *connectionInfo) bool { + info.mut.Lock() + defer info.mut.Unlock() + restoreConnection(info.conn) + return true + }) + return + } + + for _, eventConn := range event.GetConnections() { + connID := eventConn.GetPrevPathSegment().GetId() + connInfo, ok := u.conns.Load(connID) + if !ok { + continue + } + connInfo.mut.Lock() + switch event.GetType() { + // Why both INITIAL_STATE_TRANSFER and UPDATE: + // Sometimes we start polling events too late, and when we wait for confirmation of success of some connection, + // this connection is in the INITIAL_STATE_TRANSFER event, so we must treat these events the same as UPDATE. + case networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER, networkservice.ConnectionEventType_UPDATE: + connInfo.active = true + connInfo.conn.Path.PathSegments = eventConn.Clone().Path.PathSegments + connInfo.conn.NetworkServiceEndpointName = eventConn.NetworkServiceEndpointName + case networkservice.ConnectionEventType_DELETE: + if connInfo.active { + healConnection(connInfo.conn) + } + connInfo.active = false + } + connInfo.mut.Unlock() + } + } +} + +func (u *healClient) replaceConnectionPath(conn *networkservice.Connection, connInfo *connectionInfo) { + path := conn.GetPath() + if path != nil && int(path.Index) < len(path.PathSegments)-1 { + connInfo.mut.Lock() + defer connInfo.mut.Unlock() + path.PathSegments = path.PathSegments[:path.Index+1] + path.PathSegments = append(path.PathSegments, connInfo.conn.Path.PathSegments[path.Index+1:]...) + conn.NetworkServiceEndpointName = connInfo.conn.NetworkServiceEndpointName + } +} diff --git a/pkg/networkservice/common/heal/client_connection_map.gen.go b/pkg/networkservice/common/heal/connection_info_map.gen.go similarity index 55% rename from pkg/networkservice/common/heal/client_connection_map.gen.go rename to pkg/networkservice/common/heal/connection_info_map.gen.go index 95ab0a2881..242591f5a8 100644 --- a/pkg/networkservice/common/heal/client_connection_map.gen.go +++ b/pkg/networkservice/common/heal/connection_info_map.gen.go @@ -1,4 +1,4 @@ -// Code generated by "-output client_connection_map.gen.go -type clientConnMap -output client_connection_map.gen.go -type clientConnMap"; DO NOT EDIT. +// Code generated by "-output connection_info_map.gen.go -type connectionInfoMap -output connection_info_map.gen.go -type connectionInfoMap"; DO NOT EDIT. package heal import ( @@ -7,52 +7,52 @@ import ( // Generate code that will fail if the constants change value. func _() { - // An "cannot convert clientConnMap literal (type clientConnMap) to type sync.Map" compiler error signifies that the base type have changed. + // An "cannot convert connectionInfoMap literal (type connectionInfoMap) to type sync.Map" compiler error signifies that the base type have changed. // Re-run the go-syncmap command to generate them again. - _ = (sync.Map)(clientConnMap{}) + _ = (sync.Map)(connectionInfoMap{}) } -var _nil_clientConnMap_clientConnInfo_value = func() (val clientConnInfo) { return }() +var _nil_connectionInfoMap_connectionInfo_value = func() (val *connectionInfo) { return }() // Load returns the value stored in the map for a key, or nil if no // value is present. // The ok result indicates whether value was found in the map. -func (m *clientConnMap) Load(key string) (clientConnInfo, bool) { +func (m *connectionInfoMap) Load(key string) (*connectionInfo, bool) { value, ok := (*sync.Map)(m).Load(key) if value == nil { - return _nil_clientConnMap_clientConnInfo_value, ok + return _nil_connectionInfoMap_connectionInfo_value, ok } - return value.(clientConnInfo), ok + return value.(*connectionInfo), ok } // Store sets the value for a key. -func (m *clientConnMap) Store(key string, value clientConnInfo) { +func (m *connectionInfoMap) Store(key string, value *connectionInfo) { (*sync.Map)(m).Store(key, value) } // LoadOrStore returns the existing value for the key if present. // Otherwise, it stores and returns the given value. // The loaded result is true if the value was loaded, false if stored. -func (m *clientConnMap) LoadOrStore(key string, value clientConnInfo) (clientConnInfo, bool) { +func (m *connectionInfoMap) LoadOrStore(key string, value *connectionInfo) (*connectionInfo, bool) { actual, loaded := (*sync.Map)(m).LoadOrStore(key, value) if actual == nil { - return _nil_clientConnMap_clientConnInfo_value, loaded + return _nil_connectionInfoMap_connectionInfo_value, loaded } - return actual.(clientConnInfo), loaded + return actual.(*connectionInfo), loaded } // LoadAndDelete deletes the value for a key, returning the previous value if any. // The loaded result reports whether the key was present. -func (m *clientConnMap) LoadAndDelete(key string) (value clientConnInfo, loaded bool) { +func (m *connectionInfoMap) LoadAndDelete(key string) (value *connectionInfo, loaded bool) { actual, loaded := (*sync.Map)(m).LoadAndDelete(key) if actual == nil { - return _nil_clientConnMap_clientConnInfo_value, loaded + return _nil_connectionInfoMap_connectionInfo_value, loaded } - return actual.(clientConnInfo), loaded + return actual.(*connectionInfo), loaded } // Delete deletes the value for a key. -func (m *clientConnMap) Delete(key string) { +func (m *connectionInfoMap) Delete(key string) { (*sync.Map)(m).Delete(key) } @@ -66,8 +66,8 @@ func (m *clientConnMap) Delete(key string) { // // Range may be O(N) with the number of elements in the map even if f returns // false after a constant number of calls. -func (m *clientConnMap) Range(f func(key string, value clientConnInfo) bool) { +func (m *connectionInfoMap) Range(f func(key string, value *connectionInfo) bool) { (*sync.Map)(m).Range(func(key, value interface{}) bool { - return f(key.(string), value.(clientConnInfo)) + return f(key.(string), value.(*connectionInfo)) }) } diff --git a/pkg/networkservice/common/heal/context.go b/pkg/networkservice/common/heal/context.go index dfc9a4b81e..b82d516503 100644 --- a/pkg/networkservice/common/heal/context.go +++ b/pkg/networkservice/common/heal/context.go @@ -18,23 +18,42 @@ package heal import ( "context" + + "github.com/networkservicemesh/api/pkg/api/networkservice" ) const ( - registerClientFuncKey contextKeyType = "RegisterFunc" + requestHealConnectionFuncKey contextKeyType = "requestHealConnectionFuncKey" + requestRestoreConnectionFuncKey contextKeyType = "requestRestoreConnectionFuncKey" ) type contextKeyType string -func withRegisterClientFunc(parent context.Context, registerClientFunc RegisterClientFunc) context.Context { +type requestHealFuncType func(conn *networkservice.Connection) + +func withRequestHealConnectionFunc(parent context.Context, fun requestHealFuncType) context.Context { + if parent == nil { + panic("cannot create context from nil parent") + } + return context.WithValue(parent, requestHealConnectionFuncKey, fun) +} + +func requestHealConnectionFunc(ctx context.Context) requestHealFuncType { + if rv, ok := ctx.Value(requestHealConnectionFuncKey).(requestHealFuncType); ok { + return rv + } + return nil +} + +func withRequestRestoreConnectionFunc(parent context.Context, fun requestHealFuncType) context.Context { if parent == nil { panic("cannot create context from nil parent") } - return context.WithValue(parent, registerClientFuncKey, registerClientFunc) + return context.WithValue(parent, requestRestoreConnectionFuncKey, fun) } -func registerClientFunc(ctx context.Context) RegisterClientFunc { - if rv, ok := ctx.Value(registerClientFuncKey).(RegisterClientFunc); ok { +func requestRestoreConnectionFunc(ctx context.Context) requestHealFuncType { + if rv, ok := ctx.Value(requestRestoreConnectionFuncKey).(requestHealFuncType); ok { return rv } return nil diff --git a/pkg/networkservice/common/heal/connection_map.gen.go b/pkg/networkservice/common/heal/context_wrapper_map.gen.go similarity index 61% rename from pkg/networkservice/common/heal/connection_map.gen.go rename to pkg/networkservice/common/heal/context_wrapper_map.gen.go index 8cb97a2e80..6200dd9ef3 100644 --- a/pkg/networkservice/common/heal/connection_map.gen.go +++ b/pkg/networkservice/common/heal/context_wrapper_map.gen.go @@ -1,4 +1,4 @@ -// Code generated by "-output connection_map.gen.go -type connectionMap -output connection_map.gen.go -type connectionMap"; DO NOT EDIT. +// Code generated by "-output context_wrapper_map.gen.go -type ctxWrapperMap -output context_wrapper_map.gen.go -type ctxWrapperMap"; DO NOT EDIT. package heal import ( @@ -7,52 +7,52 @@ import ( // Generate code that will fail if the constants change value. func _() { - // An "cannot convert connectionMap literal (type connectionMap) to type sync.Map" compiler error signifies that the base type have changed. + // An "cannot convert ctxWrapperMap literal (type ctxWrapperMap) to type sync.Map" compiler error signifies that the base type have changed. // Re-run the go-syncmap command to generate them again. - _ = (sync.Map)(connectionMap{}) + _ = (sync.Map)(ctxWrapperMap{}) } -var _nil_connectionMap_connection_value = func() (val connection) { return }() +var _nil_ctxWrapperMap_ctxWrapper_value = func() (val *ctxWrapper) { return }() // Load returns the value stored in the map for a key, or nil if no // value is present. // The ok result indicates whether value was found in the map. -func (m *connectionMap) Load(key string) (connection, bool) { +func (m *ctxWrapperMap) Load(key string) (*ctxWrapper, bool) { value, ok := (*sync.Map)(m).Load(key) if value == nil { - return _nil_connectionMap_connection_value, ok + return _nil_ctxWrapperMap_ctxWrapper_value, ok } - return value.(connection), ok + return value.(*ctxWrapper), ok } // Store sets the value for a key. -func (m *connectionMap) Store(key string, value connection) { +func (m *ctxWrapperMap) Store(key string, value *ctxWrapper) { (*sync.Map)(m).Store(key, value) } // LoadOrStore returns the existing value for the key if present. // Otherwise, it stores and returns the given value. // The loaded result is true if the value was loaded, false if stored. -func (m *connectionMap) LoadOrStore(key string, value connection) (connection, bool) { +func (m *ctxWrapperMap) LoadOrStore(key string, value *ctxWrapper) (*ctxWrapper, bool) { actual, loaded := (*sync.Map)(m).LoadOrStore(key, value) if actual == nil { - return _nil_connectionMap_connection_value, loaded + return _nil_ctxWrapperMap_ctxWrapper_value, loaded } - return actual.(connection), loaded + return actual.(*ctxWrapper), loaded } // LoadAndDelete deletes the value for a key, returning the previous value if any. // The loaded result reports whether the key was present. -func (m *connectionMap) LoadAndDelete(key string) (value connection, loaded bool) { +func (m *ctxWrapperMap) LoadAndDelete(key string) (value *ctxWrapper, loaded bool) { actual, loaded := (*sync.Map)(m).LoadAndDelete(key) if actual == nil { - return _nil_connectionMap_connection_value, loaded + return _nil_ctxWrapperMap_ctxWrapper_value, loaded } - return actual.(connection), loaded + return actual.(*ctxWrapper), loaded } // Delete deletes the value for a key. -func (m *connectionMap) Delete(key string) { +func (m *ctxWrapperMap) Delete(key string) { (*sync.Map)(m).Delete(key) } @@ -66,8 +66,8 @@ func (m *connectionMap) Delete(key string) { // // Range may be O(N) with the number of elements in the map even if f returns // false after a constant number of calls. -func (m *connectionMap) Range(f func(key string, value connection) bool) { +func (m *ctxWrapperMap) Range(f func(key string, value *ctxWrapper) bool) { (*sync.Map)(m).Range(func(key, value interface{}) bool { - return f(key.(string), value.(connection)) + return f(key.(string), value.(*ctxWrapper)) }) } diff --git a/pkg/networkservice/common/heal/gen.go b/pkg/networkservice/common/heal/gen.go index 2e012277bd..0650adb685 100644 --- a/pkg/networkservice/common/heal/gen.go +++ b/pkg/networkservice/common/heal/gen.go @@ -22,8 +22,10 @@ import ( "sync" ) -//go:generate go-syncmap -output connection_map.gen.go -type connectionMap -//go:generate go-syncmap -output client_connection_map.gen.go -type clientConnMap +//go:generate go-syncmap -output connection_info_map.gen.go -type connectionInfoMap -type connectionMap sync.Map -type clientConnMap sync.Map +type connectionInfoMap sync.Map + +//go:generate go-syncmap -output context_wrapper_map.gen.go -type ctxWrapperMap + +type ctxWrapperMap sync.Map diff --git a/pkg/networkservice/common/heal/server.go b/pkg/networkservice/common/heal/server.go index 03b30f513a..959d0bab18 100644 --- a/pkg/networkservice/common/heal/server.go +++ b/pkg/networkservice/common/heal/server.go @@ -21,16 +21,13 @@ package heal import ( "context" - "runtime" + "sync" "time" - "github.com/edwarnicke/serialize" "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/empty" - "github.com/pkg/errors" - "google.golang.org/grpc" - "github.com/networkservicemesh/api/pkg/api/networkservice" + "google.golang.org/grpc" "github.com/networkservicemesh/sdk/pkg/networkservice/common/discover" "github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters" @@ -41,47 +38,33 @@ import ( ) type ctxWrapper struct { - ctx context.Context - cancel func() -} - -type clientConnInfo struct { - ctx context.Context - cc networkservice.MonitorConnectionClient -} - -type connection struct { - *networkservice.Connection - ctx context.Context + mut sync.Mutex + request *networkservice.NetworkServiceRequest + ctx context.Context + cancel func() } type healServer struct { - ctx context.Context - clients clientConnMap - onHeal *networkservice.NetworkServiceClient - cancelHealMap map[string]*ctxWrapper - cancelHealMapExecutor serialize.Executor - conns connectionMap + ctx context.Context + onHeal *networkservice.NetworkServiceClient + healContextMap ctxWrapperMap } // NewServer - creates a new networkservice.NetworkServiceServer chain element that implements the healing algorithm -// - ctx - context for the lifecycle of the *Client* itself. Cancel when discarding the client. -// - client - networkservice.MonitorConnectionClient that can be used to call MonitorConnection against the endpoint -// - onHeal - *networkservice.NetworkServiceClient. Since networkservice.NetworkServiceClient is an interface -// (and thus a pointer) *networkservice.NetworkServiceClient is a double pointer. Meaning it +// - ctx - context for the lifecycle of the *Server* itself. Cancel when discarding the server. +// - onHeal - client used 'onHeal'. +// If we detect we need to heal, onHeal.Request is used to heal. +// If we can't heal connection, onHeal.Close will be called. +// If onHeal is nil, then we simply set onHeal to this client chain element +// Since networkservice.NetworkServiceClient is an interface (and thus a pointer) +// *networkservice.NetworkServiceClient is a double pointer. Meaning it // points to a place that points to a place that implements networkservice.NetworkServiceClient // This is done because when we use heal.NewClient as part of a chain, we may not *have* // a pointer to this -// client used 'onHeal'. If we detect we need to heal, onHeal.Request is used to heal. -// If onHeal is nil, then we simply set onHeal to this client chain element -// If we are part of a larger chain or a server, we should pass the resulting chain into -// this constructor before we actually have a pointer to it. -// If onHeal nil, onHeal will be pointed to the returned networkservice.NetworkServiceClient func NewServer(ctx context.Context, onHeal *networkservice.NetworkServiceClient) networkservice.NetworkServiceServer { rv := &healServer{ - ctx: ctx, - onHeal: onHeal, - cancelHealMap: make(map[string]*ctxWrapper), + ctx: ctx, + onHeal: onHeal, } if rv.onHeal == nil { @@ -92,42 +75,34 @@ func NewServer(ctx context.Context, onHeal *networkservice.NetworkServiceClient) } func (f *healServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { - // Replace path within connection to the healed one - f.replaceConnectionPath(request.GetConnection()) - - ctx = withRegisterClientFunc(ctx, f.RegisterClient) + ctx = withRequestHealConnectionFunc(ctx, f.handleHealConnectionRequest) + ctx = withRequestRestoreConnectionFunc(ctx, f.handleRestoreConnectionRequest) conn, err := next.Server(ctx).Request(ctx, request) if err != nil { return nil, err } - // Check heal client is in the chain and connection was added - if _, ok := f.clients.Load(conn.GetId()); !ok { - log.FromContext(ctx).WithField("healServer", "Request").Errorf("Heal server ignored connection %s: heal client is not active", conn.GetId()) - return conn, nil - } - - ctx = f.applyStoredCandidates(ctx, conn) + cw, loaded := f.healContextMap.LoadOrStore(request.GetConnection().GetId(), &ctxWrapper{ + request: request.Clone().SetRequestConnection(conn.Clone()), + ctx: f.createHealContext(ctx, nil), + }) + if loaded { + cw.mut.Lock() + defer cw.mut.Unlock() - err = f.startHeal(ctx, request.Clone().SetRequestConnection(conn.Clone())) - if err != nil { - return nil, err + if cw.cancel != nil { + cw.cancel() + cw.cancel = nil + } + cw.request = request.Clone().SetRequestConnection(conn.Clone()) + cw.ctx = f.createHealContext(ctx, cw.ctx) } - f.conns.Store(conn.GetId(), connection{ - Connection: conn.Clone(), - ctx: ctx, - }) - return conn, nil } func (f *healServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) { - // Replace path within connection to the healed one - f.replaceConnectionPath(conn) - f.stopHeal(conn) - f.clients.Delete(conn.Id) rv, err := next.Server(ctx).Close(ctx, conn) if err != nil { @@ -136,224 +111,69 @@ func (f *healServer) Close(ctx context.Context, conn *networkservice.Connection) return rv, nil } -func (f *healServer) RegisterClient(ctx context.Context, conn *networkservice.Connection, cc networkservice.MonitorConnectionClient) { - f.clients.Store(conn.Id, clientConnInfo{ - ctx: ctx, - cc: cc, - }) -} +func (f *healServer) getHealContext(conn *networkservice.Connection) (*networkservice.NetworkServiceRequest, context.Context) { + var healCtx context.Context + var request *networkservice.NetworkServiceRequest -func (f *healServer) stopHeal(conn *networkservice.Connection) { - var cancelHeal func() - <-f.cancelHealMapExecutor.AsyncExec(func() { - if cancelHealEntry, ok := f.cancelHealMap[conn.GetId()]; ok { - cancelHeal = cancelHealEntry.cancel - delete(f.cancelHealMap, conn.GetId()) - } - }) - if cancelHeal != nil { - cancelHeal() + cw, ok := f.healContextMap.Load(conn.GetId()) + if !ok { + return nil, nil } - f.conns.Delete(conn.GetId()) -} - -// startHeal - start a healAsNeeded using the request as the request for re-request if healing is needed. -func (f *healServer) startHeal(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) error { - errCh := make(chan error, 1) - go f.healAsNeeded(ctx, request, errCh, opts...) - return <-errCh -} - -// healAsNeeded - heal the connection found in request. Will immediately do a monitor to make sure the server has the -// expected connection and it is sane, returning an error via errCh if there is an issue, and nil via errCh if there is -// not. You will only *ever* receive one real error via the errCh. errCh will be closed when healAsNeeded is finished -// allowing it to double as a 'done' channel we can use when we stopHealing in f.Close. -// healAsNeeded will then continue to monitor the servers opinions about the state of the connection until either -// expireTime has passed or stopHeal is called (as in Close) or a different pathSegment is found via monitoring -// indicating that a later Request has occurred and in doing so created its own healAsNeeded and so we can stop this one -func (f *healServer) healAsNeeded(baseCtx context.Context, request *networkservice.NetworkServiceRequest, errCh chan error, opts ...grpc.CallOption) { - clockTime := clock.FromContext(baseCtx) - pathSegment := request.GetConnection().GetNextPathSegment() + cw.mut.Lock() + defer cw.mut.Unlock() - // Make sure we have a valid expireTime to work with - expireTime, err := ptypes.Timestamp(pathSegment.GetExpires()) - if err != nil { - errCh <- errors.Wrapf(err, "error converting pathSegment.GetExpires() to time.Time: %+v", pathSegment.GetExpires()) - return + if cw.cancel != nil { + cw.cancel() } + ctx, cancel := context.WithCancel(cw.ctx) + cw.cancel = cancel + healCtx = ctx + request = cw.request - ctx, cancel := f.healContext(baseCtx) - defer cancel() - id := request.GetConnection().GetId() - <-f.cancelHealMapExecutor.AsyncExec(func() { - if entry, ok := f.cancelHealMap[id]; ok { - go entry.cancel() // TODO - what to do with the errCh here? - } - f.cancelHealMap[id] = &ctxWrapper{ - ctx: ctx, - cancel: cancel, - } - }) + return request, healCtx +} - // Monitor the pathSegment for the first time, so we can pass back an error - // if we can't confirm via monitor the other side has the expected state - recv, err := f.initialMonitorSegment(ctx, request.GetConnection(), clockTime.Until(expireTime)) - if err != nil { - errCh <- errors.Wrapf(err, "error calling MonitorConnection_MonitorConnectionsClient.Recv to get initial confirmation server has connection: %+v", request.GetConnection()) +// handleHealConnectionRequest - heals requested connection. Returns immediately, heal is asynchronous. +func (f *healServer) handleHealConnectionRequest(conn *networkservice.Connection) { + request, healCtx := f.getHealContext(conn) + if request == nil { return } - // Tell the caller all is well by sending them a nil err so the call can continue - close(errCh) + request.SetRequestConnection(conn.Clone()) - // Start looping over events - for ctx.Err() == nil { - event, err := recv.Recv() - if err != nil { - deadline := clockTime.Now().Add(time.Minute) - if deadline.After(expireTime) { - deadline = expireTime - } - newRecv, newRecvErr := f.initialMonitorSegment(ctx, request.GetConnection(), clockTime.Until(deadline)) - if newRecvErr == nil { - recv = newRecv - } else { - f.restoreConnection(ctx, request, opts...) - return - } - runtime.Gosched() - continue - } - if ctx.Err() != nil { - return - } - if err := f.processEvent(ctx, request, event, opts...); err != nil { - return - } - } + go f.processHeal(healCtx, request) } -func (f *healServer) healContext(baseCtx context.Context) (context.Context, context.CancelFunc) { - ctx, cancel := context.WithCancel(f.ctx) - - if candidates := discover.Candidates(baseCtx); candidates != nil { - ctx = discover.WithCandidates(ctx, candidates.Endpoints, candidates.NetworkService) +// handleRestoreConnectionRequest - recreates connection. Returns immediately, heal is asynchronous. +func (f *healServer) handleRestoreConnectionRequest(conn *networkservice.Connection) { + request, healCtx := f.getHealContext(conn) + if request == nil { + return } - return ctx, cancel -} - -// initialMonitorSegment - monitors for pathSegment and returns a recv and an error if the server does not have -// a record for the connection matching our expectations -func (f *healServer) initialMonitorSegment(ctx context.Context, conn *networkservice.Connection, timeout time.Duration) (networkservice.MonitorConnection_MonitorConnectionsClient, error) { - clockTime := clock.FromContext(ctx) - - errCh := make(chan error, 1) - var recv networkservice.MonitorConnection_MonitorConnectionsClient - pathSegment := conn.GetNextPathSegment() - - // nolint:govet - initialCtx, cancel := context.WithCancel(ctx) - - go func() { - // If pathSegment is nil, the server is very very screwed up - if pathSegment == nil { - errCh <- errors.New("pathSegment for server connection must not be nil") - return - } - - // Get connection client by connection - client, ok := f.clients.Load(conn.GetId()) - if !ok { - errCh <- errors.Errorf("error when attempting to MonitorConnections") - return - } + request.SetRequestConnection(conn.Clone()) - // Monitor *just* this connection - var err error - recv, err = client.cc.MonitorConnections(initialCtx, &networkservice.MonitorScopeSelector{ - PathSegments: []*networkservice.PathSegment{ - pathSegment, - }, - }) - if err != nil { - errCh <- errors.Wrap(err, "error when attempting to MonitorConnections") - return - } - - // Get an initial event to make sure we have the expected connection - event, err := recv.Recv() - if err != nil { - errCh <- err - return - } - // If we didn't get an event something very bad has happened - if event.Connections == nil || event.Connections[pathSegment.GetId()] == nil { - errCh <- errors.Errorf("connection with id %s not found in MonitorConnections event as expected: event: %+v", pathSegment.Id, event) - return - } - // If its not *our* connection something's gone wrong like a later Request succeeding - if !pathSegment.Equal(event.GetConnections()[pathSegment.GetId()].GetCurrentPathSegment()) { - errCh <- errors.Errorf("server reports a different connection for this id, pathSegments do not match. Expected: %+v Received %+v", pathSegment, event.GetConnections()[pathSegment.GetId()].GetCurrentPathSegment()) - return - } - errCh <- nil - }() - - select { - case err := <-errCh: - // nolint:govet - return recv, err - case <-clockTime.After(timeout): - cancel() - err := <-errCh - return recv, err - } + go f.restoreConnection(healCtx, request) } -// processEvent - process event, calling (*f.OnHeal).Request(ctx,request,opts...) if the server does not have our connection. -// returns a non-nil error if the event is such that we should no longer to continue to attempt to heal. -func (f *healServer) processEvent(ctx context.Context, request *networkservice.NetworkServiceRequest, event *networkservice.ConnectionEvent, opts ...grpc.CallOption) error { - pathSegment := request.GetConnection().GetNextPathSegment() - - switch event.GetType() { - case networkservice.ConnectionEventType_UPDATE: - // We should never receive an UPDATE that isn't ours, but in case we do... - if event.Connections == nil || event.Connections[pathSegment.GetId()] == nil { - break - } - fallthrough - case networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER: - if event.Connections != nil && event.Connections[pathSegment.GetId()] != nil { - // If the server has a pathSegment for this Connection.Id, but its not the one we - // got back from it... we should fail, as different Request came after ours successfully - if !pathSegment.Equal(event.GetConnections()[pathSegment.GetId()].GetCurrentPathSegment()) { - return errors.Errorf("server has a different pathSegment than was returned to this call.") - } - break - } - fallthrough - case networkservice.ConnectionEventType_DELETE: - if event.Connections != nil && event.Connections[pathSegment.GetId()] != nil { - f.processHeal(ctx, request, opts...) - } +func (f *healServer) stopHeal(conn *networkservice.Connection) { + cw, loaded := f.healContextMap.LoadAndDelete(conn.GetId()) + if !loaded { + return + } + cw.mut.Lock() + defer cw.mut.Unlock() + if cw.cancel != nil { + cw.cancel() } - return nil } func (f *healServer) restoreConnection(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) { clockTime := clock.FromContext(ctx) - id := request.GetConnection().GetId() - - var healMapCtx context.Context - <-f.cancelHealMapExecutor.AsyncExec(func() { - if entry, ok := f.cancelHealMap[id]; ok { - healMapCtx = entry.ctx - } - }) - if healMapCtx != ctx || ctx.Err() != nil { + if ctx.Err() != nil { return } @@ -381,9 +201,7 @@ func (f *healServer) restoreConnection(ctx context.Context, request *networkserv } func (f *healServer) processHeal(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) { - clockTime := clock.FromContext(ctx) logEntry := log.FromContext(ctx).WithField("healServer", "processHeal") - conn := request.GetConnection() candidates := discover.Candidates(ctx) @@ -409,7 +227,7 @@ func (f *healServer) processHeal(ctx context.Context, request *networkservice.Ne } } else { // Huge timeout is not required to close connection on a current path segment - closeCtx, closeCancel := clockTime.WithTimeout(ctx, time.Second) + closeCtx, closeCancel := clock.FromContext(ctx).WithTimeout(ctx, time.Second) defer closeCancel() _, err := (*f.onHeal).Close(closeCtx, request.GetConnection().Clone(), opts...) @@ -419,26 +237,19 @@ func (f *healServer) processHeal(ctx context.Context, request *networkservice.Ne } } -func (f *healServer) replaceConnectionPath(conn *networkservice.Connection) { - path := conn.GetPath() - if path != nil && int(path.Index) < len(path.PathSegments)-1 { - if storedConn, ok := f.conns.Load(conn.GetId()); ok { - path.PathSegments = path.PathSegments[:path.Index+1] - path.PathSegments = append(path.PathSegments, storedConn.Path.PathSegments[path.Index+1:]...) - conn.NetworkServiceEndpointName = storedConn.NetworkServiceEndpointName +// createHealContext - create context to be used on heal. +// Uses f.ctx as base and inserts Candidates from requestCtx or cachedCtx into it, if there are any. +func (f *healServer) createHealContext(requestCtx, cachedCtx context.Context) context.Context { + ctx := requestCtx + if cachedCtx != nil { + if candidates := discover.Candidates(ctx); candidates == nil || len(candidates.Endpoints) > 0 { + ctx = cachedCtx } } -} - -func (f *healServer) applyStoredCandidates(ctx context.Context, conn *networkservice.Connection) context.Context { - if candidates := discover.Candidates(ctx); candidates != nil && len(candidates.Endpoints) > 0 { - return ctx + healCtx := f.ctx + if candidates := discover.Candidates(ctx); candidates != nil { + healCtx = discover.WithCandidates(healCtx, candidates.Endpoints, candidates.NetworkService) } - if info, ok := f.conns.Load(conn.Id); ok { - if candidates := discover.Candidates(info.ctx); candidates != nil { - ctx = discover.WithCandidates(ctx, candidates.Endpoints, candidates.NetworkService) - } - } - return ctx + return healCtx } diff --git a/pkg/networkservice/common/heal/server_test.go b/pkg/networkservice/common/heal/server_test.go index 1deec2bff6..2e5fc1b075 100644 --- a/pkg/networkservice/common/heal/server_test.go +++ b/pkg/networkservice/common/heal/server_test.go @@ -36,7 +36,6 @@ import ( "github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters" "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" "github.com/networkservicemesh/sdk/pkg/networkservice/core/eventchannel" - "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" "github.com/networkservicemesh/sdk/pkg/tools/addressof" "github.com/networkservicemesh/sdk/pkg/tools/sandbox" ) @@ -111,54 +110,3 @@ func TestHealClient_Request(t *testing.T) { _, err = client.Close(requestCtx, conn) require.NoError(t, err) } - -func TestHealClient_EmptyInit(t *testing.T) { - t.Cleanup(func() { goleak.VerifyNone(t) }) - eventCh := make(chan *networkservice.ConnectionEvent, 1) - defer close(eventCh) - - onHealCh := make(chan struct{}) - // TODO for tomorrow... check on how to work onHeal into the new chain I've built - onHeal := &testOnHeal{ - RequestFunc: func(ctx context.Context, in *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (connection *networkservice.Connection, e error) { - if ctx.Err() == nil { - close(onHealCh) - } - return &networkservice.Connection{}, nil - }, - } - - eventTrigger := &testOnHeal{ - RequestFunc: func(ctx context.Context, in *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { - eventCh <- &networkservice.ConnectionEvent{ - Type: networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER, - Connections: make(map[string]*networkservice.Connection), - } - return next.Client(ctx).Request(ctx, in) - }, - } - - ctx, cancelFunc := context.WithCancel(context.Background()) - defer cancelFunc() - - healServer := heal.NewServer(ctx, addressof.NetworkServiceClient(onHeal)) - client := chain.NewNetworkServiceClient( - updatepath.NewClient("testClient"), - adapters.NewServerToClient(healServer), - heal.NewClient(ctx, eventchannel.NewMonitorConnectionClient(eventCh)), - adapters.NewServerToClient(updatetoken.NewServer(sandbox.GenerateTestToken)), - updatepath.NewClient("testServer"), - eventTrigger, - adapters.NewServerToClient(updatetoken.NewServer(sandbox.GenerateTestToken)), - ) - - requestCtx, reqCancelFunc := context.WithTimeout(ctx, waitForTimeout) - defer reqCancelFunc() - - _, err := client.Request(requestCtx, &networkservice.NetworkServiceRequest{ - Connection: &networkservice.Connection{ - NetworkService: "ns-1", - }, - }) - require.Error(t, err) -} diff --git a/pkg/networkservice/common/monitor/server.go b/pkg/networkservice/common/monitor/server.go index 959eb8517d..508cb6d30b 100644 --- a/pkg/networkservice/common/monitor/server.go +++ b/pkg/networkservice/common/monitor/server.go @@ -63,12 +63,7 @@ func (m *monitorServer) MonitorConnections(selector *networkservice.MonitorScope m.executor.AsyncExec(func() { monitor := newMonitorFilter(selector, srv) m.monitors = append(m.monitors, monitor) - connections := make(map[string]*networkservice.Connection) - for _, ps := range selector.PathSegments { - if conn, ok := m.connections[ps.GetId()]; ok { - connections[ps.GetId()] = conn - } - } + connections := networkservice.FilterMapOnManagerScopeSelector(m.connections, selector) // Send initial transfer of all data available _ = monitor.Send(&networkservice.ConnectionEvent{ Type: networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER, @@ -84,8 +79,8 @@ func (m *monitorServer) MonitorConnections(selector *networkservice.MonitorScope func (m *monitorServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { conn, err := next.Server(ctx).Request(ctx, request) - eventConn := conn.Clone() if err == nil { + eventConn := conn.Clone() m.executor.AsyncExec(func() { m.connections[eventConn.GetId()] = eventConn