Skip to content

Commit

Permalink
Enable use of multiple point2pointipam chain elements (networkservice…
Browse files Browse the repository at this point in the history
…mesh#912)

Signed-off-by: Mikhail Avramenko <avramenkomihail15@gmail.com>
  • Loading branch information
edwarnicke authored and Mixaster995 committed May 17, 2021
1 parent 3faa4c3 commit 34ca3fd
Show file tree
Hide file tree
Showing 23 changed files with 1,005 additions and 120 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
github.com/nats-io/nats-streaming-server v0.17.0
github.com/nats-io/stan.go v0.6.0
github.com/networkservicemesh/api v0.0.0-20210502014900-961930476ee1
github.com/networkservicemesh/api v0.0.0-20210509180413-5753c9f30588
github.com/open-policy-agent/opa v0.16.1
github.com/opentracing/opentracing-go v1.1.0
github.com/pkg/errors v0.9.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/nats-io/stan.go v0.6.0 h1:26IJPeykh88d8KVLT4jJCIxCyUBOC5/IQup8oWD/QYY=
github.com/nats-io/stan.go v0.6.0/go.mod h1:eIcD5bi3pqbHT/xIIvXMwvzXYElgouBvaVRftaE+eac=
github.com/networkservicemesh/api v0.0.0-20210502014900-961930476ee1 h1:HNm4SS79Bx14cAXoLkIs/GB2KSJI+zT19ttDIu6Ofuc=
github.com/networkservicemesh/api v0.0.0-20210502014900-961930476ee1/go.mod h1:B6meq/SWjWR6bGXZdXPfbOeaBK+T1JayLdtEJQCsXKU=
github.com/networkservicemesh/api v0.0.0-20210509180413-5753c9f30588 h1:DZZpus9B5AU8nADoDz4Pne6yRqOBV1/eToenIg+djM8=
github.com/networkservicemesh/api v0.0.0-20210509180413-5753c9f30588/go.mod h1:B6meq/SWjWR6bGXZdXPfbOeaBK+T1JayLdtEJQCsXKU=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/olekukonko/tablewriter v0.0.1/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
Expand Down
7 changes: 5 additions & 2 deletions pkg/networkservice/common/discover/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,8 +512,11 @@ func TestMatchExactEndpoint(t *testing.T) {
func TestMatchSelectedNSESecondAttempt(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

clockMock := clockmock.New()
ctx := clock.WithClock(context.Background(), clockMock)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

clockMock := clockmock.New(ctx)
ctx = clock.WithClock(ctx, clockMock)

const requestTimeout = time.Second
const tick = requestTimeout / 10
Expand Down
7 changes: 5 additions & 2 deletions pkg/networkservice/common/journal/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,11 @@ func TestConnect(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)

clockMock := clockmock.New()
ctx := clock.WithClock(context.Background(), clockMock)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

clockMock := clockmock.New(ctx)
ctx = clock.WithClock(ctx, clockMock)

ts := time.Now()
clockMock.Set(ts)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package point2pointipam
package onidle

import (
"context"

"github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata"
"time"
)

type keyType struct{}

func storeConnInfo(ctx context.Context, connInfo *connectionInfo) {
metadata.Map(ctx, false).Store(keyType{}, connInfo)
}
// Option is an option for the connect server
type Option func(t *onIdleServer)

func loadConnInfo(ctx context.Context) (*connectionInfo, bool) {
if raw, ok := metadata.Map(ctx, false).Load(keyType{}); ok {
return raw.(*connectionInfo), true
// WithTimeout sets timeout for onIdleServer
func WithTimeout(timeout time.Duration) Option {
return func(t *onIdleServer) {
t.timeout = timeout
}
return nil, false
}
133 changes: 133 additions & 0 deletions pkg/networkservice/common/onidle/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright (c) 2021 Doc.ai and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package onidle provides server chain element that executes a callback when there were no active connections for specified time
package onidle

import (
"context"
"sync"
"time"

"github.com/golang/protobuf/ptypes/empty"
"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/pkg/errors"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/clock"
)

type onIdleServer struct {
ctx context.Context
timeout time.Duration
notify func()
timer clock.Timer
timerMut sync.Mutex
timerFired bool
activeConns map[string]struct{}
}

// NewServer returns a new server chain element that notifies about long time periods without active connections.
//
// If timeout passes, server calls specified notify function and all further Requests will fail.
//
// If ctx is canceled before timeout, the server stops monitoring connections without calling notify.
// Further calls to Request will not be affected by this.
func NewServer(ctx context.Context, notify func(), options ...Option) networkservice.NetworkServiceServer {
clockTime := clock.FromContext(ctx)

t := &onIdleServer{
ctx: ctx,
timeout: time.Minute * 10,
notify: notify,
activeConns: make(map[string]struct{}),
}

for _, opt := range options {
opt(t)
}

t.timer = clockTime.AfterFunc(t.timeout, func() {
if ctx.Err() != nil {
return
}

t.timerMut.Lock()

if t.timerFired || len(t.activeConns) != 0 {
t.timerMut.Unlock()
return
}

t.timerFired = true
t.timerMut.Unlock()
t.notify()
})

go func() {
<-t.ctx.Done()
t.timer.Stop()
}()

return t
}

func (t *onIdleServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
isRefresh, expired := t.addConnection(request.GetConnection())

if expired {
return nil, errors.New("endpoint expired")
}

conn, err := next.Server(ctx).Request(ctx, request)
if err != nil && !isRefresh {
t.removeConnection(request.GetConnection())
}

return conn, err
}

func (t *onIdleServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) {
t.removeConnection(conn)
return next.Server(ctx).Close(ctx, conn)
}

func (t *onIdleServer) addConnection(conn *networkservice.Connection) (isRefresh, expired bool) {
t.timerMut.Lock()
defer t.timerMut.Unlock()

if t.timerFired {
return false, true
}

if _, isRefresh = t.activeConns[conn.GetId()]; !isRefresh {
t.activeConns[conn.GetId()] = struct{}{}
t.timer.Stop()
}
return isRefresh, false
}

func (t *onIdleServer) removeConnection(conn *networkservice.Connection) {
t.timerMut.Lock()
defer t.timerMut.Unlock()

if _, loaded := t.activeConns[conn.GetId()]; loaded {
delete(t.activeConns, conn.GetId())
if len(t.activeConns) == 0 && t.ctx.Err() == nil {
t.timer.Reset(t.timeout)
}
}
}
Loading

0 comments on commit 34ca3fd

Please sign in to comment.