Skip to content

Commit

Permalink
Add support for multiple challenges per level
Browse files Browse the repository at this point in the history
  • Loading branch information
stephen-pl committed Nov 8, 2023
1 parent 99bb7f7 commit 2d4e263
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 36 deletions.
2 changes: 2 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ run:
- extern
- integration/repdao
- integration/repdao_dp
skip-files:
- pkg/net/host.go # this is failing for me because libp2p cannot be found on go1.20.1

linters:
enable-all: true
Expand Down
1 change: 1 addition & 0 deletions pkg/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (
Longitude Key = "_LONGITUDE"
LotusAPIToken Key = "LOTUS_API_TOKEN"
LotusAPIUrl Key = "LOTUS_API_URL"
NumberOfChallenges Key = "NUMBER_OF_CHALLENGES"
ProcessErrorInterval Key = "PROCESS_ERROR_INTERVAL"
ProcessModules Key = "PROCESS_MODULES"
ProviderCacheTTL Key = "PROVIDER_CACHE_TTL"
Expand Down
110 changes: 74 additions & 36 deletions pkg/net/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package net

import (
"context"
"math"
"math/rand"
"time"

"github.com/data-preservation-programs/RetrievalBot/pkg/env"
"github.com/data-preservation-programs/RetrievalBot/pkg/task"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
Expand All @@ -18,6 +20,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/exp/slices"

_ "github.com/ipld/go-codec-dagpb"
Expand Down Expand Up @@ -173,7 +176,10 @@ func (c BitswapClient) SpadeTraversal(parent context.Context,
startingCid cid.Cid,
maxTraverseDepth uint) (*task.RetrievalResult, error) {
logger := logging.Logger("bitswap_client_spade").With("cid", startingCid).With("target", target)
cidToRetrieve := startingCid

numChallenge := validateAndGetNumberOfChallenges(logger)
cidsToRetrieve := initializeCidsToRetrieve(numChallenge, startingCid)
nextIndexes := make([]datamodel.Link, 0, int(math.Pow(2, float64(maxTraverseDepth))))

// Initialize hosts and clients required to do all the retrieval tests
network := bsnet.NewFromIpfsHost(c.host, SingleContentRouter{
Expand All @@ -187,58 +193,90 @@ func (c BitswapClient) SpadeTraversal(parent context.Context,
startTime := time.Now()

i := uint(0)
var blk blocks.Block
for {
// Retrieval
logger.Infof("retrieving %s\n", cidToRetrieve.String())
blk, err := c.RetrieveBlock(parent, target, network, bswap, cidToRetrieve)

if err != nil {
return task.NewErrorRetrievalResultWithErrorResolution(task.RetrievalFailure, err), nil
}

// Verify returned content hashes to the CID we're expecting
if !blk.Cid().Equals(cidToRetrieve) {
return task.NewErrorRetrievalResult(task.CIDMismatch,
errors.Errorf("retrieved cid does not match requested: %s, %s",
blk.Cid().String(), cidToRetrieve)), nil
for j, cidToRetrieve := range cidsToRetrieve {
// Retrieval
logger.Infof("retrieving %s\n", cidToRetrieve.String())
blk, err := c.RetrieveBlock(parent, target, network, bswap, cidToRetrieve)

if err != nil {
return task.NewErrorRetrievalResultWithErrorResolution(task.RetrievalFailure, err), nil
}

// Verify returned content hashes to the CID we're expecting
if !blk.Cid().Equals(cidToRetrieve) {
return task.NewErrorRetrievalResult(task.CIDMismatch,
errors.Errorf("retrieved cid does not match requested: %s, %s",
blk.Cid().String(), cidToRetrieve)), nil
}

// Wait until we are at max depth and tried all the challenges
if i == maxTraverseDepth && j == len(cidsToRetrieve)-1 {
var size = int64(len(blk.RawData()))
elapsed := time.Since(startTime)
logger.With("size", size).With("elapsed", elapsed).Info("Retrieved block")

// we've reached the requested depth of the tree
return task.NewSuccessfulRetrievalResult(elapsed, size, elapsed), nil
}

// if not at bottom of the tree, keep going down the links until we reach it or hit a dead end
links, err := FindLinks(parent, blk)
if err != nil {
return task.NewErrorRetrievalResultWithErrorResolution(task.CannotDecodeLinks, err), nil
}

logger.Debugf("cid %s has %d links\n", cidToRetrieve.String(), len(links))

nextIndexes = append(nextIndexes, links...)
}

if i == maxTraverseDepth {
if len(nextIndexes) == 0 {
var size = int64(len(blk.RawData()))
elapsed := time.Since(startTime)
logger.With("size", size).With("elapsed", elapsed).Info("Retrieved block")

// we've reached the requested depth of the tree
return task.NewSuccessfulRetrievalResult(elapsed, size, elapsed), nil
}

// if not at bottom of the tree, keep going down the links until we reach it or hit a dead end
links, err := FindLinks(parent, blk)
if err != nil {
return task.NewErrorRetrievalResultWithErrorResolution(task.CannotDecodeLinks, err), nil
}
// Clear out cids list to prevent resizing array
cidsToRetrieve = cidsToRetrieve[:0]

logger.Debugf("cid %s has %d links\n", cidToRetrieve.String(), len(links))

if len(links) == 0 {
var size = int64(len(blk.RawData()))
elapsed := time.Since(startTime)
logger.With("size", size).With("elapsed", elapsed).Info("Retrieved block")
// Shuffle a slice so we can find the next cids to challenge
rand.Shuffle(len(nextIndexes), func(i, j int) {
nextIndexes[i], nextIndexes[j] = nextIndexes[j], nextIndexes[i]
})

return task.NewSuccessfulRetrievalResult(elapsed, size, elapsed), nil
for j := 0; j < int(math.Min(float64(numChallenge), float64(len(nextIndexes)))); j++ {
cid, err := cid.Parse(nextIndexes[j].String())
if err != nil {
return task.NewErrorRetrievalResultWithErrorResolution(task.CIDCodecNotSupported, err), nil
}
cidsToRetrieve = append(cidsToRetrieve, cid)
}

// randomly pick a link to go down
//nolint:all we don't need crypto secured random numbers
nextIndex := rand.Intn(len(links))
// Clear out nextIndexes list to help prevent resizing array
nextIndexes = nextIndexes[:0]

cidToRetrieve, err = cid.Parse(links[nextIndex].String())
if err != nil {
return task.NewErrorRetrievalResultWithErrorResolution(task.CIDCodecNotSupported, err), nil
}
// To the next layer of the tree
i++
}
}

func initializeCidsToRetrieve(numChallenge int, startingCid cid.Cid) []cid.Cid {
cidsToRetrieve := make([]cid.Cid, 0, numChallenge)
cidsToRetrieve = append(cidsToRetrieve, startingCid)
return cidsToRetrieve
}

i++ // To the next layer of the tree
func validateAndGetNumberOfChallenges(logger *zap.SugaredLogger) int {
var numChallenge = env.GetRequiredInt(env.NumberOfChallenges)
if numChallenge <= 0 || numChallenge > 10 {
numChallenge = 1
logger.Infof("NumberOfChallenges is <= 0 or NumberOfChallenges > 10. Using 1 challenge per level")
}
return numChallenge
}

// Returns the raw block data, the links, and error if any
Expand Down
26 changes: 26 additions & 0 deletions pkg/net/bitswap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package net

import (
"testing"

"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/stretchr/testify/assert"
)

func TestInitializeCidsToRetrieve(t *testing.T) {
challenge4 := initializeCidsToRetrieve(4, cid.Cid{})

assert.Equal(t, 1, len(challenge4))
}

func TestValidateAndGetNumberOfChallenges(t *testing.T) {
logger := logging.Logger("test").With("test", "test")
t.Setenv("NUMBER_OF_CHALLENGES", "5")
var challenges = validateAndGetNumberOfChallenges(logger)

assert.Equal(t, 5, challenges)
t.Setenv("NUMBER_OF_CHALLENGES", "50")
challenges = validateAndGetNumberOfChallenges(logger)
assert.Equal(t, 1, challenges)
}
1 change: 1 addition & 0 deletions pkg/net/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package net

import (
"context"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
Expand Down

0 comments on commit 2d4e263

Please sign in to comment.