Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
Control provider workers with experiment flag
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelavila committed Apr 30, 2019
1 parent 401b87d commit 5543aed
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 14 deletions.
14 changes: 9 additions & 5 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ const (
)

var (
ProvideEnabled = true

HasBlockBufferSize = 256
provideKeysBufferSize = 2048
provideWorkerMax = 6
Expand Down Expand Up @@ -258,11 +260,13 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error {

bs.engine.AddBlock(blk)

select {
case bs.newBlocks <- blk.Cid():
// send block off to be reprovided
case <-bs.process.Closing():
return bs.process.Close()
if ProvideEnabled {
select {
case bs.newBlocks <- blk.Cid():
// send block off to be reprovided
case <-bs.process.Closing():
return bs.process.Close()
}
}
return nil
}
Expand Down
37 changes: 37 additions & 0 deletions bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

bssession "github.com/ipfs/go-bitswap/session"
decision "github.com/ipfs/go-bitswap/decision"
"github.com/ipfs/go-bitswap/message"
tn "github.com/ipfs/go-bitswap/testnet"
Expand Down Expand Up @@ -99,6 +100,42 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
}
}

func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
ProvideEnabled = false
defer func() { ProvideEnabled = true }()

net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
block := blocks.NewBlock([]byte("block"))
g := NewTestSessionGenerator(net)
defer g.Close()

hasBlock := g.Next()
defer hasBlock.Exchange.Close()

if err := hasBlock.Exchange.HasBlock(block); err != nil {
t.Fatal(err)
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

wantsBlock := g.Next()
defer wantsBlock.Exchange.Close()

ns := wantsBlock.Exchange.NewSession(ctx).(*bssession.Session)
// set find providers delay to less than timeout context of this test
ns.SetBaseTickDelay(10 * time.Millisecond)

received, err := ns.GetBlock(ctx, block.Cid())
if received != nil {
t.Fatalf("Expected to find nothing, found %s", received)
}

if err != context.DeadlineExceeded {
t.Fatal("Expected deadline exceeded")
}
}

func TestUnwantedBlockNotAdded(t *testing.T) {

net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
Expand Down
20 changes: 11 additions & 9 deletions workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,17 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
})
}

// Start up a worker to manage sending out provides messages
px.Go(func(px process.Process) {
bs.provideCollector(ctx)
})

// Spawn up multiple workers to handle incoming blocks
// consider increasing number if providing blocks bottlenecks
// file transfers
px.Go(bs.provideWorker)
if ProvideEnabled {
// Start up a worker to manage sending out provides messages
px.Go(func(px process.Process) {
bs.provideCollector(ctx)
})

// Spawn up multiple workers to handle incoming blocks
// consider increasing number if providing blocks bottlenecks
// file transfers
px.Go(bs.provideWorker)
}
}

func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
Expand Down

0 comments on commit 5543aed

Please sign in to comment.