Skip to content

Commit

Permalink
fix: various traversal and verifier fixes
Browse files Browse the repository at this point in the history
Closes: #336

* don't treat context cancellation as an ErrExtraneousBlock in the CAR verifier
* capture and properly handle block load errors that are missed by the
  go-ipld-prime traverser, ref: ipld/go-ipld-prime#524
* fix flaky case(s) in verifiedcar test suite where multi-level sharded
  directory is assumed but only a single block dir is produced
  • Loading branch information
rvagg committed Jun 29, 2023
1 parent 1d22d1f commit 6d5cc74
Show file tree
Hide file tree
Showing 6 changed files with 258 additions and 127 deletions.
37 changes: 35 additions & 2 deletions pkg/internal/testutil/gen.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package testutil

import (
"fmt"
"io"
"math/rand"
"net"
"strconv"
Expand All @@ -10,7 +12,10 @@ import (
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
"github.com/ipfs/go-unixfsnode/data"
unixfs "github.com/ipfs/go-unixfsnode/testutil"
dagpb "github.com/ipld/go-codec-dagpb"
"github.com/ipld/go-ipld-prime/linking"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipni/go-libipni/metadata"
crypto "github.com/libp2p/go-libp2p/core/crypto"
Expand Down Expand Up @@ -150,9 +155,11 @@ func (ZeroReader) Read(b []byte) (n int, err error) {
return len(b), nil
}

// TODO: this should probably be an option in unixfsnode/testutil, for
// generators to strictly not return a DAG with duplicates
// TODO: these should probably be in unixfsnode/testutil, or as options to
// the respective functions there.

// GenerateNoDupes runs the unixfsnode/testutil generator function repeatedly
// until it produces a DAG with strictly no duplicate CIDs.
func GenerateNoDupes(gen func() unixfs.DirEntry) unixfs.DirEntry {
var check func(unixfs.DirEntry) bool
var seen map[cid.Cid]struct{}
Expand All @@ -178,3 +185,29 @@ func GenerateNoDupes(gen func() unixfs.DirEntry) unixfs.DirEntry {
}
}
}

// GenerateStrictlyNestedShardedDir is a wrapper around
// unixfsnode/testutil.GenerateDirectory that will repeatedly run it until it
// produces a sharded directory structure with strictly at least one level of
// sharding. Since it is possible to produce a sharded directory that is
// contained in a single block, this function provides a way to generate a
// sharded directory for cases where we need to test multi-level sharding.
// GenerateStrictlyNestedShardedDir repeatedly invokes
// unixfsnode/testutil.GenerateDirectory until it yields a sharded directory
// with at least one level of nesting. A sharded directory can end up entirely
// contained in a single block, so tests that require genuine multi-level
// sharding use this wrapper instead of calling GenerateDirectory directly.
func GenerateStrictlyNestedShardedDir(t *testing.T, linkSys *linking.LinkSystem, randReader io.Reader, targetSize int) unixfs.DirEntry {
	for {
		dir := unixfs.GenerateDirectory(t, linkSys, randReader, targetSize, true)
		root, err := linkSys.Load(linking.LinkContext{}, cidlink.Link{Cid: dir.Root}, dagpb.Type.PBNode)
		require.NoError(t, err)
		pbRoot := root.(dagpb.PBNode)
		ufsData, err := data.DecodeUnixFSData(pbRoot.Data.Must().Bytes())
		require.NoError(t, err)
		// Shard child links are named with just a hex prefix whose width is
		// determined by the fanout; a link name of exactly that width means
		// the link points at a nested shard rather than a directory entry.
		shardPrefixLen := len(fmt.Sprintf("%X", ufsData.FieldFanout().Must().Int()-1))
		links := pbRoot.Links.ListIterator()
		for !links.Done() {
			_, link, err := links.Next()
			require.NoError(t, err)
			if len(link.(dagpb.PBLink).Name.Must().String()) == shardPrefixLen {
				return dir
			}
		}
	}
}
35 changes: 31 additions & 4 deletions pkg/retriever/bitswapretriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (br *bitswapRetrieval) RetrieveFromAsyncCandidates(ayncCandidates types.Inb
ctx,
cidlink.Link{Cid: br.request.Cid},
selector,
&traversalLinkSys,
traversalLinkSys,
preloader,
br.request.MaxBlocks,
)
Expand Down Expand Up @@ -295,15 +295,20 @@ func loaderForSession(retrievalID types.RetrievalID, inProgressCids InProgressCi
}
}

// noopVisitor satisfies the traversal visitor interface while doing nothing;
// the traversal here is run purely for its side effect of loading blocks.
func noopVisitor(_ traversal.Progress, _ datamodel.Node, _ traversal.VisitReason) error {
	return nil
}

