Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert peer exclude cancel #809

Merged
merged 3 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ The following emojis are used to highlight certain changes:

### Changed

- `bitswap/client`: Do not send CANCEL to peer that block was received from, as this is redundant. [#784](https://github.com/ipfs/boxo/pull/784)
- `gateway`: The default DNSLink resolver for `.eth` TLD changed to `https://dns.eth.limo/dns-query` [#781](https://github.com/ipfs/boxo/pull/781)
- `gateway`: The default DNSLink resolver for `.crypto` TLD changed to `https://resolver.unstoppable.io/dns-query` [#782](https://github.com/ipfs/boxo/pull/782)
- upgrade to `go-libp2p-kad-dht` [v0.28.2](https://github.com/libp2p/go-libp2p-kad-dht/releases/tag/v0.28.2)
Expand Down
4 changes: 2 additions & 2 deletions bitswap/client/internal/peermanager/peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,12 @@ func (pm *PeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []ci

// SendCancels sends cancels for the given keys to all peers who had previously
// received a want for those keys.
func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid, excludePeer peer.ID) {
func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid) {
pm.pqLk.Lock()
defer pm.pqLk.Unlock()

// Send a CANCEL to each peer that has been sent a want-block or want-have
pm.pwm.sendCancels(cancelKs, excludePeer)
pm.pwm.sendCancels(cancelKs)
}

// CurrentWants returns the list of pending wants (both want-haves and want-blocks).
Expand Down
49 changes: 3 additions & 46 deletions bitswap/client/internal/peermanager/peermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func TestSendCancels(t *testing.T) {
collectMessages(msgs, 2*time.Millisecond)

// Send cancels for 1 want-block and 1 want-have
peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]}, "")
peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]})
collected := collectMessages(msgs, 2*time.Millisecond)

if _, ok := collected[peer2]; ok {
Expand All @@ -250,7 +250,7 @@ func TestSendCancels(t *testing.T) {
}

// Send cancels for all cids
peerManager.SendCancels(ctx, cids, "")
peerManager.SendCancels(ctx, cids)
collected = collectMessages(msgs, 2*time.Millisecond)

if _, ok := collected[peer2]; ok {
Expand All @@ -261,49 +261,6 @@ func TestSendCancels(t *testing.T) {
}
}

func TestSendCancelsExclude(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
msgs := make(chan msg, 16)
peerQueueFactory := makePeerQueueFactory(msgs)
tp := random.Peers(3)
self, peer1, peer2 := tp[0], tp[1], tp[2]
peerManager := New(ctx, peerQueueFactory, self)
cids := random.Cids(4)

// Connect to peer1 and peer2
peerManager.Connected(peer1)
peerManager.Connected(peer2)

// Send 2 want-blocks and 1 want-have to peer1
peerManager.SendWants(ctx, peer1, []cid.Cid{cids[0], cids[1]}, []cid.Cid{cids[2]})

// Clear messages
collectMessages(msgs, 2*time.Millisecond)

// Send cancels for 1 want-block and 1 want-have, excluding peer1.
peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]}, peer1)
collected := collectMessages(msgs, 2*time.Millisecond)

if _, ok := collected[peer2]; ok {
t.Fatal("Expected no cancels to be sent to peer that was not sent messages")
}
if len(collected[peer1].cancels) != 0 {
t.Fatal("Expected no cancels to be sent to excluded peer")
}

// Send cancels for all cids. Expect cancels for the 1 remaining sid that
// was not previously canceled.
peerManager.SendCancels(ctx, cids, "")
collected = collectMessages(msgs, 2*time.Millisecond)
if _, ok := collected[peer2]; ok {
t.Fatal("Expected no cancels to be sent to peer that was not sent messages")
}
if len(collected[peer1].cancels) != 1 {
t.Fatalf("Expected cancel to be sent for 1 want-blocks, got %d", len(collected[peer1].cancels))
}
}

func (s *sess) ID() uint64 {
return s.id
}
Expand Down Expand Up @@ -419,7 +376,7 @@ func BenchmarkPeerManager(b *testing.B) {
limit := len(wanted) / 10
cancel := wanted[:limit]
wanted = wanted[limit:]
peerManager.SendCancels(ctx, cancel, "")
peerManager.SendCancels(ctx, cancel)
}
}
}
19 changes: 6 additions & 13 deletions bitswap/client/internal/peermanager/peerwantmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves

// sendCancels sends a cancel to each peer to which a corresponding want was
// sent
func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid, excludePeer peer.ID) {
func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
if len(cancelKs) == 0 {
return
}
Expand All @@ -257,15 +257,8 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid, excludePeer peer.ID)

