diff --git a/bitswap_with_sessions_test.go b/bitswap_with_sessions_test.go index 5034aaee..0be7bc97 100644 --- a/bitswap_with_sessions_test.go +++ b/bitswap_with_sessions_test.go @@ -152,6 +152,47 @@ func TestSessionSplitFetch(t *testing.T) { } } +func TestFetchNotConnected(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + bssession.SetProviderSearchDelay(10 * time.Millisecond) + vnet := getVirtualNetwork() + sesgen := NewTestSessionGenerator(vnet) + defer sesgen.Close() + bgen := blocksutil.NewBlockGenerator() + + other := sesgen.Next() + + blks := bgen.Blocks(10) + for _, block := range blks { + if err := other.Exchange.HasBlock(block); err != nil { + t.Fatal(err) + } + } + + var cids []cid.Cid + for _, blk := range blks { + cids = append(cids, blk.Cid()) + } + + thisNode := sesgen.Next() + ses := thisNode.Exchange.NewSession(ctx).(*bssession.Session) + ses.SetBaseTickDelay(time.Millisecond * 10) + + ch, err := ses.GetBlocks(ctx, cids) + if err != nil { + t.Fatal(err) + } + + var got []blocks.Block + for b := range ch { + got = append(got, b) + } + if err := assertBlockLists(got, blks); err != nil { + t.Fatal(err) + } +} func TestInterestCacheOverflow(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/session/session.go b/session/session.go index bae52bd0..b57f472e 100644 --- a/session/session.go +++ b/session/session.go @@ -222,7 +222,12 @@ func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) { } } -const provSearchDelay = time.Second * 10 +var provSearchDelay = time.Second + +// SetProviderSearchDelay overwrites the global provider search delay +func SetProviderSearchDelay(newProvSearchDelay time.Duration) { + provSearchDelay = newProvSearchDelay +} // Session run loop -- everything function below here should not be called // of this loop diff --git a/sessionpeermanager/sessionpeermanager.go b/sessionpeermanager/sessionpeermanager.go index 3b951c42..2e733832 100644 --- a/sessionpeermanager/sessionpeermanager.go +++ b/sessionpeermanager/sessionpeermanager.go @@ -5,11 +5,15 @@ import ( "fmt" "math/rand" + logging "github.com/ipfs/go-log" + cid "github.com/ipfs/go-cid" ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr" peer "github.com/libp2p/go-libp2p-peer" ) +var log = logging.Logger("bitswap") + const ( maxOptimizedPeers = 32 reservePeers = 2 @@ -18,6 +22,7 @@ const ( // PeerNetwork is an interface for finding providers and managing connections type PeerNetwork interface { ConnectionManager() ifconnmgr.ConnManager + ConnectTo(context.Context, peer.ID) error FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.ID } @@ -102,7 +107,13 @@ func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) { // - ensure two 'findprovs' calls for the same block don't run concurrently // - share peers between sessions based on interest set for p := range spm.network.FindProvidersAsync(ctx, k, 10) { - spm.peerMessages <- &peerFoundMessage{p} + go func(p peer.ID) { + err := spm.network.ConnectTo(ctx, p) + if err != nil { + log.Debugf("failed to connect to provider %s: %s", p, err) + } + spm.peerMessages <- &peerFoundMessage{p} + }(p) } }(c) } diff --git a/sessionpeermanager/sessionpeermanager_test.go b/sessionpeermanager/sessionpeermanager_test.go index ba23c87d..b4e723b1 100644 --- a/sessionpeermanager/sessionpeermanager_test.go +++ b/sessionpeermanager/sessionpeermanager_test.go @@ -2,8 +2,8 @@ package sessionpeermanager import ( "context" - "sync" "math/rand" + "sync" "testing" "time" @@ -24,6 +24,10 @@ func (fpn *fakePeerNetwork) ConnectionManager() ifconnmgr.ConnManager { return fpn.connManager } +func (fpn *fakePeerNetwork) ConnectTo(context.Context, peer.ID) error { + return nil +} + func (fpn *fakePeerNetwork) FindProvidersAsync(ctx context.Context, c cid.Cid, num int) <-chan peer.ID { peerCh := make(chan peer.ID) go func() {