func easyTraverse(
ctx context.Context,
root datamodel.Link,
traverseSelector datamodel.Node,
lsys *linking.LinkSystem,
lsys linking.LinkSystem,
preloader preload.Loader,
maxBlocks uint64,
) error {

lsys, ecr := newErrorCapturingReader(lsys)
protoChooser := dagpb.AddSupportToChooser(basicnode.Chooser)

// retrieve first node
Expand All @@ -319,7 +324,7 @@ func easyTraverse(
progress := traversal.Progress{
Cfg: &traversal.Config{
Ctx: ctx,
LinkSystem: *lsys,
LinkSystem: lsys,
LinkTargetNodePrototypeChooser: protoChooser,
Preloader: preloader,
},
Expand All @@ -335,5 +340,27 @@ func easyTraverse(
if err != nil {
return err
}
return progress.WalkAdv(node, compiledSelector, func(prog traversal.Progress, n datamodel.Node, reason traversal.VisitReason) error { return nil })
if err := progress.WalkAdv(node, compiledSelector, noopVisitor); err != nil {
return err
}
return ecr.Error
}

// errorCapturingReader wraps a linking.BlockReadOpener and records the most
// recent error it returns, so that block-load failures that are not bubbled
// up by the go-ipld-prime traverser (ref: ipld/go-ipld-prime#524) can still
// be surfaced to the caller after the traversal completes.
type errorCapturingReader struct {
	// sro is the original BlockReadOpener being wrapped.
	sro linking.BlockReadOpener
	// Error holds the most recent error returned by sro, or nil if no read
	// has failed.
	Error error
}

// newErrorCapturingReader returns a copy of lsys whose StorageReadOpener is
// wrapped so that any read error is additionally recorded on the returned
// errorCapturingReader for inspection after a traversal.
func newErrorCapturingReader(lsys linking.LinkSystem) (linking.LinkSystem, *errorCapturingReader) {
	capture := &errorCapturingReader{sro: lsys.StorageReadOpener}
	lsys.StorageReadOpener = capture.StorageReadOpener
	return lsys, capture
}

// StorageReadOpener delegates to the wrapped read opener, remembering the
// returned error (if any) on ecr.Error before passing the result through
// unchanged.
func (ecr *errorCapturingReader) StorageReadOpener(lc linking.LinkContext, l datamodel.Link) (io.Reader, error) {
	rdr, loadErr := ecr.sro(lc, l)
	if loadErr == nil {
		return rdr, nil
	}
	ecr.Error = loadErr
	return rdr, loadErr
}
2 changes: 1 addition & 1 deletion pkg/retriever/httpretriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func newTimeToFirstByteReader(r io.Reader, cb func()) *timeToFirstByteReader {
}
}

func (t *timeToFirstByteReader) Read(p []byte) (n int, err error) {
func (t *timeToFirstByteReader) Read(p []byte) (int, error) {
if !t.first {
t.first = true
defer t.cb()
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/duplicateaddercar.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ func (bs *blockStream) Close() {
}

func (bs *blockStream) WriteBlock(blk blocks.Block) error {
if bs.ctx.Err() != nil {
return bs.ctx.Err()
}
bs.mu.Lock()
defer bs.mu.Unlock()
if bs.done {
Expand Down
131 changes: 85 additions & 46 deletions pkg/verifiedcar/verifiedcar.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ func visitNoop(p traversal.Progress, n datamodel.Node, r traversal.VisitReason)
//
// * https://specs.ipfs.tech/http-gateways/path-gateway/
func (cfg Config) VerifyCar(ctx context.Context, rdr io.Reader, lsys linking.LinkSystem) (uint64, uint64, error) {

cbr, err := car.NewBlockReader(rdr, car.WithTrustedCAR(false))
if err != nil {
// TODO: post-1.19: fmt.Errorf("%w: %w", ErrMalformedCar, err)
Expand All @@ -93,7 +92,6 @@ func (cfg Config) VerifyCar(ctx context.Context, rdr io.Reader, lsys linking.Lin
}

func (cfg Config) VerifyBlockStream(ctx context.Context, cbr BlockReader, lsys linking.LinkSystem) (uint64, uint64, error) {

sel, err := selector.CompileSelector(cfg.Selector)
if err != nil {
return 0, 0, err
Expand All @@ -106,7 +104,8 @@ func (cfg Config) VerifyBlockStream(ctx context.Context, cbr BlockReader, lsys l
lsys.TrustedStorage = true // we can rely on the CAR decoder to check CID integrity
unixfsnode.AddUnixFSReificationToLinkSystem(&lsys)

lsys.StorageReadOpener = cfg.nextBlockReadOpener(ctx, cr, bt, lsys)
nbro := NewNextBlockReadOpener(ctx, cfg, cr, bt, lsys)
lsys.StorageReadOpener = nbro.StorageReadOpener

// run traversal in this goroutine
progress := traversal.Progress{
Expand Down Expand Up @@ -136,68 +135,108 @@ func (cfg Config) VerifyBlockStream(ctx context.Context, cbr BlockReader, lsys l
return 0, 0, traversalError(err)
}

if nbro.Error != nil {
// capture any errors not bubbled up through the traversal, i.e. see
// https://github.com/ipld/go-ipld-prime/pull/524
return 0, 0, nbro.Error
}

// make sure we don't have any extraneous data beyond what the traversal needs
_, err = cbr.Next()
if !errors.Is(err, io.EOF) {
if ctx.Err() == nil && !errors.Is(err, context.Canceled) && !errors.Is(err, io.EOF) {
return 0, 0, ErrExtraneousBlock
}

// wait for parser to finish and provide errors or stats
return bt.blocks, bt.bytes, nil
}

func (cfg *Config) nextBlockReadOpener(ctx context.Context, cr *carReader, bt *writeTracker, lsys linking.LinkSystem) linking.BlockReadOpener {
seen := make(map[cid.Cid]struct{})
return func(lc linking.LinkContext, l datamodel.Link) (io.Reader, error) {
cid := l.(cidlink.Link).Cid

var data []byte
var err error
if _, ok := seen[cid]; ok {
if cfg.ExpectDuplicatesIn {
// duplicate block, but in this case we are expecting the stream to have it
data, err = cr.readNextBlock(ctx, cid)
if err != nil {
return nil, err
}
if !cfg.WriteDuplicatesOut {
return bytes.NewReader(data), nil
}
} else {
// duplicate block, rely on the supplied LinkSystem to have stored this
rdr, err := lsys.StorageReadOpener(lc, l)
if !cfg.WriteDuplicatesOut {
return rdr, err
}
data, err = io.ReadAll(rdr)
if err != nil {
return nil, err
}
// NextBlockReadOpener is a stateful block read opener for the CAR verifier:
// it sources blocks from an underlying carReader, tracks which CIDs have
// already been seen so duplicates can be handled per the Config, and writes
// loaded blocks through to the supplied LinkSystem.
type NextBlockReadOpener struct {
	cfg  Config                // verification options (duplicate handling, etc.)
	ctx  context.Context       // context passed through to block reads
	cr   *carReader            // source of incoming blocks
	bt   *writeTracker         // records block/byte counts for loaded blocks
	lsys linking.LinkSystem    // destination storage for loaded blocks
	seen map[cid.Cid]struct{}  // CIDs already loaded, for duplicate detection

	// Error records load errors that may not be bubbled up by the traversal
	// (ref: ipld/go-ipld-prime#524); checked by the caller afterwards.
	Error error
	// Count is the number of times StorageReadOpener has been invoked.
	Count int
}

// NewNextBlockReadOpener constructs a NextBlockReadOpener that reads blocks
// from cr, tracks loaded blocks via bt, and persists them through lsys,
// applying the duplicate-handling behavior configured in cfg.
func NewNextBlockReadOpener(
	ctx context.Context,
	cfg Config,
	cr *carReader,
	bt *writeTracker,
	lsys linking.LinkSystem,
) *NextBlockReadOpener {
	nbro := NextBlockReadOpener{
		cfg:  cfg,
		ctx:  ctx,
		cr:   cr,
		bt:   bt,
		lsys: lsys,
		seen: map[cid.Cid]struct{}{},
	}
	return &nbro
}

func (nbro *NextBlockReadOpener) StorageReadOpener(lc linking.LinkContext, l datamodel.Link) (io.Reader, error) {
cid := l.(cidlink.Link).Cid
nbro.Count++
var data []byte
var err error
if _, ok := nbro.seen[cid]; ok {
if nbro.cfg.ExpectDuplicatesIn {
// duplicate block, but in this case we are expecting the stream to have it
data, err = nbro.cr.readNextBlock(nbro.ctx, cid)
if err != nil {
nbro.Error = err
return nil, err
}
if !nbro.cfg.WriteDuplicatesOut {
return bytes.NewReader(data), nil
}
} else {
seen[cid] = struct{}{}
data, err = cr.readNextBlock(ctx, cid)
// duplicate block, rely on the supplied LinkSystem to have stored this
rdr, err := nbro.lsys.StorageReadOpener(lc, l)
if !nbro.cfg.WriteDuplicatesOut {
nbro.Error = err
return rdr, err
}
data, err = io.ReadAll(rdr)
if err != nil {
nbro.Error = err
return nil, err
}
}
bt.recordBlock(data)
w, wc, err := lsys.StorageWriteOpener(lc)
} else {
nbro.seen[cid] = struct{}{}
data, err = nbro.cr.readNextBlock(nbro.ctx, cid)
if err != nil {
nbro.Error = err
return nil, err
}
rdr := bytes.NewReader(data)
if _, err := io.Copy(w, rdr); err != nil {
return nil, err
}
if err := wc(l); err != nil {
return nil, err
}
if _, err := rdr.Seek(0, io.SeekStart); err != nil {
return nil, err
}
return io.NopCloser(rdr), nil
}
nbro.bt.recordBlock(data)
w, wc, err := nbro.lsys.StorageWriteOpener(lc)
if err != nil {
nbro.Error = err
return nil, err
}
rdr := bytes.NewReader(data)
if _, err := io.Copy(w, rdr); err != nil {
nbro.Error = err
return nil, err
}
if err := wc(l); err != nil {
nbro.Error = err
return nil, err
}
if _, err := rdr.Seek(0, io.SeekStart); err != nil {
nbro.Error = err
return nil, err
}
return io.NopCloser(rdr), nil
}

type carReader struct {
Expand Down
Loading

0 comments on commit 6d5cc74

Please sign in to comment.