From 2800ab9871adb9c83abe4d0a18239179cb44925e Mon Sep 17 00:00:00 2001 From: YACOVM Date: Wed, 2 Nov 2016 11:37:55 +0200 Subject: [PATCH] Gossip tests tweaks This commit: 1) Shortens time of the discovery tests from 20 seconds to 5 seconds By waiting until assertions hold (or a timeout expires) instead of sleeping and then checking the assertions. 2) Fixes a bug in tests that only manifests on Mac: I am checking At some point that goroutines are dead, but the detection logic didn't fit Mac runtime because they had different order of methods in go tests stack traces. I simply do an exhaustive search now of all stack trace of a goroutine instead of checking a specific index. Change-Id: I0bfed083bedcf6dce6247c77c2885ea436717c82 Signed-off-by: Yacov Manevich --- gossip/comm/comm_impl.go | 3 + gossip/comm/conn.go | 4 + gossip/discovery/discovery_test.go | 107 +++++++++++----- gossip/gossip/gossip_test.go | 198 ++++++++++++++++------------- 4 files changed, 188 insertions(+), 124 deletions(-) diff --git a/gossip/comm/comm_impl.go b/gossip/comm/comm_impl.go index dce254694aa..01ca07a042a 100644 --- a/gossip/comm/comm_impl.go +++ b/gossip/comm/comm_impl.go @@ -458,6 +458,9 @@ func (c *commImpl) Ping(context.Context, *proto.Empty) (*proto.Empty, error) { } func (c *commImpl) disconnect(pkiID PKIidType) { + if c.isStopping() { + return + } c.deadEndpoints <- pkiID c.connStore.closeByPKIid(pkiID) } diff --git a/gossip/comm/conn.go b/gossip/comm/conn.go index b37b2c7af7a..a3b27ac980b 100644 --- a/gossip/comm/conn.go +++ b/gossip/comm/conn.go @@ -89,6 +89,10 @@ func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error) destinationLock.Unlock() + if cs.isClosing { + return nil, fmt.Errorf("ConnStore is closing") + } + cs.Lock() delete(cs.destinationLocks, string(pkiID)) defer cs.Unlock() diff --git a/gossip/discovery/discovery_test.go b/gossip/discovery/discovery_test.go index 8ef1ac6ecd2..522fa06453b 100644 --- a/gossip/discovery/discovery_test.go +++ b/gossip/discovery/discovery_test.go @@ -31,6 +31,8 @@ import ( "google.golang.org/grpc" ) +var timeout = time.Second * time.Duration(15) + type dummyCommModule struct { id string presumeDead chan PKIidType @@ -257,35 +259,47 @@ func TestUpdate(t *testing.T) { instances = append(instances, inst) } - time.Sleep(time.Duration(5) * time.Second) - assert.Equal(t, nodeNum-1, len(instances[nodeNum-1].GetMembership())) + fullMembership := func() bool { + return nodeNum-1 == len(instances[nodeNum-1].GetMembership()) + } + + waitUntilOrFail(t, fullMembership) instances[0].UpdateMetadata([]byte("bla bla")) instances[nodeNum-1].UpdateEndpoint("localhost:5511") - time.Sleep(time.Duration(5) * time.Second) - for _, member := range instances[nodeNum-1].GetMembership() { - if string(member.PKIid) == instances[0].comm.id { - assert.Equal(t, "bla bla", string(member.Metadata)) + checkMembership := func() bool { + for _, member := range instances[nodeNum-1].GetMembership() { + if string(member.PKIid) == instances[0].comm.id { + if "bla bla" != string(member.Metadata) { + return false + } + } } - } - for _, member := range instances[0].GetMembership() { - if string(member.PKIid) == instances[nodeNum-1].comm.id { - assert.Equal(t, "localhost:5511", string(member.Endpoint)) + for _, member := range instances[0].GetMembership() { + if string(member.PKIid) == instances[nodeNum-1].comm.id { + if "localhost:5511" != string(member.Endpoint) { + return false + } + } } + return true } + + waitUntilOrFail(t, checkMembership) + stopAction := &sync.WaitGroup{} - for i, inst := range instances { - fmt.Println("Stopping instance ", i) + for _, inst := range instances { stopAction.Add(1) go func(inst *gossipInstance) { defer stopAction.Done() inst.Stop() }(inst) } - stopAction.Wait() + + waitUntilOrFailBlocking(t, stopAction.Wait) } func TestExpiration(t *testing.T) { @@ -305,30 +319,35 @@ func TestExpiration(t *testing.T) { instances = append(instances, inst) } - time.Sleep(time.Duration(4) * time.Second) - assert.Equal(t, nodeNum-1, len(instances[nodeNum-1].GetMembership())) + fullMembership := func() bool { + return nodeNum-1 == len(instances[nodeNum-1].GetMembership()) + } + + waitUntilOrFail(t, fullMembership) - instances[nodeNum-1].Stop() - instances[nodeNum-2].Stop() + waitUntilOrFailBlocking(t, instances[nodeNum-1].Stop) + waitUntilOrFailBlocking(t, instances[nodeNum-2].Stop) time.Sleep(time.Duration(2) * time.Second) - assert.Equal(t, nodeNum-3, len(instances[0].GetMembership())) + membershipReduced := func() bool { + return nodeNum-3 == len(instances[0].GetMembership()) + } + + waitUntilOrFail(t, membershipReduced) - //fmt.Println("Stopping members") stopAction := &sync.WaitGroup{} for i, inst := range instances { if i+2 == nodeNum { break } - //fmt.Println("Stopping instance", inst.DiscoveryService.Self().Endpoint) stopAction.Add(1) go func(inst *gossipInstance) { defer stopAction.Done() inst.Stop() - //fmt.Println("Stopped instance", inst.DiscoveryService.Self().Endpoint) }(inst) } - stopAction.Wait() + + waitUntilOrFailBlocking(t, stopAction.Wait) } func TestGetFullMembership(t *testing.T) { @@ -348,8 +367,10 @@ func TestGetFullMembership(t *testing.T) { instances = append(instances, inst) } - time.Sleep(time.Duration(5) * time.Second) - assert.Equal(t, nodeNum-1, len(instances[nodeNum-1].GetMembership())) + fullMembership := func() bool { + return nodeNum - 1 == len(instances[nodeNum-1].GetMembership()) + } + waitUntilOrFail(t, fullMembership) stopAction := &sync.WaitGroup{} for _, inst := range instances { @@ -359,24 +380,40 @@ func TestGetFullMembership(t *testing.T) { inst.Stop() }(inst) } - stopAction.Wait() + + waitUntilOrFailBlocking(t, stopAction.Wait) } func TestGossipDiscoveryStopping(t *testing.T) { inst := createDiscoveryInstance(9611, "d1", []string{bootPeer(9611)}) + time.Sleep(time.Second) + waitUntilOrFailBlocking(t, inst.Stop) - diedChan := make(chan struct{}) - go func(inst *gossipInstance) { - inst.Stop() - diedChan <- struct{}{} - }(inst) +} - timer := time.Tick(time.Duration(500) * time.Millisecond) +func waitUntilOrFail(t *testing.T, pred func() bool) { + start := time.Now() + limit := start.UnixNano() + timeout.Nanoseconds() + for time.Now().UnixNano() < limit { + if pred() { + return + } + time.Sleep(timeout / 10) + } + assert.Fail(t, "Timeout expired!") +} +func waitUntilOrFailBlocking(t *testing.T, f func()) { + successChan := make(chan struct{}, 1) + go func() { + f() + successChan <- struct{}{} + }() select { - case <-timer: - t.Fatal("Didn't stop within a timely manner") - t.Fail() - case <-diedChan: + case <-time.NewTimer(timeout).C: + break + case <-successChan: + return } + assert.Fail(t, "Timeout expired!") } diff --git a/gossip/gossip/gossip_test.go b/gossip/gossip/gossip_test.go index deb965806b8..a78ec573e61 100644 --- a/gossip/gossip/gossip_test.go +++ b/gossip/gossip/gossip_test.go @@ -37,6 +37,9 @@ import ( "github.com/stretchr/testify/assert" ) +var individualTimeout = time.Second * time.Duration(60) +var perTestTimeout = time.Second * time.Duration(180) + func init() { aliveTimeInterval := time.Duration(1000) * time.Millisecond discovery.SetAliveTimeInternal(aliveTimeInterval) @@ -84,7 +87,7 @@ func (*naiveCryptoService) Verify(vkID, signature, message []byte) error { func bootPeers(ids ...int) []string { peers := []string{} for _, id := range ids { - peers = append(peers, fmt.Sprintf("localhost:%d", (id+portPrefix))) + peers = append(peers, fmt.Sprintf("localhost:%d", (id + portPrefix))) } return peers } @@ -154,7 +157,7 @@ func TestPull(t *testing.T) { }() stopped := int32(0) - go waitForTestCompletion(&stopped, time.Duration(40)*time.Second, t) + go waitForTestCompletion(&stopped, t) n := 5 msgsCount2Send := 10 @@ -165,7 +168,7 @@ func TestPull(t *testing.T) { wg.Add(1) go func(i int) { pI := newGossipInstanceWithOnlyPull(i, 100, 0) - peers[i-1] = pI + peers[i - 1] = pI wg.Done() }(i) } @@ -173,7 +176,7 @@ func TestPull(t *testing.T) { knowAll := func() bool { for i := 1; i <= n; i++ { - neighborCount := len(peers[i-1].GetPeers()) + neighborCount := len(peers[i - 1].GetPeers()) if n != neighborCount { return false } @@ -181,7 +184,7 @@ func TestPull(t *testing.T) { return true } - waitUntilOrFail(t, knowAll, time.Duration(8) * time.Second) + waitUntilOrFail(t, knowAll) receivedMessages := make([]int, n) wg = sync.WaitGroup{} @@ -194,7 +197,7 @@ func TestPull(t *testing.T) { <-ch receivedMessages[index]++ } - }(i-1, peers[i-1].Accept(acceptData)) + }(i - 1, peers[i - 1].Accept(acceptData)) }(i) } @@ -203,8 +206,7 @@ func TestPull(t *testing.T) { } time.Sleep(time.Duration(3) * time.Second) - waitUntilOrFailBlocking(t, wg.Wait, time.Duration(10) * time.Second) - + waitUntilOrFailBlocking(t, wg.Wait) receivedAll := func() bool { for i := 0; i < n; i++ { @@ -214,13 +216,13 @@ func TestPull(t *testing.T) { } return true } - waitUntilOrFail(t, receivedAll, time.Duration(20) * time.Second) + waitUntilOrFail(t, receivedAll) stop := func() { stopPeers(append(peers, boot)) } - waitUntilOrFailBlocking(t, stop, time.Duration(20) * time.Second) + waitUntilOrFailBlocking(t, stop) fmt.Println("Took", time.Since(t1)) atomic.StoreInt32(&stopped, int32(1)) @@ -236,7 +238,7 @@ func TestMembership(t *testing.T) { defer testLock.Unlock() stopped := int32(0) - go waitForTestCompletion(&stopped, time.Duration(40)*time.Second, t) + go waitForTestCompletion(&stopped, t) n := 20 var lastPeer = fmt.Sprintf("localhost:%d", (n + portPrefix)) @@ -248,16 +250,16 @@ func TestMembership(t *testing.T) { wg.Add(1) go func(i int) { pI := newGossipInstance(i, 100, 0) - peers[i-1] = pI + peers[i - 1] = pI wg.Done() }(i) } - waitUntilOrFailBlocking(t, wg.Wait, time.Duration(20) * time.Second) + waitUntilOrFailBlocking(t, wg.Wait) seeAllNeighbors := func() bool { for i := 1; i <= n; i++ { - neighborCount := len(peers[i-1].GetPeers()) + neighborCount := len(peers[i - 1].GetPeers()) if neighborCount != n { return false } @@ -265,33 +267,32 @@ func TestMembership(t *testing.T) { return true } - waitUntilOrFail(t, seeAllNeighbors, time.Duration(30) * time.Second) - + waitUntilOrFail(t, seeAllNeighbors) fmt.Println("Updating metadata...") // Change metadata in last node - peers[len(peers)-1].UpdateMetadata([]byte("bla bla")) + peers[len(peers) - 1].UpdateMetadata([]byte("bla bla")) metaDataUpdated := func() bool { - if "bla bla"!= string(metadataOfPeer(boot.GetPeers(), lastPeer)) { + if "bla bla" != string(metadataOfPeer(boot.GetPeers(), lastPeer)) { return false } - for i := 0; i < n-1; i++ { - if "bla bla"!= string(metadataOfPeer(peers[i].GetPeers(), lastPeer)) { + for i := 0; i < n - 1; i++ { + if "bla bla" != string(metadataOfPeer(peers[i].GetPeers(), lastPeer)) { return false } } return true } - waitUntilOrFail(t, metaDataUpdated, time.Duration(30) * time.Second) + waitUntilOrFail(t, metaDataUpdated) stop := func() { stopPeers(append(peers, boot)) } - waitUntilOrFailBlocking(t, stop, time.Duration(30) * time.Second) + waitUntilOrFailBlocking(t, stop) fmt.Println("Took", time.Since(t1)) atomic.StoreInt32(&stopped, int32(1)) @@ -308,7 +309,7 @@ func TestDissemination(t *testing.T) { defer testLock.Unlock() stopped := int32(0) - go waitForTestCompletion(&stopped, time.Duration(60)*time.Second, t) + go waitForTestCompletion(&stopped, t) n := 20 msgsCount2Send := 10 @@ -318,27 +319,25 @@ func TestDissemination(t *testing.T) { receivedMessages := make([]int, n) wg := sync.WaitGroup{} for i := 1; i <= n; i++ { - go func(i int) { - pI := newGossipInstance(i, 100, 0) - peers[i-1] = pI - go func(index int, ch <-chan *proto.GossipMessage) { - wg.Add(1) - defer wg.Done() - for j := 0; j < msgsCount2Send; j++ { - <-ch - receivedMessages[index]++ - } - }(i-1, pI.Accept(acceptData)) - }(i) + pI := newGossipInstance(i, 100, 0) + peers[i - 1] = pI + go func(index int, ch <-chan *proto.GossipMessage) { + wg.Add(1) + defer wg.Done() + for j := 0; j < msgsCount2Send; j++ { + <-ch + receivedMessages[index]++ + } + }(i - 1, pI.Accept(acceptData)) } - time.Sleep(time.Duration(8) * time.Second) + time.Sleep(time.Duration(10) * time.Second) for i := 1; i <= msgsCount2Send; i++ { boot.Gossip(createDataMsg(uint64(i), []byte{}, "")) } - waitUntilOrFailBlocking(t, wg.Wait, time.Duration(20) * time.Second) + waitUntilOrFailBlocking(t, wg.Wait) for i := 0; i < n; i++ { assert.Equal(t, msgsCount2Send, receivedMessages[i]) @@ -349,7 +348,7 @@ func TestDissemination(t *testing.T) { stopPeers(append(peers, boot)) } - waitUntilOrFailBlocking(t, stop, time.Duration(20) * time.Second) + waitUntilOrFailBlocking(t, stop) fmt.Println("Took", time.Since(t1)) atomic.StoreInt32(&stopped, int32(1)) @@ -373,7 +372,7 @@ func TestMembershipConvergence(t *testing.T) { t1 := time.Now() stopped := int32(0) - go waitForTestCompletion(&stopped, time.Duration(60)*time.Second, t) + go waitForTestCompletion(&stopped, t) boot0 := newGossipInstance(0, 100) boot1 := newGossipInstance(1, 100) boot2 := newGossipInstance(2, 100) @@ -385,7 +384,7 @@ func TestMembershipConvergence(t *testing.T) { // 1: {4, 7, 10, 13} // 2: {5, 8, 11, 14} for i := 3; i < 15; i++ { - pI := newGossipInstance(i, 100, i%3) + pI := newGossipInstance(i, 100, i % 3) peers = append(peers, pI) } time.Sleep(time.Duration(3) * time.Second) @@ -399,21 +398,20 @@ func TestMembershipConvergence(t *testing.T) { fullKnowledge := func() bool { for i := 0; i < 15; i++ { - if 15 != len(peers[i].GetPeers()) { + if 15 != len(peers[i].GetPeers()) { return false } - if "Connector" != string(metadataOfPeer(peers[i].GetPeers(), "localhost:5625")) { + if "Connector" != string(metadataOfPeer(peers[i].GetPeers(), "localhost:5625")) { return false } } return true } - - waitUntilOrFail(t, fullKnowledge, time.Duration(20) * time.Second) + waitUntilOrFail(t, fullKnowledge) fmt.Println("Stopping connector...") - waitUntilOrFailBlocking(t, connectorPeer.Stop, time.Duration(20) * time.Second) + waitUntilOrFailBlocking(t, connectorPeer.Stop) fmt.Println("Stopped") time.Sleep(time.Duration(15) * time.Second) @@ -426,7 +424,7 @@ func TestMembershipConvergence(t *testing.T) { return true } - waitUntilOrFail(t, ensureForget, time.Duration(20) * time.Second) + waitUntilOrFail(t, ensureForget) connectorPeer = newGossipInstance(15, 100) connectorPeer.UpdateMetadata([]byte("Connector")) @@ -444,16 +442,16 @@ func TestMembershipConvergence(t *testing.T) { return true } - waitUntilOrFail(t, ensureResync, time.Duration(20) * time.Second) + waitUntilOrFail(t, ensureResync) - waitUntilOrFailBlocking(t, connectorPeer.Stop, time.Duration(20) * time.Second) + waitUntilOrFailBlocking(t, connectorPeer.Stop) fmt.Println("Stopping peers") stop := func() { stopPeers(peers) } - waitUntilOrFailBlocking(t, stop, time.Duration(20) * time.Second) + waitUntilOrFailBlocking(t, stop) atomic.StoreInt32(&stopped, int32(1)) fmt.Println("Took", time.Since(t1)) ensureGoroutineExit(t) @@ -474,44 +472,54 @@ func createDataMsg(seqnum uint64, data []byte, hash string) *proto.GossipMessage } } -func ensureGoroutineExit(t *testing.T) { - time.Sleep(time.Duration(5) * time.Second) - runTests := func(g goroutine) bool { - return strings.Index(g.stack[0], "testing.RunTests") != -1 - } +var runTests = func(g goroutine) bool { + return searchInStackTrace("testing.RunTests", g.stack) +} - waitForTestCompl := func(g goroutine) bool { - for _, ste := range g.stack { - if strings.Index(ste, "waitForTestCompletion") != -1 { - return true - } - } - return false - } +var waitForTestCompl = func(g goroutine) bool { + return searchInStackTrace("waitForTestCompletion", g.stack) +} - goExit := func(g goroutine) bool { - return strings.Index(g.stack[0], "runtime.goexit") != -1 - } +var goExit = func(g goroutine) bool { + return searchInStackTrace("runtime.goexit", g.stack) +} - clientConn := func(g goroutine) bool { - return strings.Index(g.stack[0], "resetTransport") != -1 - } +var clientConn = func(g goroutine) bool { + return searchInStackTrace("resetTransport", g.stack) +} - testing := func(g goroutine) bool { - return strings.Index(g.stack[len(g.stack)-1], "testing.go") != -1 - } +var testingg = func(g goroutine) bool { + return strings.Index(g.stack[len(g.stack) - 1], "testing.go") != -1 +} + +func shouldNotBeRunningAtEnd(gr goroutine) bool { + return !runTests(gr) && !goExit(gr) && ! testingg(gr) && !clientConn(gr) && !waitForTestCompl(gr) +} + +func ensureGoroutineExit(t *testing.T) { + for i := 0; i <= 20; i++ { + time.Sleep(time.Second) + allEnded := true + for _, gr := range getGoRoutines() { + if shouldNotBeRunningAtEnd(gr) { + allEnded = false + continue + } - for _, gr := range getGoRoutines() { - if !runTests(gr) && !goExit(gr) && !testing(gr) && !clientConn(gr) && !waitForTestCompl(gr) { - assert.Fail(t, "Goroutine(s) haven't ended: %s", fmt.Sprintf("%v", gr.stack)) - time.Sleep(time.Duration(500) * time.Millisecond) // sleep to sync testing output and printing - for _, gr2 := range getGoRoutines() { - for _, ste := range gr2.stack { - fmt.Println(ste) + if shouldNotBeRunningAtEnd(gr) && i == 20 { + assert.Fail(t, "Goroutine(s) haven't ended:", fmt.Sprintf("%v", gr.stack)) + for _, gr2 := range getGoRoutines() { + for _, ste := range gr2.stack { + t.Log(ste) + } + t.Log() } - fmt.Println() + break } - break + } + + if allEnded { + return } } } @@ -525,8 +533,8 @@ func metadataOfPeer(members []discovery.NetworkMember, endpoint string) []byte { return nil } -func waitForTestCompletion(stopFlag *int32, waitTime time.Duration, t *testing.T) { - time.Sleep(waitTime) +func waitForTestCompletion(stopFlag *int32, t *testing.T) { + time.Sleep(perTestTimeout) if atomic.LoadInt32(stopFlag) == int32(1) { return } @@ -548,7 +556,7 @@ func stopPeers(peers []Gossip) { } func getGoroutineRawText() string { - buf := make([]byte, 1<<16) + buf := make([]byte, 1 << 16) runtime.Stack(buf, true) return string(buf) } @@ -581,29 +589,41 @@ type goroutine struct { stack []string } -func waitUntilOrFail(t *testing.T, pred func() bool, timeout time.Duration) { +func waitUntilOrFail(t *testing.T, pred func() bool) { start := time.Now() - limit := start.UnixNano() + timeout.Nanoseconds() + limit := start.UnixNano() + individualTimeout.Nanoseconds() for time.Now().UnixNano() < limit { if pred() { return } - time.Sleep(timeout / 10) + time.Sleep(individualTimeout / 60) } + util.PrintStackTrace() assert.Fail(t, "Timeout expired!") } -func waitUntilOrFailBlocking(t *testing.T, f func(), timeout time.Duration) { +func waitUntilOrFailBlocking(t *testing.T, f func()) { successChan := make(chan struct{}, 1) go func() { f() successChan <- struct{}{} }() select { - case <- time.NewTimer(timeout).C: + case <-time.NewTimer(individualTimeout).C: break - case <- successChan: + case <-successChan: return } + util.PrintStackTrace() assert.Fail(t, "Timeout expired!") +} + + +func searchInStackTrace(searchTerm string, stack []string) bool { + for _, ste := range stack { + if strings.Index(ste, searchTerm) != -1 { + return true + } + } + return false } \ No newline at end of file