Skip to content

Commit

Permalink
Remove time.Sleep from clocked registry tests
Browse files Browse the repository at this point in the history
Signed-off-by: Vladimir Popov <vladimir.popov@xored.com>
  • Loading branch information
Vladimir Popov committed Apr 6, 2021
1 parent 924985c commit 06d9cc9
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 29 deletions.
89 changes: 79 additions & 10 deletions pkg/registry/common/expire/ns_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (
"context"
"fmt"
"io"
"sync"
"testing"
"time"

"go.uber.org/goleak"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/networkservicemesh/api/pkg/api/registry"
Expand Down Expand Up @@ -56,10 +58,15 @@ func TestExpireNSServer_NSE_Expired(t *testing.T) {
nseMem := memory.NewNetworkServiceEndpointRegistryServer()
nsMem := memory.NewNetworkServiceRegistryServer()

updateServer := new(updateNSEServer)

s := next.NewNetworkServiceRegistryServer(
expire.NewNetworkServiceServer(
ctx,
adapters.NetworkServiceEndpointServerToClient(nseMem)),
adapters.NetworkServiceEndpointServerToClient(next.NewNetworkServiceEndpointRegistryServer(
updateServer,
nseMem,
))),
nsMem,
)

Expand All @@ -68,17 +75,26 @@ func TestExpireNSServer_NSE_Expired(t *testing.T) {
})
require.NoError(t, err)

for i := 0; i < 10; i++ {
names := make([]string, 10)
for i := 0; i < len(names); i++ {
names[i] = fmt.Sprint("nse-", i)
_, err = nseMem.Register(ctx, &registry.NetworkServiceEndpoint{
Name: fmt.Sprint("nse-", i),
Name: names[i],
NetworkServiceNames: []string{nsName},
ExpirationTime: timestamppb.New(clockMock.Now().Add(expireTimeout)),
})
require.NoError(t, err)
}

// Wait for the update from nseMem
time.Sleep(testWait)
require.Eventually(t, func() bool {
for _, name := range names {
if _, ok := updateServer.updates.Load(name); !ok {
return false
}
}
return true
}, testWait, testTick)

c := adapters.NetworkServiceServerToClient(nsMem)

Expand Down Expand Up @@ -115,10 +131,15 @@ func TestExpireNSServer_NSE_Unregistered(t *testing.T) {
nseMem := memory.NewNetworkServiceEndpointRegistryServer()
nsMem := memory.NewNetworkServiceRegistryServer()

updateServer := new(updateNSEServer)

s := next.NewNetworkServiceRegistryServer(
expire.NewNetworkServiceServer(
ctx,
adapters.NetworkServiceEndpointServerToClient(nseMem)),
adapters.NetworkServiceEndpointServerToClient(next.NewNetworkServiceEndpointRegistryServer(
updateServer,
nseMem,
))),
nsMem,
)

Expand All @@ -127,17 +148,26 @@ func TestExpireNSServer_NSE_Unregistered(t *testing.T) {
})
require.NoError(t, err)

