This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Feat: process response message blocks as a batch (#170)
dirkmc authored Aug 13, 2019
1 parent ee347b8 commit e72b289
Showing 11 changed files with 322 additions and 219 deletions.
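
The heart of the change: blocks arriving in a single Bitswap message are now filtered, stored, and dispatched as one batch instead of one goroutine per block. The following standalone Go sketch is not part of the commit — it uses a hypothetical node struct with plain maps standing in for the want manager and blockstore — but it illustrates the batched receive logic that receiveBlocksFrom implements in the diff below.

package main

import "fmt"

// Block is a toy stand-in for blocks.Block.
type Block struct{ Cid string }

// node is a hypothetical stand-in for the Bitswap struct.
type node struct {
    wantlist map[string]bool // stand-in for the want manager
    store    map[string]bool // stand-in for the blockstore
}

// receiveBlocksFrom mirrors the batched logic this commit introduces:
// blocks from the network are split into wanted blocks and duplicates,
// the wanted ones are written to the store in one batch (the real code
// calls blockstore.PutMany), and sessions are still told about every
// block because duplicates matter for their accounting.
func (n *node) receiveBlocksFrom(from string, blks []Block) {
    wanted := blks
    if from != "" { // blocks came from the network, not a local add
        wanted = make([]Block, 0, len(blks))
        for _, b := range blks {
            if n.wantlist[b.Cid] {
                wanted = append(wanted, b)
            } else {
                fmt.Printf("[recv] block not in wantlist; cid=%s, peer=%s\n", b.Cid, from)
            }
        }
    }
    for _, b := range wanted { // stand-in for a single blockstore.PutMany call
        n.store[b.Cid] = true
    }
    fmt.Printf("sessions see %d blocks; engine sees %d wanted blocks\n", len(blks), len(wanted))
}

func main() {
    n := &node{wantlist: map[string]bool{"QmA": true}, store: map[string]bool{}}
    n.receiveBlocksFrom("peer1", []Block{{Cid: "QmA"}, {Cid: "QmB"}})
}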
139 changes: 92 additions & 47 deletions bitswap.go
@@ -265,23 +265,39 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks
// HasBlock announces the existence of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
-    return bs.receiveBlockFrom(blk, "")
+    return bs.receiveBlocksFrom("", []blocks.Block{blk})
}

// TODO: Some of this stuff really only needs to be done when adding a block
// from the user, not when receiving it from the network.
// In case you run `git blame` on this comment, I'll save you some time: ask
// @whyrusleeping, I don't know the answers you seek.
-func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error {
+func (bs *Bitswap) receiveBlocksFrom(from peer.ID, blks []blocks.Block) error {
    select {
    case <-bs.process.Closing():
        return errors.New("bitswap is closed")
    default:
    }

-    err := bs.blockstore.Put(blk)
+    wanted := blks
+
+    // If blocks came from the network
+    if from != "" {
+        // Split blocks into wanted blocks vs duplicates
+        wanted = make([]blocks.Block, 0, len(blks))
+        for _, b := range blks {
+            if bs.wm.IsWanted(b.Cid()) {
+                wanted = append(wanted, b)
+            } else {
+                log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), from)
+            }
+        }
+    }
+
+    // Put wanted blocks into blockstore
+    err := bs.blockstore.PutMany(wanted)
    if err != nil {
-        log.Errorf("Error writing block to datastore: %s", err)
+        log.Errorf("Error writing %d blocks to datastore: %s", len(wanted), err)
        return err
    }

@@ -291,18 +307,25 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error {
    // to the same node. We should address this soon, but i'm not going to do
    // it now as it requires more thought and isnt causing immediate problems.

-    bs.sm.ReceiveBlockFrom(from, blk)
+    // Send all blocks (including duplicates) to any sessions that want them.
+    // (The duplicates are needed by sessions for accounting purposes)
+    bs.sm.ReceiveBlocksFrom(from, blks)

-    bs.engine.AddBlock(blk)
+    // Send wanted blocks to decision engine
+    bs.engine.AddBlocks(wanted)

+    // If the reprovider is enabled, send wanted blocks to reprovider
    if bs.provideEnabled {
-        select {
-        case bs.newBlocks <- blk.Cid():
-            // send block off to be reprovided
-        case <-bs.process.Closing():
-            return bs.process.Close()
+        for _, b := range wanted {
+            select {
+            case bs.newBlocks <- b.Cid():
+                // send block off to be reprovided
+            case <-bs.process.Closing():
+                return bs.process.Close()
+            }
        }
    }

    return nil
}

@@ -325,56 +348,78 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
        return
    }

-    wg := sync.WaitGroup{}
-    for _, block := range iblocks {
-
-        wg.Add(1)
-        go func(b blocks.Block) { // TODO: this probably doesnt need to be a goroutine...
-            defer wg.Done()
-
-            bs.updateReceiveCounters(b)
-            bs.sm.UpdateReceiveCounters(p, b)
-            log.Debugf("[recv] block; cid=%s, peer=%s", b.Cid(), p)
-            // skip received blocks that are not in the wantlist
-            if !bs.wm.IsWanted(b.Cid()) {
-                log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), p)
-                return
-            }
-
-            if err := bs.receiveBlockFrom(b, p); err != nil {
-                log.Warningf("ReceiveMessage recvBlockFrom error: %s", err)
-            }
-            log.Event(ctx, "Bitswap.GetBlockRequest.End", b.Cid())
-        }(block)
-    }
-    wg.Wait()
-}
+    bs.updateReceiveCounters(iblocks)
+    for _, b := range iblocks {
+        log.Debugf("[recv] block; cid=%s, peer=%s", b.Cid(), p)
+    }
+
+    // Process blocks
+    err := bs.receiveBlocksFrom(p, iblocks)
+    if err != nil {
+        log.Warningf("ReceiveMessage recvBlockFrom error: %s", err)
+        return
+    }
+
+    for _, b := range iblocks {
+        if bs.wm.IsWanted(b.Cid()) {
+            log.Event(ctx, "Bitswap.GetBlockRequest.End", b.Cid())
+        }
+    }
+}

-func (bs *Bitswap) updateReceiveCounters(b blocks.Block) {
-    blkLen := len(b.RawData())
-    has, err := bs.blockstore.Has(b.Cid())
-    if err != nil {
-        log.Infof("blockstore.Has error: %s", err)
-        return
-    }
-
-    bs.allMetric.Observe(float64(blkLen))
-    if has {
-        bs.dupMetric.Observe(float64(blkLen))
-    }
+func (bs *Bitswap) updateReceiveCounters(blocks []blocks.Block) {
+    // Check which blocks are in the datastore
+    // (Note: any errors from the blockstore are simply logged out in
+    // blockstoreHas())
+    blocksHas := bs.blockstoreHas(blocks)

    bs.counterLk.Lock()
    defer bs.counterLk.Unlock()
-    c := bs.counters

-    c.blocksRecvd++
-    c.dataRecvd += uint64(len(b.RawData()))
-    if has {
-        c.dupBlocksRecvd++
-        c.dupDataRecvd += uint64(blkLen)
+    // Do some accounting for each block
+    for i, b := range blocks {
+        has := blocksHas[i]
+
+        blkLen := len(b.RawData())
+        bs.allMetric.Observe(float64(blkLen))
+        if has {
+            bs.dupMetric.Observe(float64(blkLen))
+        }
+
+        c := bs.counters
+
+        c.blocksRecvd++
+        c.dataRecvd += uint64(blkLen)
+        if has {
+            c.dupBlocksRecvd++
+            c.dupDataRecvd += uint64(blkLen)
+        }
    }
}
+
+func (bs *Bitswap) blockstoreHas(blks []blocks.Block) []bool {
+    res := make([]bool, len(blks))
+
+    wg := sync.WaitGroup{}
+    for i, block := range blks {
+        wg.Add(1)
+        go func(i int, b blocks.Block) {
+            defer wg.Done()
+
+            has, err := bs.blockstore.Has(b.Cid())
+            if err != nil {
+                log.Infof("blockstore.Has error: %s", err)
+                has = false
+            }
+
+            res[i] = has
+        }(i, block)
+    }
+    wg.Wait()
+
+    return res
+}

// PeerConnected is called by the network interface
// when a peer initiates a new connection to bitswap.
func (bs *Bitswap) PeerConnected(p peer.ID) {
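One detail worth noting in the new blockstoreHas helper above: the per-block Has lookups run concurrently, yet no mutex guards the result slice, because each goroutine writes only its own index of a pre-sized slice. A minimal, self-contained sketch of that pattern — hasInStore is a hypothetical stand-in for blockstore.Has, not a real API:

package main

import (
    "fmt"
    "sync"
)

// hasInStore pretends only "QmA" is stored; stand-in for blockstore.Has.
func hasInStore(cid string) (bool, error) {
    return cid == "QmA", nil
}

// concurrentHas fans out one goroutine per lookup. Each goroutine writes
// only res[i], a distinct index of a slice sized up front, so the writes
// never race and no lock is needed.
func concurrentHas(cids []string) []bool {
    res := make([]bool, len(cids))
    var wg sync.WaitGroup
    for i, c := range cids {
        wg.Add(1)
        go func(i int, c string) {
            defer wg.Done()
            has, err := hasInStore(c)
            if err != nil {
                has = false // errors are treated as "not present"
            }
            res[i] = has
        }(i, c)
    }
    wg.Wait()
    return res
}

func main() {
    fmt.Println(concurrentHas([]string{"QmA", "QmB"})) // [true false]
}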
7 changes: 7 additions & 0 deletions bitswap_test.go
@@ -357,18 +357,25 @@ func TestBasicBitswap(t *testing.T) {

    instances := ig.Instances(3)
    blocks := bg.Blocks(1)
+
+    // First peer has block
    err := instances[0].Exchange.HasBlock(blocks[0])
    if err != nil {
        t.Fatal(err)
    }

    ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
    defer cancel()
+
+    // Second peer broadcasts want for block CID
+    // (Received by first and third peers)
    blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
    if err != nil {
        t.Fatal(err)
    }

+    // When second peer receives block, it should send out a cancel, so third
+    // peer should no longer keep second peer's want
    if err = tu.WaitFor(ctx, func() error {
        if len(instances[2].Exchange.WantlistForPeer(instances[1].Peer)) != 0 {
            return fmt.Errorf("should have no items in other peers wantlist")
25 changes: 14 additions & 11 deletions decision/engine.go
@@ -312,17 +312,19 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) {
    }
}

-func (e *Engine) addBlock(block blocks.Block) {
+func (e *Engine) addBlocks(blocks []blocks.Block) {
    work := false

    for _, l := range e.ledgerMap {
        l.lk.Lock()
-        if entry, ok := l.WantListContains(block.Cid()); ok {
-            e.peerRequestQueue.PushBlock(l.Partner, peertask.Task{
-                Identifier: entry.Cid,
-                Priority:   entry.Priority,
-            })
-            work = true
+        for _, block := range blocks {
+            if entry, ok := l.WantListContains(block.Cid()); ok {
+                e.peerRequestQueue.PushBlock(l.Partner, peertask.Task{
+                    Identifier: entry.Cid,
+                    Priority:   entry.Priority,
+                })
+                work = true
+            }
        }
        l.lk.Unlock()
    }
@@ -332,13 +334,14 @@ func (e *Engine) addBlock(block blocks.Block) {
    }
}

-// AddBlock is called to when a new block is received and added to a block store
-// meaning there may be peers who want that block that we should send it to.
-func (e *Engine) AddBlock(block blocks.Block) {
+// AddBlocks is called when new blocks are received and added to a block store,
+// meaning there may be peers who want those blocks, so we should send the blocks
+// to them.
+func (e *Engine) AddBlocks(blocks []blocks.Block) {
    e.lock.Lock()
    defer e.lock.Unlock()

-    e.addBlock(block)
+    e.addBlocks(blocks)
}

// TODO add contents of m.WantList() to my local wantlist? NB: could introduce
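The reshuffled loop in addBlocks also changes the locking pattern: each peer's ledger lock is now taken once per batch and the whole batch is scanned under that single lock, rather than locking every ledger once per block. A toy sketch of that structure — ledger and addBlocks here use simplified stand-in types, not the engine's real ones:

package main

import (
    "fmt"
    "sync"
)

// ledger is a toy stand-in for the engine's per-peer ledger.
type ledger struct {
    lk    sync.Mutex
    wants map[string]bool
}

// addBlocks locks each ledger once and checks every block of the batch
// under that single lock, instead of re-acquiring the lock per block.
func addBlocks(ledgers []*ledger, cids []string) int {
    work := 0
    for _, l := range ledgers {
        l.lk.Lock()
        for _, c := range cids {
            if l.wants[c] {
                work++ // stand-in for peerRequestQueue.PushBlock
            }
        }
        l.lk.Unlock()
    }
    return work
}

func main() {
    ls := []*ledger{
        {wants: map[string]bool{"QmA": true}},
        {wants: map[string]bool{"QmB": true}},
    }
    fmt.Println(addBlocks(ls, []string{"QmA", "QmB", "QmC"})) // 2
}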
(8 more changed files not shown)
