diff --git a/network/connecteventmanager.go b/network/connecteventmanager.go index 430805be..b21c693d 100644 --- a/network/connecteventmanager.go +++ b/network/connecteventmanager.go @@ -80,27 +80,32 @@ func (c *connectEventManager) setState(p peer.ID, newState state) { } } +// Waits for a change to be enqueued, or for the event manager to be stopped. Returns false if the +// connect event manager has been stopped. +func (c *connectEventManager) waitChange() bool { + for !c.stop && len(c.changeQueue) == 0 { + c.cond.Wait() + } + return !c.stop +} + func (c *connectEventManager) worker() { c.lk.Lock() defer c.lk.Unlock() defer close(c.done) - for { - for !c.stop && len(c.changeQueue) == 0 { - c.cond.Wait() - } - - if c.stop { - return - } - + for c.waitChange() { pid := c.changeQueue[0] - c.changeQueue[0] = peer.ID("") + c.changeQueue[0] = peer.ID("") // free the peer ID (slicing won't do that) c.changeQueue = c.changeQueue[1:] state, ok := c.peers[pid] - // If we've disconnected and forgotten, continue. We shouldn't reach this? + // If we've disconnected and forgotten, continue. if !ok { + // This shouldn't be possible because _this_ thread is responsible for + // removing peers from this map, and we shouldn't get duplicate entries in + // the change queue. + log.Error("a change was enqueued for a peer we're not tracking") continue } diff --git a/network/connecteventmanager_test.go b/network/connecteventmanager_test.go index 74211bb6..33d673e1 100644 --- a/network/connecteventmanager_test.go +++ b/network/connecteventmanager_test.go @@ -61,6 +61,10 @@ func TestConnectEventManagerConnectDisconnect(t *testing.T) { connected: true, }) + // Flush the event queue. + wait(t, cem) + require.Equal(t, expectedEvents, connListener.events) + // Block up the event loop. connListener.Lock() cem.Connected(peers[1]) @@ -91,10 +95,12 @@ func TestConnectEventManagerMarkUnresponsive(t *testing.T) { // Don't mark as connected when we receive a message (could have been delayed). cem.OnMessage(p) wait(t, cem) + require.Equal(t, expectedEvents, connListener.events) // Handle connected event. cem.Connected(p) wait(t, cem) + require.Equal(t, expectedEvents, connListener.events) expectedEvents = append(expectedEvents, mockConnEvent{ peer: p, @@ -104,6 +110,7 @@ func TestConnectEventManagerMarkUnresponsive(t *testing.T) { // Becomes unresponsive. cem.MarkUnresponsive(p) wait(t, cem) + require.Equal(t, expectedEvents, connListener.events) expectedEvents = append(expectedEvents, mockConnEvent{ peer: p, @@ -113,14 +120,17 @@ func TestConnectEventManagerMarkUnresponsive(t *testing.T) { // Don't expect the peer to be come connected. cem.Connected(p) wait(t, cem) + require.Equal(t, expectedEvents, connListener.events) // No duplicate event. cem.MarkUnresponsive(p) wait(t, cem) + require.Equal(t, expectedEvents, connListener.events) // Becomes responsive. cem.OnMessage(p) wait(t, cem) + require.Equal(t, expectedEvents, connListener.events) expectedEvents = append(expectedEvents, mockConnEvent{ peer: p, @@ -143,6 +153,7 @@ func TestConnectEventManagerDisconnectAfterMarkUnresponsive(t *testing.T) { // Handle connected event. cem.Connected(p) wait(t, cem) + require.Equal(t, expectedEvents, connListener.events) expectedEvents = append(expectedEvents, mockConnEvent{ peer: p, @@ -152,6 +163,7 @@ func TestConnectEventManagerDisconnectAfterMarkUnresponsive(t *testing.T) { // Becomes unresponsive. cem.MarkUnresponsive(p) wait(t, cem) + require.Equal(t, expectedEvents, connListener.events) expectedEvents = append(expectedEvents, mockConnEvent{ peer: p, diff --git a/network/ipfs_impl_test.go b/network/ipfs_impl_test.go index 180adae3..fc3585e0 100644 --- a/network/ipfs_impl_test.go +++ b/network/ipfs_impl_test.go @@ -285,7 +285,7 @@ func prepareNetwork(t *testing.T, ctx context.Context, p1 tnet.Identity, r1 *rec routing2 := mr.ClientWithDatastore(context.TODO(), p2, ds.NewMapDatastore()) bsnet2 := bsnet.NewFromIpfsHost(eh2, routing2) bsnet2.Start(r2) - t.Cleanup(bsnet1.Stop) + t.Cleanup(bsnet2.Stop) if r2.listener != nil { eh2.Network().Notify(r2.listener) } diff --git a/testnet/network_test.go b/testnet/network_test.go index a0e3d0f6..fbd1fa41 100644 --- a/testnet/network_test.go +++ b/testnet/network_test.go @@ -60,7 +60,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { t.Fatal("Message not received from the responder") } })) - t.Cleanup(responder.Stop) + t.Cleanup(waiter.Stop) messageSentAsync := bsmsg.New(true) messageSentAsync.AddBlock(blocks.NewBlock([]byte("data")))