for i := 0; i < 10; i++ {
names := make([]string, 10)
for i := 0; i < len(names); i++ {
names[i] = fmt.Sprint("nse-", i)
_, err = nseMem.Register(ctx, &registry.NetworkServiceEndpoint{
Name: fmt.Sprint("nse-", i),
Name: names[i],
NetworkServiceNames: []string{nsName},
ExpirationTime: timestamppb.New(clockMock.Now().Add(expireTimeout)),
})
require.NoError(t, err)
}

// Wait for the update from nseMem
time.Sleep(testWait)
require.Eventually(t, func() bool {
for _, name := range names {
if _, ok := updateServer.updates.Load(name); !ok {
return false
}
}
return true
}, testWait, testTick)

c := adapters.NetworkServiceServerToClient(nsMem)

Expand Down Expand Up @@ -181,10 +211,15 @@ func TestExpireNSServer_NSE_Update(t *testing.T) {
nseMem := memory.NewNetworkServiceEndpointRegistryServer()
nsMem := memory.NewNetworkServiceRegistryServer()

updateServer := new(updateNSEServer)

s := next.NewNetworkServiceRegistryServer(
expire.NewNetworkServiceServer(
ctx,
adapters.NetworkServiceEndpointServerToClient(nseMem)),
adapters.NetworkServiceEndpointServerToClient(next.NewNetworkServiceEndpointRegistryServer(
updateServer,
nseMem,
))),
nsMem,
)

Expand All @@ -194,6 +229,8 @@ func TestExpireNSServer_NSE_Update(t *testing.T) {
require.NoError(t, err)

for i := 0; i < 3; i++ {
updateServer.updates = sync.Map{}

_, err = nseMem.Register(ctx, &registry.NetworkServiceEndpoint{
Name: nseName,
NetworkServiceNames: []string{nsName},
Expand All @@ -202,7 +239,10 @@ func TestExpireNSServer_NSE_Update(t *testing.T) {
require.NoError(t, err)

// Wait for the update from nseMem
time.Sleep(testWait)
require.Eventually(t, func() bool {
_, ok := updateServer.updates.Load(nseName)
return ok
}, testWait, testTick)

c := adapters.NetworkServiceServerToClient(nsMem)

Expand All @@ -218,3 +258,32 @@ func TestExpireNSServer_NSE_Update(t *testing.T) {
clockMock.Add(expireTimeout / 2)
}
}

type updateNSEServer struct {
updates sync.Map
}

func (s *updateNSEServer) Register(ctx context.Context, nse *registry.NetworkServiceEndpoint) (*registry.NetworkServiceEndpoint, error) {
return next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, nse)
}

func (s *updateNSEServer) Find(query *registry.NetworkServiceEndpointQuery, server registry.NetworkServiceEndpointRegistry_FindServer) error {
return next.NetworkServiceEndpointRegistryServer(server.Context()).Find(query, &updateNSEFindServer{
updateNSEServer: s,
NetworkServiceEndpointRegistry_FindServer: server,
})
}

func (s *updateNSEServer) Unregister(ctx context.Context, nse *registry.NetworkServiceEndpoint) (*emptypb.Empty, error) {
return next.NetworkServiceEndpointRegistryServer(ctx).Unregister(ctx, nse)
}

type updateNSEFindServer struct {
*updateNSEServer
registry.NetworkServiceEndpointRegistry_FindServer
}

func (s *updateNSEFindServer) Send(nse *registry.NetworkServiceEndpoint) error {
s.updates.Store(nse.Name, struct{}{})
return s.NetworkServiceEndpointRegistry_FindServer.Send(nse)
}
38 changes: 24 additions & 14 deletions pkg/registry/common/expire/nse_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ package expire_test
import (
"context"
"io"
"sync/atomic"
"testing"
"time"

"github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/networkservicemesh/api/pkg/api/registry"
Expand Down Expand Up @@ -234,7 +235,7 @@ func TestExpireNSEServer_RefreshKeepsNoUnregister(t *testing.T) {
clockMock := clockmock.NewMock()
ctx = clock.WithClock(ctx, clockMock)

mem := memory.NewNetworkServiceEndpointRegistryServer()
unregisterServer := new(unregisterNSEServer)

c := next.NewNetworkServiceEndpointRegistryClient(
refresh.NewNetworkServiceEndpointRegistryClient(refresh.WithChainContext(ctx)),
Expand All @@ -248,7 +249,7 @@ func TestExpireNSEServer_RefreshKeepsNoUnregister(t *testing.T) {
clockMock.Add(expireTimeout / 2)
}),
expire.NewNetworkServiceEndpointRegistryServer(ctx, 10*expireTimeout),
mem,
unregisterServer,
)),
)

Expand All @@ -257,19 +258,11 @@ func TestExpireNSEServer_RefreshKeepsNoUnregister(t *testing.T) {
})
require.NoError(t, err)

stream, err := adapters.NetworkServiceEndpointServerToClient(mem).Find(ctx, &registry.NetworkServiceEndpointQuery{
NetworkServiceEndpoint: new(registry.NetworkServiceEndpoint),
Watch: true,
})
require.NoError(t, err)

for i := 0; i < 3; i++ {
clockMock.Add(expireTimeout / 10 * 9)
time.Sleep(testWait)

nse, err := stream.Recv()
require.NoError(t, err)
require.NotEqual(t, int64(-1), nse.ExpirationTime.Seconds)
require.Never(t, func() bool {
return atomic.LoadInt32(&unregisterServer.unregisterCount) > 0
}, testWait, testTick)
}
}

Expand Down Expand Up @@ -320,3 +313,20 @@ func (s *failureNSEServer) Find(query *registry.NetworkServiceEndpointQuery, ser
func (s *failureNSEServer) Unregister(ctx context.Context, nse *registry.NetworkServiceEndpoint) (*empty.Empty, error) {
return next.NetworkServiceEndpointRegistryServer(ctx).Unregister(ctx, nse)
}

type unregisterNSEServer struct {
unregisterCount int32
}

func (s *unregisterNSEServer) Register(ctx context.Context, nse *registry.NetworkServiceEndpoint) (*registry.NetworkServiceEndpoint, error) {
return next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, nse)
}

func (s *unregisterNSEServer) Find(query *registry.NetworkServiceEndpointQuery, server registry.NetworkServiceEndpointRegistry_FindServer) error {
return next.NetworkServiceEndpointRegistryServer(server.Context()).Find(query, server)
}

func (s *unregisterNSEServer) Unregister(ctx context.Context, nse *registry.NetworkServiceEndpoint) (*emptypb.Empty, error) {
atomic.AddInt32(&s.unregisterCount, 1)
return next.NetworkServiceEndpointRegistryServer(ctx).Unregister(ctx, nse)
}
1 change: 0 additions & 1 deletion pkg/registry/common/querycache/nse_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ func Test_QueryCacheClient_ShouldCleanUpOnTimeout(t *testing.T) {

// 4. Wait for the expire to happen
clockMock.Add(expireTimeout)
time.Sleep(testWait)

_, err = c.Find(ctx, testNSEQuery(name))
require.Errorf(t, err, "find error")
Expand Down
8 changes: 4 additions & 4 deletions pkg/registry/common/refresh/nse_registry_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestNewNetworkServiceEndpointRegistryClient(t *testing.T) {
require.NoError(t, err)

// Wait for the Refresh goroutine to start
time.Sleep(testTick)
require.Eventually(t, clockMock.IsTimerSet, testWait, testTick)

clockMock.Add(expireTimeout)
require.Eventually(t, func() bool {
Expand Down Expand Up @@ -118,7 +118,7 @@ func Test_RefreshNSEClient_CalledRegisterTwice(t *testing.T) {
require.NoError(t, err)

// Wait for the Refresh goroutine to start
time.Sleep(testTick)
require.Eventually(t, clockMock.IsTimerSet, testWait, testTick)

clockMock.Add(expireTimeout)
require.Eventually(t, func() bool {
Expand Down Expand Up @@ -163,7 +163,7 @@ func Test_RefreshNSEClient_ShouldOverrideNameAndDuration(t *testing.T) {
require.NoError(t, err)

// Wait for the Refresh goroutine to start
time.Sleep(testTick)
require.Eventually(t, clockMock.IsTimerSet, testWait, testTick)

for i := 1; i <= 3; i++ {
count := int32(i)
Expand Down Expand Up @@ -202,7 +202,7 @@ func Test_RefreshNSEClient_SetsCorrectExpireTime(t *testing.T) {
require.NoError(t, err)

// Wait for the Refresh goroutine to start
time.Sleep(testTick)
require.Eventually(t, clockMock.IsTimerSet, testWait, testTick)

for i := 1; i <= 3; i++ {
count := int32(i)
Expand Down
13 changes: 13 additions & 0 deletions pkg/tools/clockmock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package clockmock
import (
"context"
"sync"
"sync/atomic"
"time"

libclock "github.com/benbjohnson/clock"
Expand All @@ -33,6 +34,7 @@ var _ clock.Clock = (*Mock)(nil)
type Mock struct {
lock sync.RWMutex
mock *libclock.Mock
flag int32
}

// NewMock returns a new mocked clock
Expand All @@ -42,6 +44,11 @@ func NewMock() *Mock {
}
}

// IsTimerSet returns if any timer/ticker has ever been set on mock.
func (m *Mock) IsTimerSet() bool {
return atomic.LoadInt32(&m.flag) != 0
}

// Set sets the current time of the mock clock to a specific one.
// This should only be called from a single goroutine at a time.
func (m *Mock) Set(t time.Time) {
Expand Down Expand Up @@ -82,6 +89,8 @@ func (m *Mock) Sleep(d time.Duration) {

// Timer returns a timer that will fire when the mock current time becomes > m.Now().Add(d)
func (m *Mock) Timer(d time.Duration) clock.Timer {
defer atomic.StoreInt32(&m.flag, 1)

m.lock.RLock()
defer m.lock.RUnlock()

Expand All @@ -104,6 +113,8 @@ func (m *Mock) AfterFunc(d time.Duration, f func()) clock.Timer {
}

func (m *Mock) afterFunc(d time.Duration, f func()) clock.Timer {
defer atomic.StoreInt32(&m.flag, 1)

return &mockTimer{
Timer: m.mock.AfterFunc(safeDuration(d), func() {
go f()
Expand All @@ -113,6 +124,8 @@ func (m *Mock) afterFunc(d time.Duration, f func()) clock.Timer {

// Ticker returns a ticker that will fire every time when the mock current time becomes > mock previous time + d
func (m *Mock) Ticker(d time.Duration) clock.Ticker {
defer atomic.StoreInt32(&m.flag, 1)

m.lock.RLock()
defer m.lock.RUnlock()

Expand Down

0 comments on commit 06d9cc9

Please sign in to comment.