Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
Stebalien committed Jun 2, 2022
1 parent 051eef2 commit 1ae811f
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 13 deletions.
27 changes: 16 additions & 11 deletions network/connecteventmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,27 +80,32 @@ func (c *connectEventManager) setState(p peer.ID, newState state) {
}
}

// Waits for a change to be enqueued, or for the event manager to be stopped. Returns false if the
// connect event manager has been stopped.
func (c *connectEventManager) waitChange() bool {
for !c.stop && len(c.changeQueue) == 0 {
c.cond.Wait()
}
return !c.stop
}

func (c *connectEventManager) worker() {
c.lk.Lock()
defer c.lk.Unlock()
defer close(c.done)

for {
for !c.stop && len(c.changeQueue) == 0 {
c.cond.Wait()
}

if c.stop {
return
}

for c.waitChange() {
pid := c.changeQueue[0]
c.changeQueue[0] = peer.ID("")
c.changeQueue[0] = peer.ID("") // free the peer ID (slicing won't do that)
c.changeQueue = c.changeQueue[1:]

state, ok := c.peers[pid]
// If we've disconnected and forgotten, continue. We shouldn't reach this?
// If we've disconnected and forgotten, continue.
if !ok {
// This shouldn't be possible because _this_ thread is responsible for
// removing peers from this map, and we shouldn't get duplicate entries in
// the change queue.
log.Error("a change was enqueued for a peer we're not tracking")
continue
}

Expand Down
12 changes: 12 additions & 0 deletions network/connecteventmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ func TestConnectEventManagerConnectDisconnect(t *testing.T) {
connected: true,
})

// Flush the event queue.
wait(t, cem)
require.Equal(t, expectedEvents, connListener.events)

// Block up the event loop.
connListener.Lock()
cem.Connected(peers[1])
Expand Down Expand Up @@ -91,10 +95,12 @@ func TestConnectEventManagerMarkUnresponsive(t *testing.T) {
// Don't mark as connected when we receive a message (could have been delayed).
cem.OnMessage(p)
wait(t, cem)
require.Equal(t, expectedEvents, connListener.events)

// Handle connected event.
cem.Connected(p)
wait(t, cem)
require.Equal(t, expectedEvents, connListener.events)

expectedEvents = append(expectedEvents, mockConnEvent{
peer: p,
Expand All @@ -104,6 +110,7 @@ func TestConnectEventManagerMarkUnresponsive(t *testing.T) {
// Becomes unresponsive.
cem.MarkUnresponsive(p)
wait(t, cem)
require.Equal(t, expectedEvents, connListener.events)

expectedEvents = append(expectedEvents, mockConnEvent{
peer: p,
Expand All @@ -113,14 +120,17 @@ func TestConnectEventManagerMarkUnresponsive(t *testing.T) {
// Don't expect the peer to be come connected.
cem.Connected(p)
wait(t, cem)
require.Equal(t, expectedEvents, connListener.events)

// No duplicate event.
cem.MarkUnresponsive(p)
wait(t, cem)
require.Equal(t, expectedEvents, connListener.events)

// Becomes responsive.
cem.OnMessage(p)
wait(t, cem)
require.Equal(t, expectedEvents, connListener.events)

expectedEvents = append(expectedEvents, mockConnEvent{
peer: p,
Expand All @@ -143,6 +153,7 @@ func TestConnectEventManagerDisconnectAfterMarkUnresponsive(t *testing.T) {
// Handle connected event.
cem.Connected(p)
wait(t, cem)
require.Equal(t, expectedEvents, connListener.events)

expectedEvents = append(expectedEvents, mockConnEvent{
peer: p,
Expand All @@ -152,6 +163,7 @@ func TestConnectEventManagerDisconnectAfterMarkUnresponsive(t *testing.T) {
// Becomes unresponsive.
cem.MarkUnresponsive(p)
wait(t, cem)
require.Equal(t, expectedEvents, connListener.events)

expectedEvents = append(expectedEvents, mockConnEvent{
peer: p,
Expand Down
2 changes: 1 addition & 1 deletion network/ipfs_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func prepareNetwork(t *testing.T, ctx context.Context, p1 tnet.Identity, r1 *rec
routing2 := mr.ClientWithDatastore(context.TODO(), p2, ds.NewMapDatastore())
bsnet2 := bsnet.NewFromIpfsHost(eh2, routing2)
bsnet2.Start(r2)
t.Cleanup(bsnet1.Stop)
t.Cleanup(bsnet2.Stop)
if r2.listener != nil {
eh2.Network().Notify(r2.listener)
}
Expand Down
2 changes: 1 addition & 1 deletion testnet/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
t.Fatal("Message not received from the responder")
}
}))
t.Cleanup(responder.Stop)
t.Cleanup(waiter.Stop)

messageSentAsync := bsmsg.New(true)
messageSentAsync.AddBlock(blocks.NewBlock([]byte("data")))
Expand Down

0 comments on commit 1ae811f

Please sign in to comment.