From 0cbfff776a4960d3720ac05bb854ce2a9bdcba20 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 22 Jan 2019 08:39:53 -0800 Subject: [PATCH] contexts: make sure to abort when a context is canceled Also, buffer single-use channels we may walk away from. This was showing up (rarely) in a go-ipfs test. --- peermanager/peermanager.go | 15 ++++- sessionpeermanager/sessionpeermanager.go | 9 ++- .../sessionrequestsplitter.go | 2 +- wantmanager/wantmanager.go | 60 +++++++++++++++---- workers.go | 8 ++- 5 files changed, 74 insertions(+), 20 deletions(-) diff --git a/peermanager/peermanager.go b/peermanager/peermanager.go index 30145cc5..fed1b3f7 100644 --- a/peermanager/peermanager.go +++ b/peermanager/peermanager.go @@ -59,9 +59,18 @@ func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager { // ConnectedPeers returns a list of peers this PeerManager is managing. func (pm *PeerManager) ConnectedPeers() []peer.ID { - resp := make(chan []peer.ID) - pm.peerMessages <- &getPeersMessage{resp} - return <-resp + resp := make(chan []peer.ID, 1) + select { + case pm.peerMessages <- &getPeersMessage{resp}: + case <-pm.ctx.Done(): + return nil + } + select { + case peers := <-resp: + return peers + case <-pm.ctx.Done(): + return nil + } } // Connected is called to add a new peer to the pool, and send it an initial set diff --git a/sessionpeermanager/sessionpeermanager.go b/sessionpeermanager/sessionpeermanager.go index 2e733832..225f1901 100644 --- a/sessionpeermanager/sessionpeermanager.go +++ b/sessionpeermanager/sessionpeermanager.go @@ -82,7 +82,7 @@ func (spm *SessionPeerManager) RecordPeerRequests(p []peer.ID, ks []cid.Cid) { func (spm *SessionPeerManager) GetOptimizedPeers() []peer.ID { // right now this just returns all peers, but soon we might return peers // ordered by optimization, or only a subset - resp := make(chan []peer.ID) + resp := make(chan []peer.ID, 1) select { case spm.peerMessages <- &peerReqMessage{resp}: case <-spm.ctx.Done(): @@ -108,11 +108,16 @@ func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) { // - share peers between sessions based on interest set for p := range spm.network.FindProvidersAsync(ctx, k, 10) { go func(p peer.ID) { + // TODO: Also use context from spm. err := spm.network.ConnectTo(ctx, p) if err != nil { log.Debugf("failed to connect to provider %s: %s", p, err) } - spm.peerMessages <- &peerFoundMessage{p} + select { + case spm.peerMessages <- &peerFoundMessage{p}: + case <-ctx.Done(): + case <-spm.ctx.Done(): + } }(p) } }(c) diff --git a/sessionrequestsplitter/sessionrequestsplitter.go b/sessionrequestsplitter/sessionrequestsplitter.go index 32dcf1fc..1305b73b 100644 --- a/sessionrequestsplitter/sessionrequestsplitter.go +++ b/sessionrequestsplitter/sessionrequestsplitter.go @@ -51,7 +51,7 @@ func New(ctx context.Context) *SessionRequestSplitter { // SplitRequest splits a request for the given cids one or more times among the // given peers. func (srs *SessionRequestSplitter) SplitRequest(peers []peer.ID, ks []cid.Cid) []*PartialRequest { - resp := make(chan []*PartialRequest) + resp := make(chan []*PartialRequest, 1) select { case srs.messages <- &splitRequestMessage{peers, ks, resp}: diff --git a/wantmanager/wantmanager.go b/wantmanager/wantmanager.go index bf14ea71..3e5a6c9a 100644 --- a/wantmanager/wantmanager.go +++ b/wantmanager/wantmanager.go @@ -83,30 +83,66 @@ func (wm *WantManager) CancelWants(ctx context.Context, ks []cid.Cid, peers []pe // IsWanted returns whether a CID is currently wanted. func (wm *WantManager) IsWanted(c cid.Cid) bool { - resp := make(chan bool) - wm.wantMessages <- &isWantedMessage{c, resp} - return <-resp + resp := make(chan bool, 1) + select { + case wm.wantMessages <- &isWantedMessage{c, resp}: + case <-wm.ctx.Done(): + return false + } + select { + case wanted := <-resp: + return wanted + case <-wm.ctx.Done(): + return false + } } // CurrentWants returns the list of current wants. func (wm *WantManager) CurrentWants() []*wantlist.Entry { - resp := make(chan []*wantlist.Entry) - wm.wantMessages <- ¤tWantsMessage{resp} - return <-resp + resp := make(chan []*wantlist.Entry, 1) + select { + case wm.wantMessages <- ¤tWantsMessage{resp}: + case <-wm.ctx.Done(): + return nil + } + select { + case wantlist := <-resp: + return wantlist + case <-wm.ctx.Done(): + return nil + } } // CurrentBroadcastWants returns the current list of wants that are broadcasts. func (wm *WantManager) CurrentBroadcastWants() []*wantlist.Entry { - resp := make(chan []*wantlist.Entry) - wm.wantMessages <- ¤tBroadcastWantsMessage{resp} - return <-resp + resp := make(chan []*wantlist.Entry, 1) + select { + case wm.wantMessages <- ¤tBroadcastWantsMessage{resp}: + case <-wm.ctx.Done(): + return nil + } + select { + case wl := <-resp: + return wl + case <-wm.ctx.Done(): + return nil + } } // WantCount returns the total count of wants. func (wm *WantManager) WantCount() int { - resp := make(chan int) - wm.wantMessages <- &wantCountMessage{resp} - return <-resp + resp := make(chan int, 1) + select { + case wm.wantMessages <- &wantCountMessage{resp}: + case <-wm.ctx.Done(): + return 0 + } + select { + case count := <-resp: + return count + case <-wm.ctx.Done(): + return 0 + } } // Startup starts processing for the WantManager. diff --git a/workers.go b/workers.go index 32f9da81..688a1d99 100644 --- a/workers.go +++ b/workers.go @@ -217,11 +217,15 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) { // TODO: come up with a better strategy for determining when to search // for new providers for blocks. i := rand.Intn(len(entries)) - bs.findKeys <- &blockRequest{ + select { + case bs.findKeys <- &blockRequest{ Cid: entries[i].Cid, Ctx: ctx, + }: + case <-ctx.Done(): + return } - case <-parent.Done(): + case <-ctx.Done(): return } }