Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

contexts: make sure to abort when a context is canceled #58

Merged
merged 1 commit into from
Jan 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions peermanager/peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,18 @@ func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager {

// ConnectedPeers returns a list of peers this PeerManager is managing.
func (pm *PeerManager) ConnectedPeers() []peer.ID {
resp := make(chan []peer.ID)
pm.peerMessages <- &getPeersMessage{resp}
return <-resp
resp := make(chan []peer.ID, 1)
select {
case pm.peerMessages <- &getPeersMessage{resp}:
case <-pm.ctx.Done():
return nil
}
select {
case peers := <-resp:
return peers
case <-pm.ctx.Done():
return nil
}
}

// Connected is called to add a new peer to the pool, and send it an initial set
Expand Down
9 changes: 7 additions & 2 deletions sessionpeermanager/sessionpeermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (spm *SessionPeerManager) RecordPeerRequests(p []peer.ID, ks []cid.Cid) {
func (spm *SessionPeerManager) GetOptimizedPeers() []peer.ID {
// right now this just returns all peers, but soon we might return peers
// ordered by optimization, or only a subset
resp := make(chan []peer.ID)
resp := make(chan []peer.ID, 1)
select {
case spm.peerMessages <- &peerReqMessage{resp}:
case <-spm.ctx.Done():
Expand All @@ -108,11 +108,16 @@ func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) {
// - share peers between sessions based on interest set
for p := range spm.network.FindProvidersAsync(ctx, k, 10) {
go func(p peer.ID) {
// TODO: Also use context from spm.
err := spm.network.ConnectTo(ctx, p)
if err != nil {
log.Debugf("failed to connect to provider %s: %s", p, err)
}
spm.peerMessages <- &peerFoundMessage{p}
select {
case spm.peerMessages <- &peerFoundMessage{p}:
case <-ctx.Done():
case <-spm.ctx.Done():
}
}(p)
}
}(c)
Expand Down
2 changes: 1 addition & 1 deletion sessionrequestsplitter/sessionrequestsplitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func New(ctx context.Context) *SessionRequestSplitter {
// SplitRequest splits a request for the given cids one or more times among the
// given peers.
func (srs *SessionRequestSplitter) SplitRequest(peers []peer.ID, ks []cid.Cid) []*PartialRequest {
resp := make(chan []*PartialRequest)
resp := make(chan []*PartialRequest, 1)

select {
case srs.messages <- &splitRequestMessage{peers, ks, resp}:
Expand Down
60 changes: 48 additions & 12 deletions wantmanager/wantmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,30 +83,66 @@ func (wm *WantManager) CancelWants(ctx context.Context, ks []cid.Cid, peers []pe

// IsWanted returns whether a CID is currently wanted.
func (wm *WantManager) IsWanted(c cid.Cid) bool {
resp := make(chan bool)
wm.wantMessages <- &isWantedMessage{c, resp}
return <-resp
resp := make(chan bool, 1)
select {
case wm.wantMessages <- &isWantedMessage{c, resp}:
case <-wm.ctx.Done():
return false
}
select {
case wanted := <-resp:
return wanted
case <-wm.ctx.Done():
return false
}
}

// CurrentWants returns the list of current wants.
func (wm *WantManager) CurrentWants() []*wantlist.Entry {
resp := make(chan []*wantlist.Entry)
wm.wantMessages <- &currentWantsMessage{resp}
return <-resp
resp := make(chan []*wantlist.Entry, 1)
select {
case wm.wantMessages <- &currentWantsMessage{resp}:
case <-wm.ctx.Done():
return nil
}
select {
case wantlist := <-resp:
return wantlist
case <-wm.ctx.Done():
return nil
}
}

// CurrentBroadcastWants returns the current list of wants that are broadcasts.
func (wm *WantManager) CurrentBroadcastWants() []*wantlist.Entry {
resp := make(chan []*wantlist.Entry)
wm.wantMessages <- &currentBroadcastWantsMessage{resp}
return <-resp
resp := make(chan []*wantlist.Entry, 1)
select {
case wm.wantMessages <- &currentBroadcastWantsMessage{resp}:
case <-wm.ctx.Done():
return nil
}
select {
case wl := <-resp:
return wl
case <-wm.ctx.Done():
return nil
}
}

// WantCount returns the total count of wants.
func (wm *WantManager) WantCount() int {
resp := make(chan int)
wm.wantMessages <- &wantCountMessage{resp}
return <-resp
resp := make(chan int, 1)
select {
case wm.wantMessages <- &wantCountMessage{resp}:
case <-wm.ctx.Done():
return 0
}
select {
case count := <-resp:
return count
case <-wm.ctx.Done():
return 0
}
}

// Startup starts processing for the WantManager.
Expand Down
8 changes: 6 additions & 2 deletions workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,15 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
// TODO: come up with a better strategy for determining when to search
// for new providers for blocks.
i := rand.Intn(len(entries))
bs.findKeys <- &blockRequest{
select {
case bs.findKeys <- &blockRequest{
Cid: entries[i].Cid,
Ctx: ctx,
}:
case <-ctx.Done():
return
}
case <-parent.Done():
case <-ctx.Done():
return
}
}
Expand Down