Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main'
Browse files Browse the repository at this point in the history
  • Loading branch information
Mixaster995 committed Apr 22, 2021
2 parents 7b2bd62 + 19c7a10 commit 3860142
Show file tree
Hide file tree
Showing 8 changed files with 274 additions and 387 deletions.
136 changes: 124 additions & 12 deletions pkg/networkservice/common/heal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,153 @@ package heal

import (
"context"
"sync"

"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/pkg/errors"
"google.golang.org/grpc"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
)

// RegisterClientFunc - required to inform heal server about new client connection and assign it to the connection ID
type RegisterClientFunc func(context.Context, *networkservice.Connection, networkservice.MonitorConnectionClient)
type connectionInfo struct {
mut sync.Mutex
conn *networkservice.Connection
active bool
}

type healClient struct {
ctx context.Context
cc networkservice.MonitorConnectionClient
ctx context.Context
cc networkservice.MonitorConnectionClient
initOnce sync.Once
initErr error
conns connectionInfoMap
}

// NewClient - creates a new networkservice.NetworkServiceClient chain element that inform healServer about new client connection
// NewClient - creates a new networkservice.NetworkServiceClient chain element that monitors its connections' state
// and calls heal server in case connection breaks if heal server is present in the chain
// - ctx - context for the lifecycle of the *Client* itself. Cancel when discarding the client.
// - cc - MonitorConnectionClient that will be used to watch for connection confirmations and breakages.
func NewClient(ctx context.Context, cc networkservice.MonitorConnectionClient) networkservice.NetworkServiceClient {
return &healClient{
ctx: ctx,
cc: cc,
}
}

func (u *healClient) init(ctx context.Context, conn *networkservice.Connection) error {
monitorClient, err := u.cc.MonitorConnections(u.ctx, &networkservice.MonitorScopeSelector{
PathSegments: []*networkservice.PathSegment{{Name: conn.GetCurrentPathSegment().Name}, {Name: ""}},
})
if err != nil {
return errors.Wrap(err, "MonitorConnections failed")
}

healConnection := requestHealConnectionFunc(ctx)
if healConnection == nil {
healConnection = func(conn *networkservice.Connection) {}
}
restoreConnection := requestRestoreConnectionFunc(ctx)
if restoreConnection == nil {
restoreConnection = func(conn *networkservice.Connection) {}
}

go u.listenToConnectionChanges(healConnection, restoreConnection, monitorClient)

return nil
}

