Skip to content

Commit

Permalink
Rework registry/refresh to clock
Browse files Browse the repository at this point in the history
Signed-off-by: Vladimir Popov <vladimir.popov@xored.com>
  • Loading branch information
Vladimir Popov committed Apr 5, 2021
1 parent fa3f8a4 commit 924985c
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 35 deletions.
14 changes: 9 additions & 5 deletions pkg/registry/common/refresh/nse_registry_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/networkservicemesh/api/pkg/api/registry"

"github.com/networkservicemesh/sdk/pkg/registry/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/clock"
"github.com/networkservicemesh/sdk/pkg/tools/log"
)

Expand Down Expand Up @@ -54,6 +55,7 @@ func NewNetworkServiceEndpointRegistryClient(options ...Option) registry.Network

func (c *refreshNSEClient) startRefresh(
ctx context.Context,
clockTime clock.Clock,
client registry.NetworkServiceEndpointRegistryClient,
nse *registry.NetworkServiceEndpoint,
expiryDuration time.Duration,
Expand All @@ -66,8 +68,8 @@ func (c *refreshNSEClient) startRefresh(
select {
case <-ctx.Done():
return
case <-time.After(2 * time.Until(t) / 3):
nse.ExpirationTime = timestamppb.New(time.Now().Add(expiryDuration))
case <-clockTime.After(2 * clockTime.Until(t) / 3):
nse.ExpirationTime = timestamppb.New(clockTime.Now().Add(expiryDuration))

res, err := client.Register(ctx, nse.Clone())
if err != nil {
Expand All @@ -88,12 +90,14 @@ func (c *refreshNSEClient) Register(
nse *registry.NetworkServiceEndpoint,
opts ...grpc.CallOption,
) (*registry.NetworkServiceEndpoint, error) {
clockTime := clock.FromContext(ctx)

var expiryDuration time.Duration
if nse.ExpirationTime == nil {
expiryDuration = c.defaultExpiryDuration
nse.ExpirationTime = timestamppb.New(time.Now().Add(expiryDuration))
nse.ExpirationTime = timestamppb.New(clockTime.Now().Add(expiryDuration))
} else {
expiryDuration = time.Until(nse.ExpirationTime.AsTime().Local())
expiryDuration = clockTime.Until(nse.ExpirationTime.AsTime().Local())
}

refreshNSE := nse.Clone()
Expand All @@ -114,7 +118,7 @@ func (c *refreshNSEClient) Register(
ctx, cancel := context.WithCancel(c.chainContext)
c.nseCancels.Store(resp.Name, cancel)

c.startRefresh(ctx, nextClient, refreshNSE, expiryDuration)
c.startRefresh(ctx, clockTime, nextClient, refreshNSE, expiryDuration)

return resp, err
}
Expand Down
107 changes: 77 additions & 30 deletions pkg/registry/common/refresh/nse_registry_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,15 @@ import (
"github.com/networkservicemesh/sdk/pkg/registry/core/adapters"
"github.com/networkservicemesh/sdk/pkg/registry/core/next"
"github.com/networkservicemesh/sdk/pkg/registry/utils/checks/checknse"
"github.com/networkservicemesh/sdk/pkg/tools/clock"
"github.com/networkservicemesh/sdk/pkg/tools/clockmock"
)

const testExpiryDuration = time.Millisecond * 100
const (
expireTimeout = time.Minute
testWait = 100 * time.Millisecond
testTick = testWait / 100
)

func testNSE() *registry.NetworkServiceEndpoint {
return &registry.NetworkServiceEndpoint{
Expand All @@ -48,22 +54,29 @@ func testNSE() *registry.NetworkServiceEndpoint {
func TestNewNetworkServiceEndpointRegistryClient(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

clockMock := clockmock.NewMock()
ctx := clock.WithClock(context.Background(), clockMock)

countClient := new(requestCountClient)
client := next.NewNetworkServiceEndpointRegistryClient(
refresh.NewNetworkServiceEndpointRegistryClient(refresh.WithDefaultExpiryDuration(testExpiryDuration)),
refresh.NewNetworkServiceEndpointRegistryClient(
refresh.WithChainContext(ctx),
refresh.WithDefaultExpiryDuration(expireTimeout)),
countClient,
)

_, err := client.Register(context.Background(), &registry.NetworkServiceEndpoint{
Name: "nse-1",
})
_, err := client.Register(ctx, testNSE())
require.NoError(t, err)

// Wait for the Refresh goroutine to start
time.Sleep(testTick)

clockMock.Add(expireTimeout)
require.Eventually(t, func() bool {
return atomic.LoadInt32(&countClient.requestCount) > 1
}, time.Second, testExpiryDuration/4)
}, testWait, testTick)

_, err = client.Unregister(context.Background(), testNSE())
_, err = client.Unregister(ctx, testNSE())
require.NoError(t, err)
}

Expand All @@ -87,29 +100,41 @@ func TestRefreshNSEClient_ShouldSetExpirationTime_BeforeCallNext(t *testing.T) {
func Test_RefreshNSEClient_CalledRegisterTwice(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

clockMock := clockmock.NewMock()
ctx := clock.WithClock(context.Background(), clockMock)

countClient := new(requestCountClient)
client := next.NewNetworkServiceEndpointRegistryClient(
refresh.NewNetworkServiceEndpointRegistryClient(refresh.WithDefaultExpiryDuration(testExpiryDuration)),
refresh.NewNetworkServiceEndpointRegistryClient(
refresh.WithChainContext(ctx),
refresh.WithDefaultExpiryDuration(expireTimeout)),
countClient,
)

_, err := client.Register(context.Background(), testNSE())
_, err := client.Register(ctx, testNSE())
require.NoError(t, err)

reg, err := client.Register(context.Background(), testNSE())
reg, err := client.Register(ctx, testNSE())
require.NoError(t, err)

// Wait for the Refresh goroutine to start
time.Sleep(testTick)

clockMock.Add(expireTimeout)
require.Eventually(t, func() bool {
return atomic.LoadInt32(&countClient.requestCount) > 2
}, time.Second, testExpiryDuration/4)
}, testWait, testTick)

_, err = client.Unregister(context.Background(), reg)
_, err = client.Unregister(ctx, reg)
require.NoError(t, err)
}

func Test_RefreshNSEClient_ShouldOverrideNameAndDuration(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

clockMock := clockmock.NewMock()
ctx := clock.WithClock(context.Background(), clockMock)

endpoint := &registry.NetworkServiceEndpoint{
Name: "nse-1",
Url: "url",
Expand All @@ -118,10 +143,12 @@ func Test_RefreshNSEClient_ShouldOverrideNameAndDuration(t *testing.T) {
countClient := new(requestCountClient)
registryServer := &nseRegistryServer{
name: uuid.New().String(),
expiryDuration: testExpiryDuration,
expiryDuration: expireTimeout,
}
client := next.NewNetworkServiceEndpointRegistryClient(
refresh.NewNetworkServiceEndpointRegistryClient(refresh.WithDefaultExpiryDuration(time.Hour)),
refresh.NewNetworkServiceEndpointRegistryClient(
refresh.WithChainContext(ctx),
refresh.WithDefaultExpiryDuration(10*expireTimeout)),
checknse.NewClient(t, func(t *testing.T, nse *registry.NetworkServiceEndpoint) {
if atomic.LoadInt32(&countClient.requestCount) > 0 {
require.Equal(t, registryServer.name, nse.Name)
Expand All @@ -132,43 +159,63 @@ func Test_RefreshNSEClient_ShouldOverrideNameAndDuration(t *testing.T) {
adapters.NetworkServiceEndpointServerToClient(registryServer),
)

reg, err := client.Register(context.Background(), endpoint.Clone())
reg, err := client.Register(ctx, endpoint.Clone())
require.NoError(t, err)

require.Eventually(t, func() bool {
return atomic.LoadInt32(&countClient.requestCount) > 3
}, time.Second, testExpiryDuration/4)
// Wait for the Refresh goroutine to start
time.Sleep(testTick)

for i := 1; i <= 3; i++ {
count := int32(i)

clockMock.Add(expireTimeout)
require.Eventually(t, func() bool {
return atomic.LoadInt32(&countClient.requestCount) > count
}, testWait, testTick)
}

reg.Url = endpoint.Url

_, err = client.Unregister(context.Background(), reg)
_, err = client.Unregister(ctx, reg)
require.NoError(t, err)
}

func Test_RefreshNSEClient_SetsCorrectExpireTime(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

const expiryDuration = 100 * time.Millisecond
const timeoutDuration = 200 * time.Millisecond
clockMock := clockmock.NewMock()
ctx := clock.WithClock(context.Background(), clockMock)

countClient := new(requestCountClient)
client := next.NewNetworkServiceEndpointRegistryClient(
refresh.NewNetworkServiceEndpointRegistryClient(refresh.WithDefaultExpiryDuration(expiryDuration)),
refresh.NewNetworkServiceEndpointRegistryClient(
refresh.WithChainContext(ctx),
refresh.WithDefaultExpiryDuration(expireTimeout)),
checknse.NewClient(t, func(t *testing.T, nse *registry.NetworkServiceEndpoint) {
require.Greater(t, int64(expiryDuration+timeoutDuration)/2, int64(time.Until(nse.ExpirationTime.AsTime().Local())))
nse.ExpirationTime = timestamppb.New(time.Now().Add(timeoutDuration))
require.Equal(t, expireTimeout, clockMock.Until(nse.ExpirationTime.AsTime().Local()))
nse.ExpirationTime = timestamppb.New(clockMock.Now().Add(10 * expireTimeout))
}),
countClient,
)

reg, err := client.Register(context.Background(), testNSE())
reg, err := client.Register(ctx, testNSE())
require.NoError(t, err)

require.Eventually(t, func() bool {
return atomic.LoadInt32(&countClient.requestCount) > 3
}, time.Second, testExpiryDuration/4)
// Wait for the Refresh goroutine to start
time.Sleep(testTick)

_, err = client.Unregister(context.Background(), reg)
for i := 1; i <= 3; i++ {
count := int32(i)

clockMock.Add(10 * expireTimeout)
require.Eventually(t, func() bool {
return atomic.LoadInt32(&countClient.requestCount) > count
}, testWait, testTick)
}

reg.ExpirationTime = timestamppb.New(clockMock.Now().Add(expireTimeout))

_, err = client.Unregister(ctx, reg)
require.Nil(t, err)
}

Expand Down Expand Up @@ -199,7 +246,7 @@ func (s *nseRegistryServer) Register(ctx context.Context, nse *registry.NetworkS
nse = nse.Clone()
nse.Name = s.name
nse.Url = uuid.New().String()
nse.ExpirationTime = timestamppb.New(time.Now().Add(s.expiryDuration))
nse.ExpirationTime = timestamppb.New(clock.FromContext(ctx).Now().Add(s.expiryDuration))

return next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, nse)
}
Expand Down

0 comments on commit 924985c

Please sign in to comment.