// Send cancels to a particular peer
send := func(p peer.ID, pws *peerWant) {
noSend := p == excludePeer

var toCancel []cid.Cid

// If peer is not excluded, then send broadcast cancels to this peer.
if !noSend {
// Start from the broadcast cancels
toCancel = broadcastCancels
}
// Start from the broadcast cancels
toCancel := broadcastCancels

// For each key to be cancelled
for _, c := range cancelKs {
Expand All @@ -278,9 +271,9 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid, excludePeer peer.ID)
pws.wantBlocks.Remove(c)
pws.wantHaves.Remove(c)

// If peer is not excluded and this a broadcast want is not already
// added it to the peer cancels, then add the cancel.
if !noSend && !pwm.broadcastWants.Has(c) {
// If it's a broadcast want, we've already added it to
// the peer cancels.
if !pwm.broadcastWants.Has(c) {
toCancel = append(toCancel, c)
}
}
Expand Down
10 changes: 5 additions & 5 deletions bitswap/client/internal/peermanager/peerwantmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func TestPWMSendCancels(t *testing.T) {

// Cancel 1 want-block and 1 want-have that were sent to p0
clearSent(peerQueues)
pwm.sendCancels([]cid.Cid{wb1[0], wh1[0]}, "")
pwm.sendCancels([]cid.Cid{wb1[0], wh1[0]})
// Should cancel the want-block and want-have
require.Empty(t, pq1.cancels, "Expected no cancels sent to p1")
require.ElementsMatch(t, pq0.cancels, []cid.Cid{wb1[0], wh1[0]}, "Expected 2 cids to be cancelled")
Expand All @@ -255,7 +255,7 @@ func TestPWMSendCancels(t *testing.T) {
// Cancel everything
clearSent(peerQueues)
allCids := append(allwb, allwh...)
pwm.sendCancels(allCids, "")
pwm.sendCancels(allCids)
// Should cancel the remaining want-blocks and want-haves for p0
require.ElementsMatch(t, pq0.cancels, []cid.Cid{wb1[1], wh1[1]}, "Expected un-cancelled cids to be cancelled")

Expand Down Expand Up @@ -312,7 +312,7 @@ func TestStats(t *testing.T) {
// Cancel 1 want-block that was sent to p0
// and 1 want-block that was not sent
cids5 := random.Cids(1)
pwm.sendCancels(append(cids5, cids[0]), "")
pwm.sendCancels(append(cids5, cids[0]))

require.Equal(t, 7, g.count, "Expected 7 wants")
require.Equal(t, 3, wbg.count, "Expected 3 want-blocks")
Expand All @@ -332,7 +332,7 @@ func TestStats(t *testing.T) {
require.Zero(t, wbg.count, "Expected 0 want-blocks")

// Cancel one remaining broadcast want-have
pwm.sendCancels(cids2[:1], "")
pwm.sendCancels(cids2[:1])
require.Equal(t, 2, g.count, "Expected 2 wants")
require.Zero(t, wbg.count, "Expected 0 want-blocks")
}
Expand Down Expand Up @@ -362,7 +362,7 @@ func TestStatsOverlappingWantBlockWantHave(t *testing.T) {
require.Equal(t, 4, wbg.count, "Expected 4 want-blocks")

// Cancel 1 of each group of cids
pwm.sendCancels([]cid.Cid{cids[0], cids2[0]}, "")
pwm.sendCancels([]cid.Cid{cids[0], cids2[0]})

require.Equal(t, 2, g.count, "Expected 2 wants")
require.Equal(t, 2, wbg.count, "Expected 2 want-blocks")
Expand Down
2 changes: 1 addition & 1 deletion bitswap/client/internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type PeerManager interface {
// session discovery)
BroadcastWantHaves(context.Context, []cid.Cid)
// SendCancels tells the PeerManager to send cancels to all peers
SendCancels(context.Context, []cid.Cid, peer.ID)
SendCancels(context.Context, []cid.Cid)
}

// SessionManager manages all the sessions
Expand Down
2 changes: 1 addition & 1 deletion bitswap/client/internal/session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (pm *fakePeerManager) BroadcastWantHaves(ctx context.Context, cids []cid.Ci
case <-ctx.Done():
}
}
func (pm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid, excludePeer peer.ID) {}
func (pm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) {}

func TestSessionGetBlocks(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
Expand Down
6 changes: 3 additions & 3 deletions bitswap/client/internal/session/sessionwantsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ func (pm *mockPeerManager) has(p peer.ID, sid uint64) bool {
return false
}

func (*mockPeerManager) UnregisterSession(uint64) {}
func (*mockPeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {}
func (*mockPeerManager) SendCancels(context.Context, []cid.Cid, peer.ID) {}
func (*mockPeerManager) UnregisterSession(uint64) {}
func (*mockPeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {}
func (*mockPeerManager) SendCancels(context.Context, []cid.Cid) {}

func (pm *mockPeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) bool {
pm.lk.Lock()
Expand Down
4 changes: 2 additions & 2 deletions bitswap/client/internal/sessionmanager/sessionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (sm *SessionManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid
}

// Send CANCEL to all peers with want-have / want-block
sm.peerManager.SendCancels(ctx, blks, p)
sm.peerManager.SendCancels(ctx, blks)
}

// CancelSessionWants is called when a session cancels wants because a call to
Expand All @@ -193,5 +193,5 @@ func (sm *SessionManager) cancelWants(wants []cid.Cid) {
// Send CANCEL to all peers for blocks that no session is interested in
// anymore.
// Note: use bitswap context because session context may already be Done.
sm.peerManager.SendCancels(sm.ctx, wants, "")
sm.peerManager.SendCancels(sm.ctx, wants)
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (*fakePeerManager) RegisterSession(peer.ID, bspm.Session)
func (*fakePeerManager) UnregisterSession(uint64) {}
func (*fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) bool { return true }
func (*fakePeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {}
func (fpm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid, excludePeer peer.ID) {
func (fpm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) {
fpm.lk.Lock()
defer fpm.lk.Unlock()
fpm.cancels = append(fpm.cancels, cancels...)
Expand Down
Loading