func (u *healClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) {
conn, err := next.Client(ctx).Request(ctx, request, opts...)
conn := request.GetConnection()
u.initOnce.Do(func() {
u.initErr = u.init(ctx, conn)
})
// if initialization failed, then we want for all subsequent calls to Request() on this object to also fail
if u.initErr != nil {
return nil, u.initErr
}

registerClient := registerClientFunc(ctx)
if err == nil && registerClient != nil {
registerClient(u.ctx, conn, u.cc)
connInfo, loaded := u.conns.LoadOrStore(conn.GetId(), &connectionInfo{
conn: conn.Clone(),
mut: sync.Mutex{},
})
u.replaceConnectionPath(conn, connInfo)

conn, err := next.Client(ctx).Request(ctx, request, opts...)
if err != nil {
if !loaded {
u.conns.Delete(request.GetConnection().GetId())
}
return nil, err
}
return conn, err

connInfo.mut.Lock()
defer connInfo.mut.Unlock()
connInfo.conn = conn.Clone()

return conn, nil
}

func (u *healClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*empty.Empty, error) {
connInfo, loaded := u.conns.LoadAndDelete(conn.GetId())
if !loaded {
return &empty.Empty{}, nil
}
u.replaceConnectionPath(conn, connInfo)

return next.Client(ctx).Close(ctx, conn, opts...)
}

// listenToConnectionChanges - loops on events from MonitorConnectionClient while the monitor client is alive.
// Updates connection cache and broadcasts events of successful connections.
// Calls heal when something breaks.
func (u *healClient) listenToConnectionChanges(healConnection, restoreConnection requestHealFuncType, monitorClient networkservice.MonitorConnection_MonitorConnectionsClient) {
for {
event, err := monitorClient.Recv()
if err != nil {
u.conns.Range(func(id string, info *connectionInfo) bool {
info.mut.Lock()
defer info.mut.Unlock()
restoreConnection(info.conn)
return true
})
return
}

for _, eventConn := range event.GetConnections() {
connID := eventConn.GetPrevPathSegment().GetId()
connInfo, ok := u.conns.Load(connID)
if !ok {
continue
}
connInfo.mut.Lock()
switch event.GetType() {
// Why both INITIAL_STATE_TRANSFER and UPDATE:
// Sometimes we start polling events too late, and when we wait for confirmation of success of some connection,
// this connection is in the INITIAL_STATE_TRANSFER event, so we must treat these events the same as UPDATE.
case networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER, networkservice.ConnectionEventType_UPDATE:
connInfo.active = true
connInfo.conn.Path.PathSegments = eventConn.Clone().Path.PathSegments
connInfo.conn.NetworkServiceEndpointName = eventConn.NetworkServiceEndpointName
case networkservice.ConnectionEventType_DELETE:
if connInfo.active {
healConnection(connInfo.conn)
}
connInfo.active = false
}
connInfo.mut.Unlock()
}
}
}

func (u *healClient) replaceConnectionPath(conn *networkservice.Connection, connInfo *connectionInfo) {
path := conn.GetPath()
if path != nil && int(path.Index) < len(path.PathSegments)-1 {
connInfo.mut.Lock()
defer connInfo.mut.Unlock()
path.PathSegments = path.PathSegments[:path.Index+1]
path.PathSegments = append(path.PathSegments, connInfo.conn.Path.PathSegments[path.Index+1:]...)
conn.NetworkServiceEndpointName = connInfo.conn.NetworkServiceEndpointName
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 24 additions & 5 deletions pkg/networkservice/common/heal/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,42 @@ package heal

import (
"context"

"github.com/networkservicemesh/api/pkg/api/networkservice"
)

const (
registerClientFuncKey contextKeyType = "RegisterFunc"
requestHealConnectionFuncKey contextKeyType = "requestHealConnectionFuncKey"
requestRestoreConnectionFuncKey contextKeyType = "requestRestoreConnectionFuncKey"
)

type contextKeyType string

func withRegisterClientFunc(parent context.Context, registerClientFunc RegisterClientFunc) context.Context {
type requestHealFuncType func(conn *networkservice.Connection)

func withRequestHealConnectionFunc(parent context.Context, fun requestHealFuncType) context.Context {
if parent == nil {
panic("cannot create context from nil parent")
}
return context.WithValue(parent, requestHealConnectionFuncKey, fun)
}

func requestHealConnectionFunc(ctx context.Context) requestHealFuncType {
if rv, ok := ctx.Value(requestHealConnectionFuncKey).(requestHealFuncType); ok {
return rv
}
return nil
}

func withRequestRestoreConnectionFunc(parent context.Context, fun requestHealFuncType) context.Context {
if parent == nil {
panic("cannot create context from nil parent")
}
return context.WithValue(parent, registerClientFuncKey, registerClientFunc)
return context.WithValue(parent, requestRestoreConnectionFuncKey, fun)
}

func registerClientFunc(ctx context.Context) RegisterClientFunc {
if rv, ok := ctx.Value(registerClientFuncKey).(RegisterClientFunc); ok {
func requestRestoreConnectionFunc(ctx context.Context) requestHealFuncType {
if rv, ok := ctx.Value(requestRestoreConnectionFuncKey).(requestHealFuncType); ok {
return rv
}
return nil
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions pkg/networkservice/common/heal/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import (
"sync"
)

//go:generate go-syncmap -output connection_map.gen.go -type connectionMap<string,connection>
//go:generate go-syncmap -output client_connection_map.gen.go -type clientConnMap<string,clientConnInfo>
//go:generate go-syncmap -output connection_info_map.gen.go -type connectionInfoMap<string,*connectionInfo>

type connectionMap sync.Map
type clientConnMap sync.Map
type connectionInfoMap sync.Map

//go:generate go-syncmap -output context_wrapper_map.gen.go -type ctxWrapperMap<string,*ctxWrapper>

type ctxWrapperMap sync.Map
Loading

0 comments on commit 3860142

Please sign in to comment.