From e6b35e9731d0467330426870bf21ca20f57e8c74 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Tue, 13 Aug 2019 15:04:41 -0400 Subject: [PATCH 01/12] fix: don't ignore received blocks for pending wants --- bitswap.go | 20 ++++----- bitswap_test.go | 65 +++++++++++++++++++++++++++ sessionmanager/sessionmanager.go | 14 ++++++ sessionmanager/sessionmanager_test.go | 27 +++++++++++ wantmanager/wantmanager.go | 26 ----------- 5 files changed, 116 insertions(+), 36 deletions(-) diff --git a/bitswap.go b/bitswap.go index c7af851f..29a37782 100644 --- a/bitswap.go +++ b/bitswap.go @@ -273,14 +273,14 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks // HasBlock announces the existence of a block to this bitswap service. The // service will potentially notify its peers. func (bs *Bitswap) HasBlock(blk blocks.Block) error { - return bs.receiveBlocksFrom("", []blocks.Block{blk}) + return bs.receiveBlocksFrom(nil, "", []blocks.Block{blk}) } // TODO: Some of this stuff really only needs to be done when adding a block // from the user, not when receiving it from the network. // In case you run `git blame` on this comment, I'll save you some time: ask // @whyrusleeping, I don't know the answers you seek. -func (bs *Bitswap) receiveBlocksFrom(from peer.ID, blks []blocks.Block) error { +func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block) error { select { case <-bs.process.Closing(): return errors.New("bitswap is closed") @@ -294,7 +294,7 @@ func (bs *Bitswap) receiveBlocksFrom(from peer.ID, blks []blocks.Block) error { // Split blocks into wanted blocks vs duplicates wanted = make([]blocks.Block, 0, len(blks)) for _, b := range blks { - if bs.wm.IsWanted(b.Cid()) { + if bs.sm.InterestedIn(b.Cid()) { wanted = append(wanted, b) } else { log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), from) @@ -354,6 +354,12 @@ func (bs *Bitswap) receiveBlocksFrom(from peer.ID, blks []blocks.Block) error { } } + if from != "" { + for _, b := range wanted { + log.Event(ctx, "Bitswap.GetBlockRequest.End", b.Cid()) + } + } + return nil } @@ -382,17 +388,11 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg } // Process blocks - err := bs.receiveBlocksFrom(p, iblocks) + err := bs.receiveBlocksFrom(ctx, p, iblocks) if err != nil { log.Warningf("ReceiveMessage recvBlockFrom error: %s", err) return } - - for _, b := range iblocks { - if bs.wm.IsWanted(b.Cid()) { - log.Event(ctx, "Bitswap.GetBlockRequest.End", b.Cid()) - } - } } func (bs *Bitswap) updateReceiveCounters(blocks []blocks.Block) { diff --git a/bitswap_test.go b/bitswap_test.go index c6c3c8b8..9b757182 100644 --- a/bitswap_test.go +++ b/bitswap_test.go @@ -21,6 +21,7 @@ import ( blocksutil "github.com/ipfs/go-ipfs-blocksutil" delay "github.com/ipfs/go-ipfs-delay" mockrouting "github.com/ipfs/go-ipfs-routing/mock" + peer "github.com/libp2p/go-libp2p-core/peer" p2ptestutil "github.com/libp2p/go-libp2p-netutil" travis "github.com/libp2p/go-libp2p-testing/ci/travis" tu "github.com/libp2p/go-libp2p-testing/etc" @@ -138,6 +139,8 @@ func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) { } } +// Tests that a received block is not stored in the blockstore if the block was +// not requested by the client func TestUnwantedBlockNotAdded(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) @@ -170,6 +173,68 @@ func TestUnwantedBlockNotAdded(t *testing.T) { } } +// Tests that a received block is returned to the client and stored in the +// blockstore in the following scenario: +// - the want for the block has been requested by the client +// - the want for the block has not yet been sent out to a peer +// (because the live request queue is full) +func TestPendingBlockAdded(t *testing.T) { + ctx := context.Background() + net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + bg := blocksutil.NewBlockGenerator() + sessionBroadcastWantCapacity := 4 + + ig := testinstance.NewTestInstanceGenerator(net) + defer ig.Close() + + instance := ig.Instances(1)[0] + defer instance.Exchange.Close() + + oneSecCtx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + // Request enough blocks to exceed the session's broadcast want list + // capacity (by one block). The session will put the remaining block + // into the "tofetch" queue + blks := bg.Blocks(sessionBroadcastWantCapacity + 1) + ks := make([]cid.Cid, 0, len(blks)) + for _, b := range blks { + ks = append(ks, b.Cid()) + } + outch, err := instance.Exchange.GetBlocks(ctx, ks) + if err != nil { + t.Fatal(err) + } + + // Wait a little while to make sure the session has time to process the wants + time.Sleep(time.Millisecond * 20) + + // Simulate receiving a message which contains the block in the "tofetch" queue + lastBlock := blks[len(blks)-1] + bsMessage := message.New(true) + bsMessage.AddBlock(lastBlock) + unknownPeer := peer.ID("QmUHfvCQrzyR6vFXmeyCptfCWedfcmfa12V6UuziDtrw23") + instance.Exchange.ReceiveMessage(oneSecCtx, unknownPeer, bsMessage) + + // Make sure Bitswap adds the block to the output channel + blkrecvd, ok := <-outch + if !ok { + t.Fatal("timed out waiting for block") + } + if !blkrecvd.Cid().Equals(lastBlock.Cid()) { + t.Fatal("received wrong block") + } + + // Make sure Bitswap adds the block to the blockstore + blockInStore, err := instance.Blockstore().Has(lastBlock.Cid()) + if err != nil { + t.Fatal(err) + } + if !blockInStore { + t.Fatal("Block was not added to block store") + } +} + func TestLargeSwarm(t *testing.T) { if testing.Short() { t.SkipNow() diff --git a/sessionmanager/sessionmanager.go b/sessionmanager/sessionmanager.go index d65b86f4..a702e6d5 100644 --- a/sessionmanager/sessionmanager.go +++ b/sessionmanager/sessionmanager.go @@ -131,3 +131,17 @@ func (sm *SessionManager) ReceiveFrom(from peer.ID, ks []cid.Cid) { s.session.ReceiveFrom(from, sessKs) } } + +// InterestedIn indicates whether any of the sessions are waiting to receive +// the block with the given CID. +func (sm *SessionManager) InterestedIn(cid cid.Cid) bool { + sm.sessLk.Lock() + defer sm.sessLk.Unlock() + + for _, s := range sm.sessions { + if s.session.InterestedIn(cid) { + return true + } + } + return false +} diff --git a/sessionmanager/sessionmanager_test.go b/sessionmanager/sessionmanager_test.go index 0d0c94d6..0522a5b0 100644 --- a/sessionmanager/sessionmanager_test.go +++ b/sessionmanager/sessionmanager_test.go @@ -176,6 +176,33 @@ func TestReceivingBlocksWhenNotInterested(t *testing.T) { } } +func TestInterestedIn(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory) + + blks := testutil.GenerateBlocksOfSize(4, 1024) + var cids []cid.Cid + for _, b := range blks { + cids = append(cids, b.Cid()) + } + + nextInterestedIn = []cid.Cid{cids[0], cids[1]} + _ = sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession) + nextInterestedIn = []cid.Cid{cids[0], cids[2]} + _ = sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession) + + if !sm.InterestedIn(cids[0]) || + !sm.InterestedIn(cids[1]) || + !sm.InterestedIn(cids[2]) { + t.Fatal("expected interest but session manager was not interested") + } + if sm.InterestedIn(cids[3]) { + t.Fatal("expected no interest but session manager was interested") + } +} + func TestRemovingPeersWhenManagerContextCancelled(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithCancel(ctx) diff --git a/wantmanager/wantmanager.go b/wantmanager/wantmanager.go index 2ed7082e..f726d684 100644 --- a/wantmanager/wantmanager.go +++ b/wantmanager/wantmanager.go @@ -80,22 +80,6 @@ func (wm *WantManager) CancelWants(ctx context.Context, ks []cid.Cid, peers []pe wm.addEntries(context.Background(), ks, peers, true, ses) } -// IsWanted returns whether a CID is currently wanted. -func (wm *WantManager) IsWanted(c cid.Cid) bool { - resp := make(chan bool, 1) - select { - case wm.wantMessages <- &isWantedMessage{c, resp}: - case <-wm.ctx.Done(): - return false - } - select { - case wanted := <-resp: - return wanted - case <-wm.ctx.Done(): - return false - } -} - // CurrentWants returns the list of current wants. func (wm *WantManager) CurrentWants() []wantlist.Entry { resp := make(chan []wantlist.Entry, 1) @@ -232,16 +216,6 @@ func (ws *wantSet) handle(wm *WantManager) { wm.peerHandler.SendMessage(ws.entries, ws.targets, ws.from) } -type isWantedMessage struct { - c cid.Cid - resp chan<- bool -} - -func (iwm *isWantedMessage) handle(wm *WantManager) { - _, isWanted := wm.wl.Contains(iwm.c) - iwm.resp <- isWanted -} - type currentWantsMessage struct { resp chan<- []wantlist.Entry } From 38dcf8c329199e123d0b89de7ece3d61a8865eda Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 16 Aug 2019 09:19:41 -0400 Subject: [PATCH 02/12] fix: use context.Background() instead of nil --- bitswap.go | 2 +- sessionmanager/sessionmanager_test.go | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/bitswap.go b/bitswap.go index 29a37782..1bcf5e71 100644 --- a/bitswap.go +++ b/bitswap.go @@ -273,7 +273,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks // HasBlock announces the existence of a block to this bitswap service. The // service will potentially notify its peers. func (bs *Bitswap) HasBlock(blk blocks.Block) error { - return bs.receiveBlocksFrom(nil, "", []blocks.Block{blk}) + return bs.receiveBlocksFrom(context.Background(), "", []blocks.Block{blk}) } // TODO: Some of this stuff really only needs to be done when adding a block diff --git a/sessionmanager/sessionmanager_test.go b/sessionmanager/sessionmanager_test.go index 0522a5b0..2b303b6d 100644 --- a/sessionmanager/sessionmanager_test.go +++ b/sessionmanager/sessionmanager_test.go @@ -180,7 +180,9 @@ func TestInterestedIn(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithCancel(ctx) defer cancel() - sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory) + notif := notifications.New() + defer notif.Shutdown() + sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory, notif) blks := testutil.GenerateBlocksOfSize(4, 1024) var cids []cid.Cid From 56219bd23b1a02bcdf74590f396e8fb6427b59f7 Mon Sep 17 00:00:00 2001 From: dirkmc Date: Mon, 19 Aug 2019 22:47:37 -0700 Subject: [PATCH 03/12] refactor: use locks for session want management --- bitswap.go | 2 +- session/session.go | 334 +++++++++++++------------- session/session_test.go | 16 ++ sessionmanager/sessionmanager.go | 7 +- sessionmanager/sessionmanager_test.go | 11 +- 5 files changed, 201 insertions(+), 169 deletions(-) diff --git a/bitswap.go b/bitswap.go index 1bcf5e71..c42d80ad 100644 --- a/bitswap.go +++ b/bitswap.go @@ -294,7 +294,7 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b // Split blocks into wanted blocks vs duplicates wanted = make([]blocks.Block, 0, len(blks)) for _, b := range blks { - if bs.sm.InterestedIn(b.Cid()) { + if bs.sm.IsWanted(b.Cid()) { wanted = append(wanted, b) } else { log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), from) diff --git a/session/session.go b/session/session.go index 886971c9..76c8f3fd 100644 --- a/session/session.go +++ b/session/session.go @@ -3,9 +3,9 @@ package session import ( "context" "math/rand" + "sync" "time" - lru "github.com/hashicorp/golang-lru" bsgetter "github.com/ipfs/go-bitswap/getter" notifications "github.com/ipfs/go-bitswap/notifications" bssd "github.com/ipfs/go-bitswap/sessiondata" @@ -47,16 +47,18 @@ type RequestSplitter interface { RecordUniqueBlock() } -type interestReq struct { - c cid.Cid - resp chan bool -} - type rcvFrom struct { from peer.ID ks []cid.Cid } +type sessionWants struct { + sync.RWMutex + toFetch *cidQueue + liveWants map[cid.Cid]time.Time + pastWants *cid.Set +} + // Session holds state for an individual bitswap transfer operation. // This allows bitswap to make smarter decisions about who to send wantlist // info to, and who to request blocks from. @@ -67,19 +69,16 @@ type Session struct { pm PeerManager srs RequestSplitter + sw sessionWants + // channels incoming chan rcvFrom newReqs chan []cid.Cid cancelKeys chan []cid.Cid - interestReqs chan interestReq latencyReqs chan chan time.Duration tickDelayReqs chan time.Duration // do not touch outside run loop - tofetch *cidQueue - interest *lru.Cache - pastWants *cidQueue - liveWants map[cid.Cid]time.Time idleTick *time.Timer periodicSearchTimer *time.Timer baseTickDelay time.Duration @@ -105,12 +104,13 @@ func New(ctx context.Context, initialSearchDelay time.Duration, periodicSearchDelay delay.D) *Session { s := &Session{ - liveWants: make(map[cid.Cid]time.Time), + sw: sessionWants{ + toFetch: newCidQueue(), + liveWants: make(map[cid.Cid]time.Time), + pastWants: cid.NewSet(), + }, newReqs: make(chan []cid.Cid), cancelKeys: make(chan []cid.Cid), - tofetch: newCidQueue(), - pastWants: newCidQueue(), - interestReqs: make(chan interestReq), latencyReqs: make(chan chan time.Duration), tickDelayReqs: make(chan time.Duration), ctx: ctx, @@ -126,9 +126,6 @@ func New(ctx context.Context, periodicSearchDelay: periodicSearchDelay, } - cache, _ := lru.New(2048) - s.interest = cache - go s.run(ctx) return s @@ -142,34 +139,20 @@ func (s *Session) ReceiveFrom(from peer.ID, ks []cid.Cid) { } } -// InterestedIn returns true if this session is interested in the given Cid. +// IsWanted returns true if this session is waiting to receive the given Cid. +func (s *Session) IsWanted(c cid.Cid) bool { + s.sw.RLock() + defer s.sw.RUnlock() + + return s.unlockedIsWanted(c) +} + +// InterestedIn returns true if this session has ever requested the given Cid. func (s *Session) InterestedIn(c cid.Cid) bool { - if s.interest.Contains(c) { - return true - } - // TODO: PERF: this is using a channel to guard a map access against race - // conditions. This is definitely much slower than a mutex, though its unclear - // if it will actually induce any noticeable slowness. This is implemented this - // way to avoid adding a more complex set of mutexes around the liveWants map. - // note that in the average case (where this session *is* interested in the - // block we received) this function will not be called, as the cid will likely - // still be in the interest cache. - resp := make(chan bool, 1) - select { - case s.interestReqs <- interestReq{ - c: c, - resp: resp, - }: - case <-s.ctx.Done(): - return false - } + s.sw.RLock() + defer s.sw.RUnlock() - select { - case want := <-resp: - return want - case <-s.ctx.Done(): - return false - } + return s.unlockedIsWanted(c) || s.sw.pastWants.Has(c) } // GetBlock fetches a single block. @@ -233,23 +216,15 @@ func (s *Session) run(ctx context.Context) { for { select { case rcv := <-s.incoming: - s.cancelIncoming(ctx, rcv) - // Record statistics only if the blocks came from the network - // (blocks can also be received from the local node) - if rcv.from != "" { - s.updateReceiveCounters(ctx, rcv) - } s.handleIncoming(ctx, rcv) case keys := <-s.newReqs: - s.handleNewRequest(ctx, keys) + s.wantBlocks(ctx, keys) case keys := <-s.cancelKeys: s.handleCancel(keys) case <-s.idleTick.C: s.handleIdleTick(ctx) case <-s.periodicSearchTimer.C: s.handlePeriodicSearch(ctx) - case lwchk := <-s.interestReqs: - lwchk.resp <- s.cidIsWanted(lwchk.c) case resp := <-s.latencyReqs: resp <- s.averageLatency() case baseTickDelay := <-s.tickDelayReqs: @@ -261,59 +236,17 @@ func (s *Session) run(ctx context.Context) { } } -func (s *Session) cancelIncoming(ctx context.Context, rcv rcvFrom) { - // We've received the blocks so we can cancel any outstanding wants for them - wanted := make([]cid.Cid, 0, len(rcv.ks)) - for _, k := range rcv.ks { - if s.cidIsWanted(k) { - wanted = append(wanted, k) - } - } - s.pm.RecordCancels(wanted) - s.wm.CancelWants(s.ctx, wanted, nil, s.id) -} - -func (s *Session) handleIncoming(ctx context.Context, rcv rcvFrom) { - s.idleTick.Stop() - - // Process the received blocks - s.processIncoming(ctx, rcv.ks) - - s.resetIdleTick() -} - -func (s *Session) handleNewRequest(ctx context.Context, keys []cid.Cid) { - for _, k := range keys { - s.interest.Add(k, nil) - } - if toadd := s.wantBudget(); toadd > 0 { - if toadd > len(keys) { - toadd = len(keys) - } - - now := keys[:toadd] - keys = keys[toadd:] +func (s *Session) handleCancel(keys []cid.Cid) { + s.sw.Lock() + defer s.sw.Unlock() - s.wantBlocks(ctx, now) - } for _, k := range keys { - s.tofetch.Push(k) - } -} - -func (s *Session) handleCancel(keys []cid.Cid) { - for _, c := range keys { - s.tofetch.Remove(c) + s.sw.toFetch.Remove(k) } } func (s *Session) handleIdleTick(ctx context.Context) { - live := make([]cid.Cid, 0, len(s.liveWants)) - now := time.Now() - for c := range s.liveWants { - live = append(live, c) - s.liveWants[c] = now - } + live := s.prepareBroadcast() // Broadcast these keys to everyone we're connected to s.pm.RecordPeerRequests(nil, live) @@ -326,11 +259,27 @@ func (s *Session) handleIdleTick(ctx context.Context) { } s.resetIdleTick() - if len(s.liveWants) > 0 { + s.sw.RLock() + defer s.sw.RUnlock() + + if len(s.sw.liveWants) > 0 { s.consecutiveTicks++ } } +func (s *Session) prepareBroadcast() []cid.Cid { + s.sw.Lock() + defer s.sw.Unlock() + + live := make([]cid.Cid, 0, len(s.sw.liveWants)) + now := time.Now() + for c := range s.sw.liveWants { + live = append(live, c) + s.sw.liveWants[c] = now + } + return live +} + func (s *Session) handlePeriodicSearch(ctx context.Context) { randomWant := s.randomLiveWant() if !randomWant.Defined() { @@ -346,12 +295,15 @@ func (s *Session) handlePeriodicSearch(ctx context.Context) { } func (s *Session) randomLiveWant() cid.Cid { - if len(s.liveWants) == 0 { + s.sw.RLock() + defer s.sw.RUnlock() + + if len(s.sw.liveWants) == 0 { return cid.Cid{} } - i := rand.Intn(len(s.liveWants)) + i := rand.Intn(len(s.sw.liveWants)) // picking a random live want - for k := range s.liveWants { + for k := range s.sw.liveWants { if i == 0 { return k } @@ -359,83 +311,127 @@ func (s *Session) randomLiveWant() cid.Cid { } return cid.Cid{} } + func (s *Session) handleShutdown() { s.idleTick.Stop() - live := make([]cid.Cid, 0, len(s.liveWants)) - for c := range s.liveWants { + live := s.liveWants() + s.wm.CancelWants(s.ctx, live, nil, s.id) +} + +func (s *Session) liveWants() []cid.Cid { + s.sw.RLock() + defer s.sw.RUnlock() + + live := make([]cid.Cid, 0, len(s.sw.liveWants)) + for c := range s.sw.liveWants { live = append(live, c) } - s.wm.CancelWants(s.ctx, live, nil, s.id) + return live } -func (s *Session) cidIsWanted(c cid.Cid) bool { - _, ok := s.liveWants[c] +func (s *Session) unlockedIsWanted(c cid.Cid) bool { + _, ok := s.sw.liveWants[c] if !ok { - ok = s.tofetch.Has(c) + ok = s.sw.toFetch.Has(c) } return ok } -func (s *Session) processIncoming(ctx context.Context, ks []cid.Cid) { - for _, c := range ks { - if s.cidIsWanted(c) { - // If the block CID was in the live wants queue, remove it - tval, ok := s.liveWants[c] - if ok { - s.latTotal += time.Since(tval) - delete(s.liveWants, c) - } else { - // Otherwise remove it from the tofetch queue, if it was there - s.tofetch.Remove(c) - } - s.fetchcnt++ - - // We've received new wanted blocks, so reset the number of ticks - // that have occurred since the last new block - s.consecutiveTicks = 0 - - // Keep track of CIDs we've successfully fetched - s.pastWants.Push(c) - } +func (s *Session) handleIncoming(ctx context.Context, rcv rcvFrom) { + // Record statistics only if the blocks came from the network + // (blocks can also be received from the local node) + if rcv.from != "" { + s.updateReceiveCounters(ctx, rcv) } - // Transfer as many CIDs as possible from the tofetch queue into the - // live wants queue - toAdd := s.wantBudget() - if toAdd > s.tofetch.Len() { - toAdd = s.tofetch.Len() - } - if toAdd > 0 { - var keys []cid.Cid - for i := 0; i < toAdd; i++ { - keys = append(keys, s.tofetch.Pop()) - } - s.wantBlocks(ctx, keys) + // Update the want list + wanted, totalLatency := s.blocksReceived(rcv.ks) + if len(wanted) == 0 { + return } + + // We've received the blocks so we can cancel any outstanding wants for them + s.cancelIncoming(ctx, wanted) + + s.idleTick.Stop() + + // Process the received blocks + s.processIncoming(ctx, wanted, totalLatency) + + s.resetIdleTick() } func (s *Session) updateReceiveCounters(ctx context.Context, rcv rcvFrom) { - for _, k := range rcv.ks { - // Inform the request splitter of unique / duplicate blocks - if s.cidIsWanted(k) { + s.sw.RLock() + + for _, c := range rcv.ks { + if s.unlockedIsWanted(c) { s.srs.RecordUniqueBlock() - } else if s.pastWants.Has(k) { + } else if s.sw.pastWants.Has(c) { s.srs.RecordDuplicateBlock() } } + s.sw.RUnlock() + // Record response (to be able to time latency) if len(rcv.ks) > 0 { s.pm.RecordPeerResponse(rcv.from, rcv.ks) } } -func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) { - now := time.Now() - for _, c := range ks { - s.liveWants[c] = now +func (s *Session) blocksReceived(cids []cid.Cid) ([]cid.Cid, time.Duration) { + s.sw.Lock() + defer s.sw.Unlock() + + totalLatency := time.Duration(0) + wanted := make([]cid.Cid, 0, len(cids)) + for _, c := range cids { + if s.unlockedIsWanted(c) { + wanted = append(wanted, c) + + // If the block CID was in the live wants queue, remove it + tval, ok := s.sw.liveWants[c] + if ok { + totalLatency += time.Since(tval) + delete(s.sw.liveWants, c) + } else { + // Otherwise remove it from the toFetch queue, if it was there + s.sw.toFetch.Remove(c) + } + + // Keep track of CIDs we've successfully fetched + s.sw.pastWants.Add(c) + } + } + + return wanted, totalLatency +} + +func (s *Session) cancelIncoming(ctx context.Context, ks []cid.Cid) { + s.pm.RecordCancels(ks) + s.wm.CancelWants(s.ctx, ks, nil, s.id) +} + +func (s *Session) processIncoming(ctx context.Context, ks []cid.Cid, totalLatency time.Duration) { + // Keep track of the total number of blocks received and total latency + s.fetchcnt += len(ks) + s.latTotal += totalLatency + + // We've received new wanted blocks, so reset the number of ticks + // that have occurred since the last new block + s.consecutiveTicks = 0 + + s.wantBlocks(ctx, nil) +} + +func (s *Session) wantBlocks(ctx context.Context, newks []cid.Cid) { + ks := s.getNextWants(s.wantLimit(), newks) + if len(ks) == 0 { + return } + peers := s.pm.GetOptimizedPeers() if len(peers) > 0 { splitRequests := s.srs.SplitRequest(peers, ks) @@ -449,6 +445,29 @@ func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) { } } +func (s *Session) getNextWants(limit int, newWants []cid.Cid) []cid.Cid { + s.sw.Lock() + defer s.sw.Unlock() + + now := time.Now() + + for _, k := range newWants { + s.sw.toFetch.Push(k) + } + + currentLiveCount := len(s.sw.liveWants) + toAdd := limit - currentLiveCount + + var live []cid.Cid + for ; toAdd > 0 && s.sw.toFetch.Len() > 0; toAdd-- { + c := s.sw.toFetch.Pop() + live = append(live, c) + s.sw.liveWants[c] = now + } + + return live +} + func (s *Session) averageLatency() time.Duration { return s.latTotal / time.Duration(s.fetchcnt) } @@ -465,16 +484,9 @@ func (s *Session) resetIdleTick() { s.idleTick.Reset(tickDelay) } -func (s *Session) wantBudget() int { - live := len(s.liveWants) - var budget int +func (s *Session) wantLimit() int { if len(s.pm.GetOptimizedPeers()) > 0 { - budget = targetedLiveWantsLimit - live - } else { - budget = broadcastLiveWantsLimit - live - } - if budget < 0 { - budget = 0 + return targetedLiveWantsLimit } - return budget + return broadcastLiveWantsLimit } diff --git a/session/session_test.go b/session/session_test.go index 07b834a8..3a52fbdf 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -118,6 +118,14 @@ func TestSessionGetBlocks(t *testing.T) { if receivedWantReq.peers != nil { t.Fatal("first want request should be a broadcast") } + for _, c := range cids { + if !session.IsWanted(c) { + t.Fatal("expected session to want cids") + } + if !session.InterestedIn(c) { + t.Fatal("expected session to be interested in cids") + } + } // now receive the first set of blocks peers := testutil.GeneratePeers(broadcastLiveWantsLimit) @@ -211,6 +219,14 @@ func TestSessionGetBlocks(t *testing.T) { t.Fatal("received incorrect block") } } + for _, c := range cids { + if session.IsWanted(c) { + t.Fatal("expected session NOT to want cids") + } + if !session.InterestedIn(c) { + t.Fatal("expected session to still be interested in cids") + } + } } func TestSessionFindMorePeers(t *testing.T) { diff --git a/sessionmanager/sessionmanager.go b/sessionmanager/sessionmanager.go index a702e6d5..7e73bfe4 100644 --- a/sessionmanager/sessionmanager.go +++ b/sessionmanager/sessionmanager.go @@ -19,6 +19,7 @@ type Session interface { exchange.Fetcher InterestedIn(cid.Cid) bool ReceiveFrom(peer.ID, []cid.Cid) + IsWanted(cid.Cid) bool } type sesTrk struct { @@ -132,14 +133,14 @@ func (sm *SessionManager) ReceiveFrom(from peer.ID, ks []cid.Cid) { } } -// InterestedIn indicates whether any of the sessions are waiting to receive +// IsWanted indicates whether any of the sessions are waiting to receive // the block with the given CID. -func (sm *SessionManager) InterestedIn(cid cid.Cid) bool { +func (sm *SessionManager) IsWanted(cid cid.Cid) bool { sm.sessLk.Lock() defer sm.sessLk.Unlock() for _, s := range sm.sessions { - if s.session.InterestedIn(cid) { + if s.session.IsWanted(cid) { return true } } diff --git a/sessionmanager/sessionmanager_test.go b/sessionmanager/sessionmanager_test.go index 2b303b6d..022b6c02 100644 --- a/sessionmanager/sessionmanager_test.go +++ b/sessionmanager/sessionmanager_test.go @@ -40,6 +40,9 @@ func (fs *fakeSession) InterestedIn(c cid.Cid) bool { } return false } +func (fs *fakeSession) IsWanted(c cid.Cid) bool { + return fs.InterestedIn(c) +} func (fs *fakeSession) ReceiveFrom(p peer.ID, ks []cid.Cid) { fs.ks = append(fs.ks, ks...) } @@ -195,12 +198,12 @@ func TestInterestedIn(t *testing.T) { nextInterestedIn = []cid.Cid{cids[0], cids[2]} _ = sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession) - if !sm.InterestedIn(cids[0]) || - !sm.InterestedIn(cids[1]) || - !sm.InterestedIn(cids[2]) { + if !sm.IsWanted(cids[0]) || + !sm.IsWanted(cids[1]) || + !sm.IsWanted(cids[2]) { t.Fatal("expected interest but session manager was not interested") } - if sm.InterestedIn(cids[3]) { + if sm.IsWanted(cids[3]) { t.Fatal("expected no interest but session manager was interested") } } From 7458eb8f2036347be0e83461e983204e0be4edde Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Tue, 20 Aug 2019 08:54:33 -0700 Subject: [PATCH 04/12] test: better session manager test naming --- sessionmanager/sessionmanager_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sessionmanager/sessionmanager_test.go b/sessionmanager/sessionmanager_test.go index 022b6c02..411aee70 100644 --- a/sessionmanager/sessionmanager_test.go +++ b/sessionmanager/sessionmanager_test.go @@ -179,7 +179,7 @@ func TestReceivingBlocksWhenNotInterested(t *testing.T) { } } -func TestInterestedIn(t *testing.T) { +func TestIsWanted(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -201,10 +201,10 @@ func TestInterestedIn(t *testing.T) { if !sm.IsWanted(cids[0]) || !sm.IsWanted(cids[1]) || !sm.IsWanted(cids[2]) { - t.Fatal("expected interest but session manager was not interested") + t.Fatal("expected unwanted but session manager did want cid") } if sm.IsWanted(cids[3]) { - t.Fatal("expected no interest but session manager was interested") + t.Fatal("expected wanted but session manager did not want cid") } } From e9661edcdb47ef54b26a34eea6e0a51a5f788803 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Thu, 22 Aug 2019 13:16:31 -0700 Subject: [PATCH 05/12] refactor: session want management --- session/session.go | 171 +++-------------------- session/session_test.go | 6 - session/sessionwants.go | 190 ++++++++++++++++++++++++++ session/sessionwants_test.go | 152 +++++++++++++++++++++ sessionmanager/sessionmanager.go | 19 ++- sessionmanager/sessionmanager_test.go | 47 +++---- 6 files changed, 390 insertions(+), 195 deletions(-) create mode 100644 session/sessionwants.go create mode 100644 session/sessionwants_test.go diff --git a/session/session.go b/session/session.go index 76c8f3fd..d2263aa6 100644 --- a/session/session.go +++ b/session/session.go @@ -2,8 +2,6 @@ package session import ( "context" - "math/rand" - "sync" "time" bsgetter "github.com/ipfs/go-bitswap/getter" @@ -52,13 +50,6 @@ type rcvFrom struct { ks []cid.Cid } -type sessionWants struct { - sync.RWMutex - toFetch *cidQueue - liveWants map[cid.Cid]time.Time - pastWants *cid.Set -} - // Session holds state for an individual bitswap transfer operation. // This allows bitswap to make smarter decisions about who to send wantlist // info to, and who to request blocks from. @@ -133,26 +124,20 @@ func New(ctx context.Context, // ReceiveFrom receives incoming blocks from the given peer. func (s *Session) ReceiveFrom(from peer.ID, ks []cid.Cid) { + interested := s.sw.FilterInteresting(ks) + if len(interested) == 0 { + return + } + select { - case s.incoming <- rcvFrom{from: from, ks: ks}: + case s.incoming <- rcvFrom{from: from, ks: interested}: case <-s.ctx.Done(): } } // IsWanted returns true if this session is waiting to receive the given Cid. func (s *Session) IsWanted(c cid.Cid) bool { - s.sw.RLock() - defer s.sw.RUnlock() - - return s.unlockedIsWanted(c) -} - -// InterestedIn returns true if this session has ever requested the given Cid. -func (s *Session) InterestedIn(c cid.Cid) bool { - s.sw.RLock() - defer s.sw.RUnlock() - - return s.unlockedIsWanted(c) || s.sw.pastWants.Has(c) + return s.sw.IsWanted(c) } // GetBlock fetches a single block. @@ -220,7 +205,7 @@ func (s *Session) run(ctx context.Context) { case keys := <-s.newReqs: s.wantBlocks(ctx, keys) case keys := <-s.cancelKeys: - s.handleCancel(keys) + s.sw.CancelPending(keys) case <-s.idleTick.C: s.handleIdleTick(ctx) case <-s.periodicSearchTimer.C: @@ -236,17 +221,8 @@ func (s *Session) run(ctx context.Context) { } } -func (s *Session) handleCancel(keys []cid.Cid) { - s.sw.Lock() - defer s.sw.Unlock() - - for _, k := range keys { - s.sw.toFetch.Remove(k) - } -} - func (s *Session) handleIdleTick(ctx context.Context) { - live := s.prepareBroadcast() + live := s.sw.PrepareBroadcast() // Broadcast these keys to everyone we're connected to s.pm.RecordPeerRequests(nil, live) @@ -259,29 +235,13 @@ func (s *Session) handleIdleTick(ctx context.Context) { } s.resetIdleTick() - s.sw.RLock() - defer s.sw.RUnlock() - - if len(s.sw.liveWants) > 0 { + if s.sw.HasLiveWants() { s.consecutiveTicks++ } } -func (s *Session) prepareBroadcast() []cid.Cid { - s.sw.Lock() - defer s.sw.Unlock() - - live := make([]cid.Cid, 0, len(s.sw.liveWants)) - now := time.Now() - for c := range s.sw.liveWants { - live = append(live, c) - s.sw.liveWants[c] = now - } - return live -} - func (s *Session) handlePeriodicSearch(ctx context.Context) { - randomWant := s.randomLiveWant() + randomWant := s.sw.RandomLiveWant() if !randomWant.Defined() { return } @@ -294,50 +254,13 @@ func (s *Session) handlePeriodicSearch(ctx context.Context) { s.periodicSearchTimer.Reset(s.periodicSearchDelay.NextWaitTime()) } -func (s *Session) randomLiveWant() cid.Cid { - s.sw.RLock() - defer s.sw.RUnlock() - - if len(s.sw.liveWants) == 0 { - return cid.Cid{} - } - i := rand.Intn(len(s.sw.liveWants)) - // picking a random live want - for k := range s.sw.liveWants { - if i == 0 { - return k - } - i-- - } - return cid.Cid{} -} - func (s *Session) handleShutdown() { s.idleTick.Stop() - live := s.liveWants() + live := s.sw.LiveWants() s.wm.CancelWants(s.ctx, live, nil, s.id) } -func (s *Session) liveWants() []cid.Cid { - s.sw.RLock() - defer s.sw.RUnlock() - - live := make([]cid.Cid, 0, len(s.sw.liveWants)) - for c := range s.sw.liveWants { - live = append(live, c) - } - return live -} - -func (s *Session) unlockedIsWanted(c cid.Cid) bool { - _, ok := s.sw.liveWants[c] - if !ok { - ok = s.sw.toFetch.Has(c) - } - return ok -} - func (s *Session) handleIncoming(ctx context.Context, rcv rcvFrom) { // Record statistics only if the blocks came from the network // (blocks can also be received from the local node) @@ -346,7 +269,7 @@ func (s *Session) handleIncoming(ctx context.Context, rcv rcvFrom) { } // Update the want list - wanted, totalLatency := s.blocksReceived(rcv.ks) + wanted, totalLatency := s.sw.BlocksReceived(rcv.ks) if len(wanted) == 0 { return } @@ -363,17 +286,8 @@ func (s *Session) handleIncoming(ctx context.Context, rcv rcvFrom) { } func (s *Session) updateReceiveCounters(ctx context.Context, rcv rcvFrom) { - s.sw.RLock() - - for _, c := range rcv.ks { - if s.unlockedIsWanted(c) { - s.srs.RecordUniqueBlock() - } else if s.sw.pastWants.Has(c) { - s.srs.RecordDuplicateBlock() - } - } - - s.sw.RUnlock() + // Record unique vs duplicate blocks + s.sw.ForEachUniqDup(rcv.ks, s.srs.RecordUniqueBlock, s.srs.RecordDuplicateBlock) // Record response (to be able to time latency) if len(rcv.ks) > 0 { @@ -381,34 +295,6 @@ func (s *Session) updateReceiveCounters(ctx context.Context, rcv rcvFrom) { } } -func (s *Session) blocksReceived(cids []cid.Cid) ([]cid.Cid, time.Duration) { - s.sw.Lock() - defer s.sw.Unlock() - - totalLatency := time.Duration(0) - wanted := make([]cid.Cid, 0, len(cids)) - for _, c := range cids { - if s.unlockedIsWanted(c) { - wanted = append(wanted, c) - - // If the block CID was in the live wants queue, remove it - tval, ok := s.sw.liveWants[c] - if ok { - totalLatency += time.Since(tval) - delete(s.sw.liveWants, c) - } else { - // Otherwise remove it from the toFetch queue, if it was there - s.sw.toFetch.Remove(c) - } - - // Keep track of CIDs we've successfully fetched - s.sw.pastWants.Add(c) - } - } - - return wanted, totalLatency -} - func (s *Session) cancelIncoming(ctx context.Context, ks []cid.Cid) { s.pm.RecordCancels(ks) s.wm.CancelWants(s.ctx, ks, nil, s.id) @@ -427,7 +313,9 @@ func (s *Session) processIncoming(ctx context.Context, ks []cid.Cid, totalLatenc } func (s *Session) wantBlocks(ctx context.Context, newks []cid.Cid) { - ks := s.getNextWants(s.wantLimit(), newks) + // Given the want limit and any newly received blocks, get as many wants as + // we can to send out + ks := s.sw.GetNextWants(s.wantLimit(), newks) if len(ks) == 0 { return } @@ -445,29 +333,6 @@ func (s *Session) wantBlocks(ctx context.Context, newks []cid.Cid) { } } -func (s *Session) getNextWants(limit int, newWants []cid.Cid) []cid.Cid { - s.sw.Lock() - defer s.sw.Unlock() - - now := time.Now() - - for _, k := range newWants { - s.sw.toFetch.Push(k) - } - - currentLiveCount := len(s.sw.liveWants) - toAdd := limit - currentLiveCount - - var live []cid.Cid - for ; toAdd > 0 && s.sw.toFetch.Len() > 0; toAdd-- { - c := s.sw.toFetch.Pop() - live = append(live, c) - s.sw.liveWants[c] = now - } - - return live -} - func (s *Session) averageLatency() time.Duration { return s.latTotal / time.Duration(s.fetchcnt) } diff --git a/session/session_test.go b/session/session_test.go index 3a52fbdf..19266d1b 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -122,9 +122,6 @@ func TestSessionGetBlocks(t *testing.T) { if !session.IsWanted(c) { t.Fatal("expected session to want cids") } - if !session.InterestedIn(c) { - t.Fatal("expected session to be interested in cids") - } } // now receive the first set of blocks @@ -223,9 +220,6 @@ func TestSessionGetBlocks(t *testing.T) { if session.IsWanted(c) { t.Fatal("expected session NOT to want cids") } - if !session.InterestedIn(c) { - t.Fatal("expected session to still be interested in cids") - } } } diff --git a/session/sessionwants.go b/session/sessionwants.go new file mode 100644 index 00000000..58684ae8 --- /dev/null +++ b/session/sessionwants.go @@ -0,0 +1,190 @@ +package session + +import ( + "math/rand" + "sync" + "time" + + cid "github.com/ipfs/go-cid" +) + +type sessionWants struct { + sync.RWMutex + toFetch *cidQueue + liveWants map[cid.Cid]time.Time + pastWants *cid.Set +} + +// BlocksReceived moves received block CIDs from live to past wants and +// measures latency. It returns the CIDs of blocks that were actually wanted +// (as opposed to duplicates) and the total latency for all incoming blocks. +func (sw *sessionWants) BlocksReceived(cids []cid.Cid) ([]cid.Cid, time.Duration) { + sw.Lock() + defer sw.Unlock() + + totalLatency := time.Duration(0) + wanted := make([]cid.Cid, 0, len(cids)) + for _, c := range cids { + if sw.unlockedIsWanted(c) { + wanted = append(wanted, c) + + // If the block CID was in the live wants queue, remove it + tval, ok := sw.liveWants[c] + if ok { + totalLatency += time.Since(tval) + delete(sw.liveWants, c) + } else { + // Otherwise remove it from the toFetch queue, if it was there + sw.toFetch.Remove(c) + } + + // Keep track of CIDs we've successfully fetched + sw.pastWants.Add(c) + } + } + + return wanted, totalLatency +} + +// GetNextWants adds any new wants to the list of CIDs to fetch, then moves as +// many CIDs from the fetch queue to the live wants list as possible (given the +// limit). Returns the newly live wants. +func (sw *sessionWants) GetNextWants(limit int, newWants []cid.Cid) []cid.Cid { + now := time.Now() + + sw.Lock() + defer sw.Unlock() + + // Add new wants to the fetch queue + for _, k := range newWants { + sw.toFetch.Push(k) + } + + // Move CIDs from fetch queue to the live wants queue (up to the limit) + currentLiveCount := len(sw.liveWants) + toAdd := limit - currentLiveCount + + var live []cid.Cid + for ; toAdd > 0 && sw.toFetch.Len() > 0; toAdd-- { + c := sw.toFetch.Pop() + live = append(live, c) + sw.liveWants[c] = now + } + + return live +} + +// PrepareBroadcast saves the current time for each live want and returns the +// live want CIDs. +func (sw *sessionWants) PrepareBroadcast() []cid.Cid { + now := time.Now() + + sw.Lock() + defer sw.Unlock() + + live := make([]cid.Cid, 0, len(sw.liveWants)) + for c := range sw.liveWants { + live = append(live, c) + sw.liveWants[c] = now + } + return live +} + +// CancelPending removes the given CIDs from the fetch queue. +func (sw *sessionWants) CancelPending(keys []cid.Cid) { + sw.Lock() + defer sw.Unlock() + + for _, k := range keys { + sw.toFetch.Remove(k) + } +} + +// ForEachUniqDup iterates over each of the given CIDs and calls isUniqFn +// if the session is expecting a block for the CID, or isDupFn if the session +// has already received the block. +func (sw *sessionWants) ForEachUniqDup(ks []cid.Cid, isUniqFn, isDupFn func()) { + sw.RLock() + + for _, k := range ks { + if sw.unlockedIsWanted(k) { + isUniqFn() + } else if sw.pastWants.Has(k) { + isDupFn() + } + } + + sw.RUnlock() +} + +// LiveWants returns a list of live wants +func (sw *sessionWants) LiveWants() []cid.Cid { + sw.RLock() + defer sw.RUnlock() + + live := make([]cid.Cid, 0, len(sw.liveWants)) + for c := range sw.liveWants { + live = append(live, c) + } + return live +} + +// RandomLiveWant returns a randomly selected live want +func (sw *sessionWants) RandomLiveWant() cid.Cid { + sw.RLock() + defer sw.RUnlock() + + if len(sw.liveWants) == 0 { + return cid.Cid{} + } + i := rand.Intn(len(sw.liveWants)) + // picking a random live want + for k := range sw.liveWants { + if i == 0 { + return k + } + i-- + } + return cid.Cid{} +} + +// Has live wants indicates if there are any live wants +func (sw *sessionWants) HasLiveWants() bool { + sw.RLock() + defer sw.RUnlock() + + return len(sw.liveWants) > 0 +} + +// IsWanted indicates if the session is expecting to receive the block with the +// given CID +func (sw *sessionWants) IsWanted(c cid.Cid) bool { + sw.RLock() + defer sw.RUnlock() + + return sw.unlockedIsWanted(c) +} + +// FilterInteresting filters the list so that it only contains keys for +// blocks that the session is waiting to receive or has received in the past +func (sw *sessionWants) FilterInteresting(ks []cid.Cid) []cid.Cid { + sw.RLock() + defer sw.RUnlock() + + interested := make([]cid.Cid, 0, len(ks)) + for _, k := range ks { + if sw.unlockedIsWanted(k) || sw.pastWants.Has(k) { + interested = append(interested, k) + } + } + + return interested +} + +func (sw *sessionWants) unlockedIsWanted(c cid.Cid) bool { + _, ok := sw.liveWants[c] + if !ok { + ok = sw.toFetch.Has(c) + } + return ok +} diff --git a/session/sessionwants_test.go b/session/sessionwants_test.go new file mode 100644 index 00000000..87972924 --- /dev/null +++ b/session/sessionwants_test.go @@ -0,0 +1,152 @@ +package session + +import ( + "testing" + "time" + + "github.com/ipfs/go-bitswap/testutil" + cid "github.com/ipfs/go-cid" +) + +func TestSessionWants(t *testing.T) { + sw := sessionWants{ + toFetch: newCidQueue(), + liveWants: make(map[cid.Cid]time.Time), + pastWants: cid.NewSet(), + } + cids := testutil.GenerateCids(10) + others := testutil.GenerateCids(1) + + // Expect these functions to return nothing on a new sessionWants + lws := sw.PrepareBroadcast() + if len(lws) > 0 { + t.Fatal("expected no broadcast wants") + } + lws = sw.LiveWants() + if len(lws) > 0 { + t.Fatal("expected no live wants") + } + if sw.HasLiveWants() { + t.Fatal("expected not to have live wants") + } + rw := sw.RandomLiveWant() + if rw.Defined() { + t.Fatal("expected no random want") + } + if sw.IsWanted(cids[0]) { + t.Fatal("expected cid to not be wanted") + } + if len(sw.FilterInteresting(cids)) > 0 { + t.Fatal("expected no interesting wants") + } + + // Add 10 new wants with a limit of 5 + // The first 5 cids should go into the toFetch queue + // The other 5 cids should go into the live want queue + // toFetch Live Past + // 98765 43210 + nextw := sw.GetNextWants(5, cids) + if len(nextw) != 5 { + t.Fatal("expected 5 next wants") + } + lws = sw.PrepareBroadcast() + if len(lws) != 5 { + t.Fatal("expected 5 broadcast wants") + } + lws = sw.LiveWants() + if len(lws) != 5 { + t.Fatal("expected 5 live wants") + } + if !sw.HasLiveWants() { + t.Fatal("expected to have live wants") + } + rw = sw.RandomLiveWant() + if !rw.Defined() { + t.Fatal("expected random want") + } + if !sw.IsWanted(cids[0]) { + t.Fatal("expected cid to be wanted") + } + if !sw.IsWanted(cids[9]) { + t.Fatal("expected cid to be wanted") + } + if len(sw.FilterInteresting([]cid.Cid{cids[0], cids[9], others[0]})) != 2 { + t.Fatal("expected 2 interesting wants") + } + + // Two wanted blocks and one other block are received. + // The wanted blocks should be moved from the live wants queue + // to the past wants set (the other block CID should be ignored) + // toFetch Live Past + // 98765 432__ 10 + recvdCids := []cid.Cid{cids[0], cids[1], others[0]} + uniq := 0 + dup := 0 + sw.ForEachUniqDup(recvdCids, func() { uniq++ }, func() { dup++ }) + if uniq != 2 || dup != 0 { + t.Fatal("expected 2 uniqs / 0 dups", uniq, dup) + } + sw.BlocksReceived(recvdCids) + lws = sw.LiveWants() + if len(lws) != 3 { + t.Fatal("expected 3 live wants") + } + if sw.IsWanted(cids[0]) { + t.Fatal("expected cid to no longer be wanted") + } + if !sw.IsWanted(cids[9]) { + t.Fatal("expected cid to be wanted") + } + if len(sw.FilterInteresting([]cid.Cid{cids[0], cids[9], others[0]})) != 2 { + t.Fatal("expected 2 interesting wants") + } + + // Ask for next wants with a limit of 5 + // Should move 2 wants from toFetch queue to live wants + // toFetch Live Past + // 987__ 65432 10 + nextw = sw.GetNextWants(5, nil) + if len(nextw) != 2 { + t.Fatal("expected 2 next wants") + } + lws = sw.LiveWants() + if len(lws) != 5 { + t.Fatal("expected 5 live wants") + } + if !sw.IsWanted(cids[5]) { + t.Fatal("expected cid to be wanted") + } + + // One wanted block and one dup block are received. + // The wanted block should be moved from the live wants queue + // to the past wants set + // toFetch Live Past + // 987 654_2 310 + recvdCids = []cid.Cid{cids[0], cids[3]} + uniq = 0 + dup = 0 + sw.ForEachUniqDup(recvdCids, func() { uniq++ }, func() { dup++ }) + if uniq != 1 || dup != 1 { + t.Fatal("expected 1 uniq / 1 dup", uniq, dup) + } + sw.BlocksReceived(recvdCids) + lws = sw.LiveWants() + if len(lws) != 4 { + t.Fatal("expected 4 live wants") + } + + // One block in the toFetch queue should be cancelled + // toFetch Live Past + // 9_7 654_2 310 + sw.CancelPending([]cid.Cid{cids[8]}) + lws = sw.LiveWants() + if len(lws) != 4 { + t.Fatal("expected 4 live wants") + } + if sw.IsWanted(cids[8]) { + t.Fatal("expected cid to no longer be wanted") + } + if len(sw.FilterInteresting([]cid.Cid{cids[0], cids[8]})) != 1 { + t.Fatal("expected 1 interesting wants") + } +} diff --git a/sessionmanager/sessionmanager.go b/sessionmanager/sessionmanager.go index 7e73bfe4..3ec30bbc 100644 --- a/sessionmanager/sessionmanager.go +++ b/sessionmanager/sessionmanager.go @@ -17,7 +17,6 @@ import ( // Session is a session that is managed by the session manager type Session interface { exchange.Fetcher - InterestedIn(cid.Cid) bool ReceiveFrom(peer.ID, []cid.Cid) IsWanted(cid.Cid) bool } @@ -115,22 +114,20 @@ func (sm *SessionManager) GetNextSessionID() uint64 { return sm.sessID } -// ReceiveFrom receives blocks from a peer and dispatches to interested -// sessions. +// ReceiveFrom receives block CIDs from a peer and dispatches to sessions. func (sm *SessionManager) ReceiveFrom(from peer.ID, ks []cid.Cid) { sm.sessLk.Lock() defer sm.sessLk.Unlock() - // Only give each session the blocks / dups that it is interested in + var wg sync.WaitGroup for _, s := range sm.sessions { - sessKs := make([]cid.Cid, 0, len(ks)) - for _, k := range ks { - if s.session.InterestedIn(k) { - sessKs = append(sessKs, k) - } - } - s.session.ReceiveFrom(from, sessKs) + wg.Add(1) + go func() { + defer wg.Done() + s.session.ReceiveFrom(from, ks) + }() } + wg.Wait() } // IsWanted indicates whether any of the sessions are waiting to receive diff --git a/sessionmanager/sessionmanager_test.go b/sessionmanager/sessionmanager_test.go index 411aee70..2bd234cb 100644 --- a/sessionmanager/sessionmanager_test.go +++ b/sessionmanager/sessionmanager_test.go @@ -18,12 +18,12 @@ import ( ) type fakeSession struct { - interested []cid.Cid - ks []cid.Cid - id uint64 - pm *fakePeerManager - srs *fakeRequestSplitter - notif notifications.PubSub + wanted []cid.Cid + ks []cid.Cid + id uint64 + pm *fakePeerManager + srs *fakeRequestSplitter + notif notifications.PubSub } func (*fakeSession) GetBlock(context.Context, cid.Cid) (blocks.Block, error) { @@ -32,17 +32,14 @@ func (*fakeSession) GetBlock(context.Context, cid.Cid) (blocks.Block, error) { func (*fakeSession) GetBlocks(context.Context, []cid.Cid) (<-chan blocks.Block, error) { return nil, nil } -func (fs *fakeSession) InterestedIn(c cid.Cid) bool { - for _, ic := range fs.interested { +func (fs *fakeSession) IsWanted(c cid.Cid) bool { + for _, ic := range fs.wanted { if c == ic { return true } } return false } -func (fs *fakeSession) IsWanted(c cid.Cid) bool { - return fs.InterestedIn(c) -} func (fs *fakeSession) ReceiveFrom(p peer.ID, ks []cid.Cid) { fs.ks = append(fs.ks, ks...) } @@ -66,7 +63,7 @@ func (frs *fakeRequestSplitter) SplitRequest(optimizedPeers []bssd.OptimizedPeer func (frs *fakeRequestSplitter) RecordDuplicateBlock() {} func (frs *fakeRequestSplitter) RecordUniqueBlock() {} -var nextInterestedIn []cid.Cid +var nextWanted []cid.Cid func sessionFactory(ctx context.Context, id uint64, @@ -76,11 +73,11 @@ func sessionFactory(ctx context.Context, provSearchDelay time.Duration, rebroadcastDelay delay.D) Session { return &fakeSession{ - interested: nextInterestedIn, - id: id, - pm: pm.(*fakePeerManager), - srs: srs.(*fakeRequestSplitter), - notif: notif, + wanted: nextWanted, + id: id, + pm: pm.(*fakePeerManager), + srs: srs.(*fakeRequestSplitter), + notif: notif, } } @@ -121,7 +118,7 @@ func TestAddingSessions(t *testing.T) { p := peer.ID(123) block := blocks.NewBlock([]byte("block")) // we'll be interested in all blocks for this test - nextInterestedIn = []cid.Cid{block.Cid()} + nextWanted = []cid.Cid{block.Cid()} currentID := sm.GetNextSessionID() firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession) @@ -163,11 +160,11 @@ func TestReceivingBlocksWhenNotInterested(t *testing.T) { cids = append(cids, b.Cid()) } - nextInterestedIn = []cid.Cid{cids[0], cids[1]} + nextWanted = []cid.Cid{cids[0], cids[1]} firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession) - nextInterestedIn = []cid.Cid{cids[0]} + nextWanted = []cid.Cid{cids[0]} secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession) - nextInterestedIn = []cid.Cid{} + nextWanted = []cid.Cid{} thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession) sm.ReceiveFrom(p, []cid.Cid{blks[0].Cid(), blks[1].Cid()}) @@ -193,9 +190,9 @@ func TestIsWanted(t *testing.T) { cids = append(cids, b.Cid()) } - nextInterestedIn = []cid.Cid{cids[0], cids[1]} + nextWanted = []cid.Cid{cids[0], cids[1]} _ = sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession) - nextInterestedIn = []cid.Cid{cids[0], cids[2]} + nextWanted = []cid.Cid{cids[0], cids[2]} _ = sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession) if !sm.IsWanted(cids[0]) || @@ -218,7 +215,7 @@ func TestRemovingPeersWhenManagerContextCancelled(t *testing.T) { p := peer.ID(123) block := blocks.NewBlock([]byte("block")) // we'll be interested in all blocks for this test - nextInterestedIn = []cid.Cid{block.Cid()} + nextWanted = []cid.Cid{block.Cid()} firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession) secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession) thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession) @@ -245,7 +242,7 @@ func TestRemovingPeersWhenSessionContextCancelled(t *testing.T) { p := peer.ID(123) block := blocks.NewBlock([]byte("block")) // we'll be interested in all blocks for this test - nextInterestedIn = []cid.Cid{block.Cid()} + nextWanted = []cid.Cid{block.Cid()} firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession) sessionCtx, sessionCancel := context.WithCancel(ctx) secondSession := sm.NewSession(sessionCtx, time.Second, delay.Fixed(time.Minute)).(*fakeSession) From 1e10d28b3d8a443f7010c9dc9b022091cfb21dac Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Thu, 22 Aug 2019 20:52:03 -0400 Subject: [PATCH 06/12] refactor: remove extraneous go routine --- sessionmanager/sessionmanager.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/sessionmanager/sessionmanager.go b/sessionmanager/sessionmanager.go index 3ec30bbc..cf3fe98d 100644 --- a/sessionmanager/sessionmanager.go +++ b/sessionmanager/sessionmanager.go @@ -119,15 +119,9 @@ func (sm *SessionManager) ReceiveFrom(from peer.ID, ks []cid.Cid) { sm.sessLk.Lock() defer sm.sessLk.Unlock() - var wg sync.WaitGroup for _, s := range sm.sessions { - wg.Add(1) - go func() { - defer wg.Done() - s.session.ReceiveFrom(from, ks) - }() + s.session.ReceiveFrom(from, ks) } - wg.Wait() } // IsWanted indicates whether any of the sessions are waiting to receive From a2d6e30b10263d4dfd7f32c840eccf4f28af03ce Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Thu, 22 Aug 2019 20:53:54 -0400 Subject: [PATCH 07/12] refactor: remove extraneous alloc --- session/sessionwants.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/session/sessionwants.go b/session/sessionwants.go index 58684ae8..e32c34a7 100644 --- a/session/sessionwants.go +++ b/session/sessionwants.go @@ -171,7 +171,7 @@ func (sw *sessionWants) FilterInteresting(ks []cid.Cid) []cid.Cid { sw.RLock() defer sw.RUnlock() - interested := make([]cid.Cid, 0, len(ks)) + var interested []cid.Cid for _, k := range ks { if sw.unlockedIsWanted(k) || sw.pastWants.Has(k) { interested = append(interested, k) From 95de855189029bbcb8b8c0d02149616824a94af0 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Thu, 22 Aug 2019 20:55:46 -0400 Subject: [PATCH 08/12] refactor: move timing outside lock --- session/sessionwants.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/session/sessionwants.go b/session/sessionwants.go index e32c34a7..fdf30cf3 100644 --- a/session/sessionwants.go +++ b/session/sessionwants.go @@ -19,6 +19,8 @@ type sessionWants struct { // measures latency. It returns the CIDs of blocks that were actually wanted // (as opposed to duplicates) and the total latency for all incoming blocks. func (sw *sessionWants) BlocksReceived(cids []cid.Cid) ([]cid.Cid, time.Duration) { + now := time.Now() + sw.Lock() defer sw.Unlock() @@ -31,7 +33,7 @@ func (sw *sessionWants) BlocksReceived(cids []cid.Cid) ([]cid.Cid, time.Duration // If the block CID was in the live wants queue, remove it tval, ok := sw.liveWants[c] if ok { - totalLatency += time.Since(tval) + totalLatency += now.Sub(tval) delete(sw.liveWants, c) } else { // Otherwise remove it from the toFetch queue, if it was there From 84f61d6a980e13c07e4fd057613edf4746e0c1b8 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Thu, 22 Aug 2019 21:10:37 -0400 Subject: [PATCH 09/12] refactor: move rand outside lock --- session/sessionwants.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/session/sessionwants.go b/session/sessionwants.go index fdf30cf3..26eed8b9 100644 --- a/session/sessionwants.go +++ b/session/sessionwants.go @@ -1,6 +1,7 @@ package session import ( + "math" "math/rand" "sync" "time" @@ -133,13 +134,15 @@ func (sw *sessionWants) LiveWants() []cid.Cid { // RandomLiveWant returns a randomly selected live want func (sw *sessionWants) RandomLiveWant() cid.Cid { + r := rand.Float64() + sw.RLock() defer sw.RUnlock() if len(sw.liveWants) == 0 { return cid.Cid{} } - i := rand.Intn(len(sw.liveWants)) + i := math.Floor(r * float64(len(sw.liveWants))) // picking a random live want for k := range sw.liveWants { if i == 0 { From ec9fb77f9698b7ed899c601595bc4da0f4e2facb Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Thu, 22 Aug 2019 22:03:58 -0400 Subject: [PATCH 10/12] test: remove test that is no longer needed --- sessionmanager/sessionmanager_test.go | 31 --------------------------- 1 file changed, 31 deletions(-) diff --git a/sessionmanager/sessionmanager_test.go b/sessionmanager/sessionmanager_test.go index 2bd234cb..dfd3446c 100644 --- a/sessionmanager/sessionmanager_test.go +++ b/sessionmanager/sessionmanager_test.go @@ -145,37 +145,6 @@ func TestAddingSessions(t *testing.T) { } } -func TestReceivingBlocksWhenNotInterested(t *testing.T) { - ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - defer cancel() - notif := notifications.New() - defer notif.Shutdown() - sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory, notif) - - p := peer.ID(123) - blks := testutil.GenerateBlocksOfSize(3, 1024) - var cids []cid.Cid - for _, b := range blks { - cids = append(cids, b.Cid()) - } - - nextWanted = []cid.Cid{cids[0], cids[1]} - firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession) - nextWanted = []cid.Cid{cids[0]} - secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession) - nextWanted = []cid.Cid{} - thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession) - - sm.ReceiveFrom(p, []cid.Cid{blks[0].Cid(), blks[1].Cid()}) - - if !cmpSessionCids(firstSession, []cid.Cid{cids[0], cids[1]}) || - !cmpSessionCids(secondSession, []cid.Cid{cids[0]}) || - !cmpSessionCids(thirdSession, []cid.Cid{}) { - t.Fatal("did not receive correct blocks for sessions") - } -} - func TestIsWanted(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithCancel(ctx) From 6197217642d193a897065d86782ad3719c1021dc Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 23 Aug 2019 09:32:04 -0400 Subject: [PATCH 11/12] refactor: cheaper rand want selection --- session/sessionwants.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/session/sessionwants.go b/session/sessionwants.go index 26eed8b9..aa487f12 100644 --- a/session/sessionwants.go +++ b/session/sessionwants.go @@ -1,7 +1,6 @@ package session import ( - "math" "math/rand" "sync" "time" @@ -134,7 +133,7 @@ func (sw *sessionWants) LiveWants() []cid.Cid { // RandomLiveWant returns a randomly selected live want func (sw *sessionWants) RandomLiveWant() cid.Cid { - r := rand.Float64() + i := rand.Uint64() sw.RLock() defer sw.RUnlock() @@ -142,7 +141,7 @@ func (sw *sessionWants) RandomLiveWant() cid.Cid { if len(sw.liveWants) == 0 { return cid.Cid{} } - i := math.Floor(r * float64(len(sw.liveWants))) + i %= uint64(len(sw.liveWants)) // picking a random live want for k := range sw.liveWants { if i == 0 { From 312b40bae0b61bda59184475212f3ac4904079c8 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 23 Aug 2019 09:34:04 -0400 Subject: [PATCH 12/12] refactor: remove unused code --- sessionmanager/sessionmanager_test.go | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/sessionmanager/sessionmanager_test.go b/sessionmanager/sessionmanager_test.go index dfd3446c..95c12b12 100644 --- a/sessionmanager/sessionmanager_test.go +++ b/sessionmanager/sessionmanager_test.go @@ -89,24 +89,6 @@ func requestSplitterFactory(ctx context.Context) bssession.RequestSplitter { return &fakeRequestSplitter{} } -func cmpSessionCids(s *fakeSession, cids []cid.Cid) bool { - if len(s.ks) != len(cids) { - return false - } - for _, bk := range s.ks { - has := false - for _, c := range cids { - if c == bk { - has = true - } - } - if !has { - return false - } - } - return true -} - func TestAddingSessions(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithCancel(ctx)