Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

do not send cancel message to peer that sent block #784

Merged
merged 6 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The following emojis are used to highlight certain changes:
- `gateway` Support for custom DNSLink / DoH resolvers on `localhost` to simplify integration with non-ICANN DNS systems [#645](https://github.com/ipfs/boxo/pull/645)

### Changed
- Do not send CANCEL to peer that block was received from, as this is redundant. [#784](https://github.com/ipfs/boxo/pull/784)

- `gateway` The default DNSLink resolver for `.eth` TLD changed to `https://dns.eth.limo/dns-query` [#781](https://github.com/ipfs/boxo/pull/781)
- `gateway` The default DNSLink resolver for `.crypto` TLD changed to `https://resolver.unstoppable.io/dns-query` [#782](https://github.com/ipfs/boxo/pull/782)
Expand Down
4 changes: 2 additions & 2 deletions bitswap/client/internal/peermanager/peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,12 @@ func (pm *PeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []ci

// SendCancels sends cancels for the given keys to all peers who had previously
// received a want for those keys.
func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid) {
func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid, excludePeer peer.ID) {
pm.pqLk.Lock()
defer pm.pqLk.Unlock()

// Send a CANCEL to each peer that has been sent a want-block or want-have
pm.pwm.sendCancels(cancelKs)
pm.pwm.sendCancels(cancelKs, excludePeer)
}

// CurrentWants returns the list of pending wants (both want-haves and want-blocks).
Expand Down
49 changes: 46 additions & 3 deletions bitswap/client/internal/peermanager/peermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func TestSendCancels(t *testing.T) {
collectMessages(msgs, 2*time.Millisecond)

// Send cancels for 1 want-block and 1 want-have
peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]})
peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]}, "")
collected := collectMessages(msgs, 2*time.Millisecond)

if _, ok := collected[peer2]; ok {
Expand All @@ -250,7 +250,7 @@ func TestSendCancels(t *testing.T) {
}

// Send cancels for all cids
peerManager.SendCancels(ctx, cids)
peerManager.SendCancels(ctx, cids, "")
collected = collectMessages(msgs, 2*time.Millisecond)

if _, ok := collected[peer2]; ok {
Expand All @@ -261,6 +261,49 @@ func TestSendCancels(t *testing.T) {
}
}

func TestSendCancelsExclude(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
msgs := make(chan msg, 16)
peerQueueFactory := makePeerQueueFactory(msgs)
tp := random.Peers(3)
self, peer1, peer2 := tp[0], tp[1], tp[2]
peerManager := New(ctx, peerQueueFactory, self)
cids := random.Cids(4)

// Connect to peer1 and peer2
peerManager.Connected(peer1)
peerManager.Connected(peer2)

// Send 2 want-blocks and 1 want-have to peer1
peerManager.SendWants(ctx, peer1, []cid.Cid{cids[0], cids[1]}, []cid.Cid{cids[2]})

// Clear messages
collectMessages(msgs, 2*time.Millisecond)

// Send cancels for 1 want-block and 1 want-have
peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]}, peer1)
collected := collectMessages(msgs, 2*time.Millisecond)

if _, ok := collected[peer2]; ok {
t.Fatal("Expected no cancels to be sent to peer that was not sent messages")
}
if len(collected[peer1].cancels) != 0 {
t.Fatal("Expected no cancels to be sent to excluded peer")
}

// Send cancels for all cids
peerManager.SendCancels(ctx, cids, "")
collected = collectMessages(msgs, 2*time.Millisecond)

if _, ok := collected[peer2]; ok {
t.Fatal("Expected no cancels to be sent to peer that was not sent messages")
}
if len(collected[peer1].cancels) != 3 {
t.Fatal("Expected cancel to be sent for want-blocks")
}
}

func (s *sess) ID() uint64 {
return s.id
}
Expand Down Expand Up @@ -376,7 +419,7 @@ func BenchmarkPeerManager(b *testing.B) {
limit := len(wanted) / 10
cancel := wanted[:limit]
wanted = wanted[limit:]
peerManager.SendCancels(ctx, cancel)
peerManager.SendCancels(ctx, cancel, "")
}
}
}
3 changes: 2 additions & 1 deletion bitswap/client/internal/peermanager/peerwantmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves

// sendCancels sends a cancel to each peer to which a corresponding want was
// sent
func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid, excludePeer peer.ID) {
if len(cancelKs) == 0 {
return
}
Expand Down Expand Up @@ -298,6 +298,7 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
cancelPeers[p] = struct{}{}
}
}
delete(cancelPeers, excludePeer)
for p := range cancelPeers {
pws, ok := pwm.peerWants[p]
if !ok {
Expand Down
10 changes: 5 additions & 5 deletions bitswap/client/internal/peermanager/peerwantmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func TestPWMSendCancels(t *testing.T) {

// Cancel 1 want-block and 1 want-have that were sent to p0
clearSent(peerQueues)
pwm.sendCancels([]cid.Cid{wb1[0], wh1[0]})
pwm.sendCancels([]cid.Cid{wb1[0], wh1[0]}, "")
// Should cancel the want-block and want-have
require.Empty(t, pq1.cancels, "Expected no cancels sent to p1")
require.ElementsMatch(t, pq0.cancels, []cid.Cid{wb1[0], wh1[0]}, "Expected 2 cids to be cancelled")
Expand All @@ -255,7 +255,7 @@ func TestPWMSendCancels(t *testing.T) {
// Cancel everything
clearSent(peerQueues)
allCids := append(allwb, allwh...)
pwm.sendCancels(allCids)
pwm.sendCancels(allCids, "")
// Should cancel the remaining want-blocks and want-haves for p0
require.ElementsMatch(t, pq0.cancels, []cid.Cid{wb1[1], wh1[1]}, "Expected un-cancelled cids to be cancelled")

Expand Down Expand Up @@ -312,7 +312,7 @@ func TestStats(t *testing.T) {
// Cancel 1 want-block that was sent to p0
// and 1 want-block that was not sent
cids5 := random.Cids(1)
pwm.sendCancels(append(cids5, cids[0]))
pwm.sendCancels(append(cids5, cids[0]), "")

require.Equal(t, 7, g.count, "Expected 7 wants")
require.Equal(t, 3, wbg.count, "Expected 3 want-blocks")
Expand All @@ -332,7 +332,7 @@ func TestStats(t *testing.T) {
require.Zero(t, wbg.count, "Expected 0 want-blocks")

// Cancel one remaining broadcast want-have
pwm.sendCancels(cids2[:1])
pwm.sendCancels(cids2[:1], "")
require.Equal(t, 2, g.count, "Expected 2 wants")
require.Zero(t, wbg.count, "Expected 0 want-blocks")
}
Expand Down Expand Up @@ -362,7 +362,7 @@ func TestStatsOverlappingWantBlockWantHave(t *testing.T) {
require.Equal(t, 4, wbg.count, "Expected 4 want-blocks")

// Cancel 1 of each group of cids
pwm.sendCancels([]cid.Cid{cids[0], cids2[0]})
pwm.sendCancels([]cid.Cid{cids[0], cids2[0]}, "")

require.Equal(t, 2, g.count, "Expected 2 wants")
require.Equal(t, 2, wbg.count, "Expected 2 want-blocks")
Expand Down
2 changes: 1 addition & 1 deletion bitswap/client/internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type PeerManager interface {
// session discovery)
BroadcastWantHaves(context.Context, []cid.Cid)
// SendCancels tells the PeerManager to send cancels to all peers
SendCancels(context.Context, []cid.Cid)
SendCancels(context.Context, []cid.Cid, peer.ID)
}

// SessionManager manages all the sessions
Expand Down
2 changes: 1 addition & 1 deletion bitswap/client/internal/session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (pm *fakePeerManager) BroadcastWantHaves(ctx context.Context, cids []cid.Ci
case <-ctx.Done():
}
}
func (pm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) {}
func (pm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid, excludePeer peer.ID) {}

func TestSessionGetBlocks(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
Expand Down
6 changes: 3 additions & 3 deletions bitswap/client/internal/session/sessionwantsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ func (pm *mockPeerManager) has(p peer.ID, sid uint64) bool {
return false
}

func (*mockPeerManager) UnregisterSession(uint64) {}
func (*mockPeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {}
func (*mockPeerManager) SendCancels(context.Context, []cid.Cid) {}
func (*mockPeerManager) UnregisterSession(uint64) {}
func (*mockPeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {}
func (*mockPeerManager) SendCancels(context.Context, []cid.Cid, peer.ID) {}

func (pm *mockPeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) bool {
pm.lk.Lock()
Expand Down
4 changes: 2 additions & 2 deletions bitswap/client/internal/sessionmanager/sessionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (sm *SessionManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid
}

// Send CANCEL to all peers with want-have / want-block
sm.peerManager.SendCancels(ctx, blks)
sm.peerManager.SendCancels(ctx, blks, p)
}

// CancelSessionWants is called when a session cancels wants because a call to
Expand All @@ -193,5 +193,5 @@ func (sm *SessionManager) cancelWants(wants []cid.Cid) {
// Send CANCEL to all peers for blocks that no session is interested in
// anymore.
// Note: use bitswap context because session context may already be Done.
sm.peerManager.SendCancels(sm.ctx, wants)
sm.peerManager.SendCancels(sm.ctx, wants, "")
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (*fakePeerManager) RegisterSession(peer.ID, bspm.Session)
func (*fakePeerManager) UnregisterSession(uint64) {}
func (*fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) bool { return true }
func (*fakePeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {}
func (fpm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) {
func (fpm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid, excludePeer peer.ID) {
fpm.lk.Lock()
defer fpm.lk.Unlock()
fpm.cancels = append(fpm.cancels, cancels...)
Expand Down
Loading