From 5543aed3620f3a5bfdfdc314ec985bfbda50c4ab Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Tue, 20 Nov 2018 12:59:52 -0800 Subject: [PATCH] Control provider workers with experiment flag --- bitswap.go | 14 +++++++++----- bitswap_test.go | 37 +++++++++++++++++++++++++++++++++++++ workers.go | 20 +++++++++++--------- 3 files changed, 57 insertions(+), 14 deletions(-) diff --git a/bitswap.go b/bitswap.go index 7e63d936..9a2a1281 100644 --- a/bitswap.go +++ b/bitswap.go @@ -51,6 +51,8 @@ const ( ) var ( + ProvideEnabled = true + HasBlockBufferSize = 256 provideKeysBufferSize = 2048 provideWorkerMax = 6 @@ -258,11 +260,13 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error { bs.engine.AddBlock(blk) - select { - case bs.newBlocks <- blk.Cid(): - // send block off to be reprovided - case <-bs.process.Closing(): - return bs.process.Close() + if ProvideEnabled { + select { + case bs.newBlocks <- blk.Cid(): + // send block off to be reprovided + case <-bs.process.Closing(): + return bs.process.Close() + } } return nil } diff --git a/bitswap_test.go b/bitswap_test.go index bbd1b349..746b8c7c 100644 --- a/bitswap_test.go +++ b/bitswap_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + bssession "github.com/ipfs/go-bitswap/session" decision "github.com/ipfs/go-bitswap/decision" "github.com/ipfs/go-bitswap/message" tn "github.com/ipfs/go-bitswap/testnet" @@ -99,6 +100,42 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { } } +func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) { + ProvideEnabled = false + defer func() { ProvideEnabled = true }() + + net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + block := blocks.NewBlock([]byte("block")) + g := NewTestSessionGenerator(net) + defer g.Close() + + hasBlock := g.Next() + defer hasBlock.Exchange.Close() + + if err := hasBlock.Exchange.HasBlock(block); err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + wantsBlock := g.Next() + defer wantsBlock.Exchange.Close() + + ns := wantsBlock.Exchange.NewSession(ctx).(*bssession.Session) + // set find providers delay to less than timeout context of this test + ns.SetBaseTickDelay(10 * time.Millisecond) + + received, err := ns.GetBlock(ctx, block.Cid()) + if received != nil { + t.Fatalf("Expected to find nothing, found %s", received) + } + + if err != context.DeadlineExceeded { + t.Fatal("Expected deadline exceeded") + } +} + func TestUnwantedBlockNotAdded(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) diff --git a/workers.go b/workers.go index 45f78615..6e0bf037 100644 --- a/workers.go +++ b/workers.go @@ -23,15 +23,17 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { }) } - // Start up a worker to manage sending out provides messages - px.Go(func(px process.Process) { - bs.provideCollector(ctx) - }) - - // Spawn up multiple workers to handle incoming blocks - // consider increasing number if providing blocks bottlenecks - // file transfers - px.Go(bs.provideWorker) + if ProvideEnabled { + // Start up a worker to manage sending out provides messages + px.Go(func(px process.Process) { + bs.provideCollector(ctx) + }) + + // Spawn up multiple workers to handle incoming blocks + // consider increasing number if providing blocks bottlenecks + // file transfers + px.Go(bs.provideWorker) + } } func (bs *Bitswap) taskWorker(ctx context.Context, id int) {