Skip to content

Commit

Permalink
fix leak in blockresponsemanager
Browse files Browse the repository at this point in the history
SessionManager.ReceiveFrom added HAVEs/DONT_HAVEs even if no sessions were intersted in those, so the following could happen:
Session requests BLOCK from one peer and HAVE from another. BLOCK is received before HAVE. Session cleans up its interest on BLOCK,
but the following HAVE is still added to BlockResponseManager leaving it there forever.

The solution is to add a CID to BlockResponseManager only if there a session interested in it.
  • Loading branch information
Wondertan committed Nov 18, 2024
1 parent b0a7e7e commit 70a6503
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 27 deletions.
7 changes: 0 additions & 7 deletions bitswap/client/internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,6 @@ func (s *Session) Shutdown() {

// ReceiveFrom receives incoming blocks from the given peer.
func (s *Session) ReceiveFrom(from peer.ID, ks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
// The SessionManager tells each Session about all keys that it may be
// interested in. Here the Session filters the keys to the ones that this
// particular Session is interested in.
interestedRes := s.sim.FilterSessionInterested(s.id, ks, haves, dontHaves)
ks = interestedRes[0]
haves = interestedRes[1]
dontHaves = interestedRes[2]
s.logReceiveFrom(from, ks, haves, dontHaves)

// Inform the session want sender that a message has been received
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,27 +175,36 @@ func (sim *SessionInterestManager) SplitWantedUnwanted(blks []blocks.Block) ([]b

// When the SessionManager receives a message it calls InterestedSessions() to
// find out which sessions are interested in the message.
func (sim *SessionInterestManager) InterestedSessions(blks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) []uint64 {
func (sim *SessionInterestManager) InterestedSessions(sets ...[]cid.Cid) map[uint64][][]cid.Cid {
sim.lk.RLock()
defer sim.lk.RUnlock()

ks := make([]cid.Cid, 0, len(blks)+len(haves)+len(dontHaves))
ks = append(ks, blks...)
ks = append(ks, haves...)
ks = append(ks, dontHaves...)

// Create a set of sessions that are interested in the keys
sesSet := make(map[uint64]struct{})
for _, c := range ks {
for s := range sim.wants[c] {
sesSet[s] = struct{}{}
sesSet := make(map[uint64][][]cid.Cid)
for i, set := range sets {
for _, c := range set {
sessions, ok := sim.wants[c]
if !ok {
continue
}

for ses, ok := range sessions {
if !ok {
continue
}

if _, ok := sesSet[ses]; !ok {
sessionSets := make([][]cid.Cid, len(sets))
for i, set := range sets {
sets[i] = make([]cid.Cid, 0, len(set))

This comment has been minimized.

Copy link
@gammazero

gammazero Jan 16, 2025

Contributor

I do not understand what this line does:

for i, set := range sets {
	sets[i] = make([]cid.Cid, 0, len(set))
}

So, it assigns an empty slice of CIDs (with non-zero capacity) to each item of sets. Yet the outer loop at line 183 is still iterating sets?

}
sesSet[ses] = sessionSets
}

sesSet[ses][i] = append(sesSet[ses][i], c)
}
}
}

// Convert the set into a list
ses := make([]uint64, 0, len(sesSet))
for s := range sesSet {
ses = append(ses, s)
}
return ses
return sesSet
}
12 changes: 8 additions & 4 deletions bitswap/client/internal/sessionmanager/sessionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,8 @@ func (sm *SessionManager) GetNextSessionID() uint64 {

// ReceiveFrom is called when a new message is received
func (sm *SessionManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
// Record block presence for HAVE / DONT_HAVE
sm.blockPresenceManager.ReceiveFrom(p, haves, dontHaves)

// Notify each session that is interested in the blocks / HAVEs / DONT_HAVEs
for _, id := range sm.sessionInterestManager.InterestedSessions(blks, haves, dontHaves) {
for id, keys := range sm.sessionInterestManager.InterestedSessions(blks, haves, dontHaves) {
sm.sessLk.Lock()
if sm.sessions == nil { // check if SessionManager was shutdown
sm.sessLk.Unlock()
Expand All @@ -168,6 +165,13 @@ func (sm *SessionManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid
sm.sessLk.Unlock()

if ok {
blks = keys[0]
haves = keys[1]
dontHaves = keys[2]
// Record block presence for HAVE / DONT_HAVE
// must be called before Seession.ReceiveFrom
sm.blockPresenceManager.ReceiveFrom(p, haves, dontHaves)

sess.ReceiveFrom(p, blks, haves, dontHaves)
}
}
Expand Down

1 comment on commit 70a6503

@gammazero
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Session cleans up its interest on BLOCK,
but the following HAVE is still added to BlockResponseManager leaving it there forever.

Wouldn't the call to s.sim.FilterSessionInterested in Session.ReceiveFrom handle this.?

Also, upon receiving a block, the session should not remove its interest in a block, only that it wants a block.. according to the comment in SessionInterestManager:

		// Note that once the block is received the session no longer wants
		// the block, but still wants to receive messages from peers who have
		// the block as they may have other blocks the session is interested in.

Please sign in to comment.