Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Send CANCELs when session context is cancelled #375

Merged
merged 1 commit into from
Apr 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,15 +139,15 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
pm := bspm.New(ctx, peerQueueFactory, network.Self())
pqm := bspqm.New(ctx, network)

sessionFactory := func(ctx context.Context, id uint64, spm bssession.SessionPeerManager,
sessionFactory := func(sessctx context.Context, id uint64, spm bssession.SessionPeerManager,
sim *bssim.SessionInterestManager,
pm bssession.PeerManager,
bpm *bsbpm.BlockPresenceManager,
notif notifications.PubSub,
provSearchDelay time.Duration,
rebroadcastDelay delay.D,
self peer.ID) bssm.Session {
return bssession.New(ctx, id, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
return bssession.New(ctx, sessctx, id, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
}
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager {
return bsspm.New(id, network.ConnectionManager())
Expand Down
17 changes: 11 additions & 6 deletions internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ type op struct {
// info to, and who to request blocks from.
type Session struct {
// dependencies
ctx context.Context
bsctx context.Context // context for bitswap
ctx context.Context // context for session
pm PeerManager
bpm *bsbpm.BlockPresenceManager
sprm SessionPeerManager
Expand Down Expand Up @@ -124,7 +125,9 @@ type Session struct {

// New creates a new bitswap session whose lifetime is bounded by the
// given context.
func New(ctx context.Context,
func New(
bsctx context.Context, // context for bitswap
ctx context.Context, // context for this session
id uint64,
sprm SessionPeerManager,
providerFinder ProviderFinder,
Expand All @@ -138,6 +141,7 @@ func New(ctx context.Context,
s := &Session{
sw: newSessionWants(broadcastLiveWantsLimit),
tickDelayReqs: make(chan time.Duration),
bsctx: bsctx,
ctx: ctx,
pm: pm,
bpm: bpm,
Expand Down Expand Up @@ -393,10 +397,11 @@ func (s *Session) handleShutdown() {
// in anymore
s.bpm.RemoveKeys(cancelKs)

// TODO: If the context is cancelled this won't actually send any CANCELs.
// We should use a longer lived context to send out these CANCELs.
// Send CANCEL to all peers for blocks that no session is interested in anymore
s.pm.SendCancels(s.ctx, cancelKs)
// Send CANCEL to all peers for blocks that no session is interested in
// anymore.
// Note: use bitswap context because session context has already been
// cancelled.
s.pm.SendCancels(s.bsctx, cancelKs)
}

// handleReceive is called when the session receives blocks from a peer
Expand Down
12 changes: 6 additions & 6 deletions internal/session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestSessionGetBlocks(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
session := New(ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
var cids []cid.Cid
Expand Down Expand Up @@ -198,7 +198,7 @@ func TestSessionFindMorePeers(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
session := New(ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
session.SetBaseTickDelay(200 * time.Microsecond)
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
Expand Down Expand Up @@ -272,7 +272,7 @@ func TestSessionOnPeersExhausted(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
session := New(ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(broadcastLiveWantsLimit + 5)
var cids []cid.Cid
Expand Down Expand Up @@ -316,7 +316,7 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
session := New(ctx, id, fspm, fpf, sim, fpm, bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), "")
session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), "")
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(4)
var cids []cid.Cid
Expand Down Expand Up @@ -431,7 +431,7 @@ func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) {

// Create a new session with its own context
sessctx, sesscancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
session := New(sessctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
session := New(context.Background(), sessctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")

timerCtx, timerCancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer timerCancel()
Expand Down Expand Up @@ -472,7 +472,7 @@ func TestSessionReceiveMessageAfterShutdown(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
session := New(ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(2)
cids := []cid.Cid{blks[0].Cid(), blks[1].Cid()}
Expand Down