Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sdk#994] Heal deadline #995

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/networkservice/chains/nsmgr/heal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,9 @@ func TestNSMGR_CloseHeal(t *testing.T) {
conn, err = nsc.Request(ctx, request.Clone())
require.NoError(t, err)

// 3. Stop endpoint
// 3. Stop endpoint and wait for the heal to start
nseCtxCancel()
time.Sleep(100 * time.Millisecond)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to add sleep here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test validates that heal can be stopped after it has started, so we need to wait for some time to make sure that heal is really running at the moment we try to stop it.
Without this sleep the test also passes, but then it tests a preventive heal stop, which is not what should be tested here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there some other way we can make sure that heal has started?
Recently we found out that on CI 100ms sometimes isn't enough for thread synchronization. And the test where we had issues with timeout is much simpler, so complex heal tests likely require even more time to be sure that another thread started working.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be problematic.
Waiting for 100ms doesn't guarantee that heal has been started, but it does mean that in most test runs it will be so.


// 4. Close connection
_, _ = nsc.Close(nscCtx, conn.Clone())
Expand Down
6 changes: 5 additions & 1 deletion pkg/networkservice/chains/nsmgr/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,14 +330,18 @@ func (s *nsmgrSuite) Test_ConnectToDeadNSEUsecase() {
require.Equal(t, 5, len(conn.Path.PathSegments))

killNse()
// Simulate refresh from client.

// Simulate refresh from client
refreshRequest := request.Clone()
refreshRequest.Connection = conn.Clone()

_, err = nsc.Request(ctx, refreshRequest)
require.Error(t, err)
require.NoError(t, ctx.Err())

// Close
_, _ = nsc.Close(ctx, conn)

// Endpoint unregister
_, err = s.domain.Nodes[0].EndpointRegistryClient.Unregister(ctx, nseReg)
require.NoError(t, err)
Expand Down
30 changes: 0 additions & 30 deletions pkg/networkservice/common/heal/constants_test.go

This file was deleted.

27 changes: 0 additions & 27 deletions pkg/networkservice/common/heal/constants_windows_test.go

This file was deleted.

101 changes: 63 additions & 38 deletions pkg/networkservice/common/heal/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc"

"github.com/networkservicemesh/api/pkg/api/networkservice"

Expand All @@ -39,10 +38,11 @@ import (
)

type ctxWrapper struct {
mut sync.Mutex
request *networkservice.NetworkServiceRequest
ctx context.Context
cancel func()
mut sync.Mutex
request *networkservice.NetworkServiceRequest
requestTimeout time.Duration
ctx context.Context
cancel func()
}

type healServer struct {
Expand Down Expand Up @@ -78,16 +78,26 @@ func NewServer(ctx context.Context, opts ...Option) networkservice.NetworkServic
}

func (f *healServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
clockTime := clock.FromContext(ctx)
ctx = f.withHandlers(ctx)

requestStart := clockTime.Now()

conn, err := next.Server(ctx).Request(ctx, request)
if err != nil {
return nil, err
}

// There can possibly be a case when we are trying to heal from the local case to the remote case. The maximum captured
// difference between these times was 3x on a packet cluster (0.5s local vs 1.5s remote). So taking a 5x value would be
// enough to cover such a local-to-remote case and not too much in terms of blocking subsequent Request/Close events
// (7.5s for the worst remote case).
requestTimeout := clockTime.Since(requestStart) * 5

cw, loaded := f.healContextMap.LoadOrStore(request.GetConnection().GetId(), &ctxWrapper{
request: request.Clone(),
ctx: f.createHealContext(ctx, nil),
request: request.Clone(),
requestTimeout: requestTimeout,
ctx: f.createHealContext(ctx, nil),
})
if loaded {
cw.mut.Lock()
Expand All @@ -98,6 +108,7 @@ func (f *healServer) Request(ctx context.Context, request *networkservice.Networ
cw.cancel = nil
}
cw.request = request.Clone()
cw.requestTimeout = requestTimeout
cw.ctx = f.createHealContext(ctx, cw.ctx)
}

Expand Down Expand Up @@ -129,10 +140,12 @@ func (f *healServer) Close(ctx context.Context, conn *networkservice.Connection)
return rv, err
}

func (f *healServer) getHealContext(conn *networkservice.Connection) (*networkservice.NetworkServiceRequest, context.Context) {
func (f *healServer) getHealContext(
conn *networkservice.Connection,
) (context.Context, *networkservice.NetworkServiceRequest, time.Duration) {
cw, ok := f.healContextMap.Load(conn.GetId())
if !ok {
return nil, nil
return nil, nil, 0
}

cw.mut.Lock()
Expand All @@ -143,48 +156,53 @@ func (f *healServer) getHealContext(conn *networkservice.Connection) (*networkse
}
ctx, cancel := context.WithCancel(cw.ctx)
cw.cancel = cancel
request := cw.request.Clone()

return request, ctx
return ctx, cw.request.Clone(), cw.requestTimeout
}

// handleHealConnectionRequest - heals requested connection. Returns immediately, heal is asynchronous.
func (f *healServer) handleHealConnectionRequest(conn *networkservice.Connection) {
request, healCtx := f.getHealContext(conn)
ctx, request, requestTimeout := f.getHealContext(conn)
if request == nil {
return
}

request.SetRequestConnection(conn.Clone())

go f.processHeal(healCtx, request)
go f.processHeal(ctx, request, requestTimeout)
}

// handleRestoreConnectionRequest - recreates connection. Returns immediately, heal is asynchronous.
func (f *healServer) handleRestoreConnectionRequest(conn *networkservice.Connection) {
request, healCtx := f.getHealContext(conn)
ctx, request, requestTimeout := f.getHealContext(conn)
if request == nil {
return
}

request.SetRequestConnection(conn.Clone())

go f.restoreConnection(healCtx, request)
go f.restoreConnection(ctx, request, requestTimeout)
}

// stopHeal drops the heal context stored under the connection ID and, if a
// cancel function is currently registered on it, invokes that function.
//
// The entry is removed from healContextMap atomically via LoadAndDelete; when
// no entry exists for the connection this is a no-op. The cancel function is
// read and called while holding the wrapper's mutex.
func (f *healServer) stopHeal(conn *networkservice.Connection) {
	wrapper, found := f.healContextMap.LoadAndDelete(conn.GetId())
	if !found {
		return
	}

	wrapper.mut.Lock()
	defer wrapper.mut.Unlock()

	// cancel is nil when no heal context has been handed out (or it was already reset).
	if cancelHeal := wrapper.cancel; cancelHeal != nil {
		cancelHeal()
	}
}

func (f *healServer) restoreConnection(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) {
func (f *healServer) restoreConnection(
ctx context.Context,
request *networkservice.NetworkServiceRequest,
requestTimeout time.Duration,
) {
clockTime := clock.FromContext(ctx)

if ctx.Err() != nil {
Expand All @@ -202,55 +220,62 @@ func (f *healServer) restoreConnection(ctx context.Context, request *networkserv
if deadline.After(expireTime) {
deadline = expireTime
}
requestCtx, requestCancel := clockTime.WithDeadline(ctx, deadline)
defer requestCancel()
restoreCtx, restoreCancel := clockTime.WithDeadline(ctx, deadline)
defer restoreCancel()

for requestCtx.Err() == nil {
if _, err = (*f.onHeal).Request(requestCtx, request.Clone(), opts...); err == nil {
for restoreCtx.Err() == nil {
requestCtx, requestCancel := clockTime.WithTimeout(restoreCtx, requestTimeout)
_, err := (*f.onHeal).Request(requestCtx, request.Clone())
requestCancel()

if err == nil {
return
}
}

f.processHeal(ctx, request.Clone(), opts...)
f.processHeal(ctx, request, requestTimeout)
}

func (f *healServer) processHeal(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) {
logEntry := log.FromContext(ctx).WithField("healServer", "processHeal")
conn := request.GetConnection()
func (f *healServer) processHeal(
ctx context.Context,
request *networkservice.NetworkServiceRequest,
requestTimeout time.Duration,
) {
clockTime := clock.FromContext(ctx)
logger := log.FromContext(ctx).WithField("healServer", "processHeal")

if ctx.Err() != nil {
return
}

candidates := discover.Candidates(ctx)
conn := request.GetConnection()
if candidates != nil || conn.GetPath().GetIndex() == 0 {
logEntry.Infof("Starting heal process for %s", conn.GetId())

healCtx, healCancel := context.WithCancel(ctx)
defer healCancel()
logger.Infof("Starting heal process for %s", conn.GetId())

reRequest := request.Clone()
reRequest.GetConnection().NetworkServiceEndpointName = ""
path := reRequest.GetConnection().Path
reRequest.GetConnection().Path.PathSegments = path.PathSegments[0 : path.Index+1]
conn.NetworkServiceEndpointName = ""
conn.Path.PathSegments = conn.Path.PathSegments[0 : conn.Path.Index+1]

for ctx.Err() == nil {
_, err := (*f.onHeal).Request(healCtx, reRequest, opts...)
requestCtx, requestCancel := clockTime.WithTimeout(ctx, requestTimeout)
_, err := (*f.onHeal).Request(requestCtx, request.Clone())
requestCancel()

if err != nil {
logEntry.Errorf("Failed to heal connection %s: %v", conn.GetId(), err)
logger.Errorf("Failed to heal connection %s: %v", conn.GetId(), err)
} else {
logEntry.Infof("Finished heal process for %s", conn.GetId())
logger.Infof("Finished heal process for %s", conn.GetId())
break
}
}
} else {
// A huge timeout is not required to close the connection on the current path segment
closeCtx, closeCancel := clock.FromContext(ctx).WithTimeout(ctx, time.Second)
closeCtx, closeCancel := clockTime.WithTimeout(ctx, time.Second)
defer closeCancel()

_, err := (*f.onHeal).Close(closeCtx, request.GetConnection().Clone(), opts...)
_, err := (*f.onHeal).Close(closeCtx, conn)
if err != nil {
logEntry.Errorf("Failed to close connection %s: %v", request.GetConnection().GetId(), err)
logger.Errorf("Failed to close connection %s: %v", conn.GetId(), err)
}
}
}
Expand Down
Loading