From 6d5cc743431781518909727dfb2b4168f8837d75 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Thu, 29 Jun 2023 14:56:15 +1000 Subject: [PATCH] fix: various traversal and verifier fixes Closes: https://github.com/filecoin-project/lassie/issues/336 * don't treat context cancellation as an ErrExtraneousBlock in the CAR verifier * capture and properly handle block load errors that are missed by the go-ipld-prime traverser, ref: https://github.com/ipld/go-ipld-prime/pull/524 * fix flaky case(s) in verifiedcar test suite where multi-level sharded directory is assumed but only a single block dir is produced --- pkg/internal/testutil/gen.go | 37 +++++- pkg/retriever/bitswapretriever.go | 35 +++++- pkg/retriever/httpretriever.go | 2 +- pkg/storage/duplicateaddercar.go | 3 + pkg/verifiedcar/verifiedcar.go | 131 ++++++++++++-------- pkg/verifiedcar/verifiedcar_test.go | 177 ++++++++++++++++------------ 6 files changed, 258 insertions(+), 127 deletions(-) diff --git a/pkg/internal/testutil/gen.go b/pkg/internal/testutil/gen.go index a3f83940..9e5725f8 100644 --- a/pkg/internal/testutil/gen.go +++ b/pkg/internal/testutil/gen.go @@ -1,6 +1,8 @@ package testutil import ( + "fmt" + "io" "math/rand" "net" "strconv" @@ -10,7 +12,10 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" blocksutil "github.com/ipfs/go-ipfs-blocksutil" + "github.com/ipfs/go-unixfsnode/data" unixfs "github.com/ipfs/go-unixfsnode/testutil" + dagpb "github.com/ipld/go-codec-dagpb" + "github.com/ipld/go-ipld-prime/linking" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/ipni/go-libipni/metadata" crypto "github.com/libp2p/go-libp2p/core/crypto" @@ -150,9 +155,11 @@ func (ZeroReader) Read(b []byte) (n int, err error) { return len(b), nil } -// TODO: this should probably be an option in unixfsnode/testutil, for -// generators to strictly not return a DAG with duplicates +// TODO: these should probably be in unixfsnode/testutil, or as options to +// the respective functions there. +// GenerateNoDupes runs the unixfsnode/testutil generator function repeatedly +// until it produces a DAG with strictly no duplicate CIDs. func GenerateNoDupes(gen func() unixfs.DirEntry) unixfs.DirEntry { var check func(unixfs.DirEntry) bool var seen map[cid.Cid]struct{} @@ -178,3 +185,29 @@ func GenerateNoDupes(gen func() unixfs.DirEntry) unixfs.DirEntry { } } } + +// GenerateStrictlyNestedShardedDir is a wrapper around +// unixfsnode/testutil.GenerateDirectory that will repeatedly run it until it +// produces a sharded directory structure with strictly at least one level of +// sharding. Since it is possible to produce a sharded directory that is +// contained in a single block, this function provides a way to generate a +// sharded directory for cases where we need to test multi-level sharding. +func GenerateStrictlyNestedShardedDir(t *testing.T, linkSys *linking.LinkSystem, randReader io.Reader, targetSize int) unixfs.DirEntry { + for { + de := unixfs.GenerateDirectory(t, linkSys, randReader, targetSize, true) + nd, err := linkSys.Load(linking.LinkContext{}, cidlink.Link{Cid: de.Root}, dagpb.Type.PBNode) + require.NoError(t, err) + ufsd, err := data.DecodeUnixFSData(nd.(dagpb.PBNode).Data.Must().Bytes()) + require.NoError(t, err) + pfxLen := len(fmt.Sprintf("%X", ufsd.FieldFanout().Must().Int()-1)) + iter := nd.(dagpb.PBNode).Links.ListIterator() + for !iter.Done() { + _, lnk, err := iter.Next() + require.NoError(t, err) + nameLen := len(lnk.(dagpb.PBLink).Name.Must().String()) + if nameLen == pfxLen { // name is just a shard prefix, so we have at least one level of nesting + return de + } + } + } +} diff --git a/pkg/retriever/bitswapretriever.go b/pkg/retriever/bitswapretriever.go index ab19b8b9..f7224dba 100644 --- a/pkg/retriever/bitswapretriever.go +++ b/pkg/retriever/bitswapretriever.go @@ -229,7 +229,7 @@ func (br *bitswapRetrieval) RetrieveFromAsyncCandidates(ayncCandidates types.Inb ctx, cidlink.Link{Cid: br.request.Cid}, selector, - &traversalLinkSys, + traversalLinkSys, preloader, br.request.MaxBlocks, ) @@ -295,15 +295,20 @@ func loaderForSession(retrievalID types.RetrievalID, inProgressCids InProgressCi } } +func noopVisitor(prog traversal.Progress, n datamodel.Node, reason traversal.VisitReason) error { + return nil +} + func easyTraverse( ctx context.Context, root datamodel.Link, traverseSelector datamodel.Node, - lsys *linking.LinkSystem, + lsys linking.LinkSystem, preloader preload.Loader, maxBlocks uint64, ) error { + lsys, ecr := newErrorCapturingReader(lsys) protoChooser := dagpb.AddSupportToChooser(basicnode.Chooser) // retrieve first node @@ -319,7 +324,7 @@ func easyTraverse( progress := traversal.Progress{ Cfg: &traversal.Config{ Ctx: ctx, - LinkSystem: *lsys, + LinkSystem: lsys, LinkTargetNodePrototypeChooser: protoChooser, Preloader: preloader, }, @@ -335,5 +340,27 @@ func easyTraverse( if err != nil { return err } - return progress.WalkAdv(node, compiledSelector, func(prog traversal.Progress, n datamodel.Node, reason traversal.VisitReason) error { return nil }) + if err := progress.WalkAdv(node, compiledSelector, noopVisitor); err != nil { + return err + } + return ecr.Error +} + +type errorCapturingReader struct { + sro linking.BlockReadOpener + Error error +} + +func newErrorCapturingReader(lsys linking.LinkSystem) (linking.LinkSystem, *errorCapturingReader) { + ecr := &errorCapturingReader{sro: lsys.StorageReadOpener} + lsys.StorageReadOpener = ecr.StorageReadOpener + return lsys, ecr +} + +func (ecr *errorCapturingReader) StorageReadOpener(lc linking.LinkContext, l datamodel.Link) (io.Reader, error) { + r, err := ecr.sro(lc, l) + if err != nil { + ecr.Error = err + } + return r, err } diff --git a/pkg/retriever/httpretriever.go b/pkg/retriever/httpretriever.go index 3cd046e2..2ef1e893 100644 --- a/pkg/retriever/httpretriever.go +++ b/pkg/retriever/httpretriever.go @@ -198,7 +198,7 @@ func newTimeToFirstByteReader(r io.Reader, cb func()) *timeToFirstByteReader { } } -func (t *timeToFirstByteReader) Read(p []byte) (n int, err error) { +func (t *timeToFirstByteReader) Read(p []byte) (int, error) { if !t.first { t.first = true defer t.cb() diff --git a/pkg/storage/duplicateaddercar.go b/pkg/storage/duplicateaddercar.go index f30e1880..df11ef17 100644 --- a/pkg/storage/duplicateaddercar.go +++ b/pkg/storage/duplicateaddercar.go @@ -131,6 +131,9 @@ func (bs *blockStream) Close() { } func (bs *blockStream) WriteBlock(blk blocks.Block) error { + if bs.ctx.Err() != nil { + return bs.ctx.Err() + } bs.mu.Lock() defer bs.mu.Unlock() if bs.done { diff --git a/pkg/verifiedcar/verifiedcar.go b/pkg/verifiedcar/verifiedcar.go index cec78ddf..5772d970 100644 --- a/pkg/verifiedcar/verifiedcar.go +++ b/pkg/verifiedcar/verifiedcar.go @@ -69,7 +69,6 @@ func visitNoop(p traversal.Progress, n datamodel.Node, r traversal.VisitReason) // // * https://specs.ipfs.tech/http-gateways/path-gateway/ func (cfg Config) VerifyCar(ctx context.Context, rdr io.Reader, lsys linking.LinkSystem) (uint64, uint64, error) { - cbr, err := car.NewBlockReader(rdr, car.WithTrustedCAR(false)) if err != nil { // TODO: post-1.19: fmt.Errorf("%w: %w", ErrMalformedCar, err) @@ -93,7 +92,6 @@ func (cfg Config) VerifyCar(ctx context.Context, rdr io.Reader, lsys linking.Lin } func (cfg Config) VerifyBlockStream(ctx context.Context, cbr BlockReader, lsys linking.LinkSystem) (uint64, uint64, error) { - sel, err := selector.CompileSelector(cfg.Selector) if err != nil { return 0, 0, err @@ -106,7 +104,8 @@ func (cfg Config) VerifyBlockStream(ctx context.Context, cbr BlockReader, lsys l lsys.TrustedStorage = true // we can rely on the CAR decoder to check CID integrity unixfsnode.AddUnixFSReificationToLinkSystem(&lsys) - lsys.StorageReadOpener = cfg.nextBlockReadOpener(ctx, cr, bt, lsys) + nbro := NewNextBlockReadOpener(ctx, cfg, cr, bt, lsys) + lsys.StorageReadOpener = nbro.StorageReadOpener // run traversal in this goroutine progress := traversal.Progress{ @@ -136,9 +135,15 @@ func (cfg Config) VerifyBlockStream(ctx context.Context, cbr BlockReader, lsys l return 0, 0, traversalError(err) } + if nbro.Error != nil { + // capture any errors not bubbled up through the traversal, i.e. see + // https://github.com/ipld/go-ipld-prime/pull/524 + return 0, 0, nbro.Error + } + // make sure we don't have any extraneous data beyond what the traversal needs _, err = cbr.Next() - if !errors.Is(err, io.EOF) { + if ctx.Err() == nil && !errors.Is(err, context.Canceled) && !errors.Is(err, io.EOF) { return 0, 0, ErrExtraneousBlock } @@ -146,58 +151,92 @@ func (cfg Config) VerifyBlockStream(ctx context.Context, cbr BlockReader, lsys l return bt.blocks, bt.bytes, nil } -func (cfg *Config) nextBlockReadOpener(ctx context.Context, cr *carReader, bt *writeTracker, lsys linking.LinkSystem) linking.BlockReadOpener { - seen := make(map[cid.Cid]struct{}) - return func(lc linking.LinkContext, l datamodel.Link) (io.Reader, error) { - cid := l.(cidlink.Link).Cid - - var data []byte - var err error - if _, ok := seen[cid]; ok { - if cfg.ExpectDuplicatesIn { - // duplicate block, but in this case we are expecting the stream to have it - data, err = cr.readNextBlock(ctx, cid) - if err != nil { - return nil, err - } - if !cfg.WriteDuplicatesOut { - return bytes.NewReader(data), nil - } - } else { - // duplicate block, rely on the supplied LinkSystem to have stored this - rdr, err := lsys.StorageReadOpener(lc, l) - if !cfg.WriteDuplicatesOut { - return rdr, err - } - data, err = io.ReadAll(rdr) - if err != nil { - return nil, err - } +type NextBlockReadOpener struct { + cfg Config + ctx context.Context + cr *carReader + bt *writeTracker + lsys linking.LinkSystem + seen map[cid.Cid]struct{} + + Error error + Count int +} + +func NewNextBlockReadOpener( + ctx context.Context, + cfg Config, + cr *carReader, + bt *writeTracker, + lsys linking.LinkSystem, +) *NextBlockReadOpener { + return &NextBlockReadOpener{ + cfg: cfg, + ctx: ctx, + cr: cr, + bt: bt, + lsys: lsys, + seen: make(map[cid.Cid]struct{}), + } +} + +func (nbro *NextBlockReadOpener) StorageReadOpener(lc linking.LinkContext, l datamodel.Link) (io.Reader, error) { + cid := l.(cidlink.Link).Cid + nbro.Count++ + var data []byte + var err error + if _, ok := nbro.seen[cid]; ok { + if nbro.cfg.ExpectDuplicatesIn { + // duplicate block, but in this case we are expecting the stream to have it + data, err = nbro.cr.readNextBlock(nbro.ctx, cid) + if err != nil { + nbro.Error = err + return nil, err + } + if !nbro.cfg.WriteDuplicatesOut { + return bytes.NewReader(data), nil } } else { - seen[cid] = struct{}{} - data, err = cr.readNextBlock(ctx, cid) + // duplicate block, rely on the supplied LinkSystem to have stored this + rdr, err := nbro.lsys.StorageReadOpener(lc, l) + if !nbro.cfg.WriteDuplicatesOut { + nbro.Error = err + return rdr, err + } + data, err = io.ReadAll(rdr) if err != nil { + nbro.Error = err return nil, err } } - bt.recordBlock(data) - w, wc, err := lsys.StorageWriteOpener(lc) + } else { + nbro.seen[cid] = struct{}{} + data, err = nbro.cr.readNextBlock(nbro.ctx, cid) if err != nil { + nbro.Error = err return nil, err } - rdr := bytes.NewReader(data) - if _, err := io.Copy(w, rdr); err != nil { - return nil, err - } - if err := wc(l); err != nil { - return nil, err - } - if _, err := rdr.Seek(0, io.SeekStart); err != nil { - return nil, err - } - return io.NopCloser(rdr), nil } + nbro.bt.recordBlock(data) + w, wc, err := nbro.lsys.StorageWriteOpener(lc) + if err != nil { + nbro.Error = err + return nil, err + } + rdr := bytes.NewReader(data) + if _, err := io.Copy(w, rdr); err != nil { + nbro.Error = err + return nil, err + } + if err := wc(l); err != nil { + nbro.Error = err + return nil, err + } + if _, err := rdr.Seek(0, io.SeekStart); err != nil { + nbro.Error = err + return nil, err + } + return io.NopCloser(rdr), nil } type carReader struct { diff --git a/pkg/verifiedcar/verifiedcar_test.go b/pkg/verifiedcar/verifiedcar_test.go index 3dc5f06e..40698131 100644 --- a/pkg/verifiedcar/verifiedcar_test.go +++ b/pkg/verifiedcar/verifiedcar_test.go @@ -3,6 +3,7 @@ package verifiedcar_test import ( "bytes" "context" + "errors" "io" "math/rand" "os" @@ -74,7 +75,9 @@ func TestVerifiedCar(t *testing.T) { } } - unixfsShardedDir := testutil.GenerateNoDupes(func() unixfs.DirEntry { return unixfs.GenerateDirectory(t, &lsys, rndReader, 8<<20, true) }) + unixfsShardedDir := testutil.GenerateNoDupes(func() unixfs.DirEntry { + return testutil.GenerateStrictlyNestedShardedDir(t, &lsys, rndReader, 8<<20) + }) unixfsShardedDirBlocks := testutil.ToBlocks(t, lsys, unixfsShardedDir.Root, allSelector) unixfsPreloadSelector := unixfsnode.MatchUnixFSPreloadSelector.Node() @@ -115,7 +118,9 @@ func TestVerifiedCar(t *testing.T) { blocks []expectedBlock roots []cid.Cid carv2 bool - err string + expectErr string + streamErr error + blockWriteErr error cfg verifiedcar.Config incomingHasDups bool }{ @@ -129,11 +134,11 @@ func TestVerifiedCar(t *testing.T) { }, }, { - name: "carv2 without AllowCARv2 errors", - blocks: consumedBlocks(allBlocks), - roots: []cid.Cid{root1}, - carv2: true, - err: "bad CAR version", + name: "carv2 without AllowCARv2 errors", + blocks: consumedBlocks(allBlocks), + roots: []cid.Cid{root1}, + carv2: true, + expectErr: "bad CAR version", cfg: verifiedcar.Config{ Root: root1, Selector: allSelector, @@ -151,10 +156,10 @@ func TestVerifiedCar(t *testing.T) { }, }, { - name: "carv1 with multiple roots errors", - blocks: consumedBlocks(allBlocks), - roots: []cid.Cid{root1, root1}, - err: "root CID mismatch", + name: "carv1 with multiple roots errors", + blocks: consumedBlocks(allBlocks), + roots: []cid.Cid{root1, root1}, + expectErr: "root CID mismatch", cfg: verifiedcar.Config{ Root: root1, Selector: allSelector, @@ -171,10 +176,10 @@ func TestVerifiedCar(t *testing.T) { }, }, { - name: "carv1 with wrong root errors", - blocks: consumedBlocks(allBlocks), - roots: []cid.Cid{tbc1.AllBlocks()[1].Cid()}, - err: "root CID mismatch", + name: "carv1 with wrong root errors", + blocks: consumedBlocks(allBlocks), + roots: []cid.Cid{tbc1.AllBlocks()[1].Cid()}, + expectErr: "root CID mismatch", cfg: verifiedcar.Config{ Root: root1, Selector: allSelector, @@ -182,40 +187,40 @@ func TestVerifiedCar(t *testing.T) { }, }, { - name: "carv1 with extraneous trailing block errors", - blocks: append(consumedBlocks(append([]blocks.Block{}, allBlocks...)), expectedBlock{extraneousBlk, true}), - roots: []cid.Cid{root1}, - err: "extraneous block in CAR", + name: "carv1 with extraneous trailing block errors", + blocks: append(consumedBlocks(append([]blocks.Block{}, allBlocks...)), expectedBlock{extraneousBlk, true}), + roots: []cid.Cid{root1}, + expectErr: "extraneous block in CAR", cfg: verifiedcar.Config{ Root: root1, Selector: allSelector, }, }, { - name: "carv1 with extraneous leading block errors", - blocks: append(consumedBlocks([]blocks.Block{extraneousBlk}), consumedBlocks(allBlocks)...), - roots: []cid.Cid{root1}, - err: "unexpected block in CAR: " + extraneousLnk.(cidlink.Link).Cid.String() + " != " + allBlocks[0].Cid().String(), + name: "carv1 with extraneous leading block errors", + blocks: append(consumedBlocks([]blocks.Block{extraneousBlk}), consumedBlocks(allBlocks)...), + roots: []cid.Cid{root1}, + expectErr: "unexpected block in CAR: " + extraneousLnk.(cidlink.Link).Cid.String() + " != " + allBlocks[0].Cid().String(), cfg: verifiedcar.Config{ Root: root1, Selector: allSelector, }, }, { - name: "carv1 with out-of-order blocks errors", - blocks: consumedBlocks(append(append([]blocks.Block{}, allBlocks[50:]...), allBlocks[0:50]...)), - roots: []cid.Cid{root1}, - err: "unexpected block in CAR: " + allBlocks[50].Cid().String() + " != " + allBlocks[0].Cid().String(), + name: "carv1 with out-of-order blocks errors", + blocks: consumedBlocks(append(append([]blocks.Block{}, allBlocks[50:]...), allBlocks[0:50]...)), + roots: []cid.Cid{root1}, + expectErr: "unexpected block in CAR: " + allBlocks[50].Cid().String() + " != " + allBlocks[0].Cid().String(), cfg: verifiedcar.Config{ Root: root1, Selector: allSelector, }, }, { - name: "carv1 with mismatching CID errors", - blocks: consumedBlocks(append(append([]blocks.Block{}, allBlocks[0:99]...), mismatchedCidBlk)), - roots: []cid.Cid{root1}, - err: "mismatch in content integrity", + name: "carv1 with mismatching CID errors", + blocks: consumedBlocks(append(append([]blocks.Block{}, allBlocks[0:99]...), mismatchedCidBlk)), + roots: []cid.Cid{root1}, + expectErr: "mismatch in content integrity", cfg: verifiedcar.Config{ Root: root1, Selector: allSelector, @@ -225,7 +230,7 @@ func TestVerifiedCar(t *testing.T) { name: "carv1 over budget errors", blocks: consumedBlocks(allBlocks), roots: []cid.Cid{root1}, - err: (&traversal.ErrBudgetExceeded{ + expectErr: (&traversal.ErrBudgetExceeded{ BudgetKind: "link", Path: datamodel.ParsePath("Parents/0/Parents/0/Parents/0"), Link: tbc1.LinkTipIndex(3), @@ -273,26 +278,20 @@ func TestVerifiedCar(t *testing.T) { }, }, { - // TODO: this is flaky, why? - // Error "extraneous block in CAR" does not contain "unexpected block in CAR" - // it's always a directory.UnixFSBasicDir, we use preload match `.` which should - // only want the first block. unixfsDirBlocks is created from an allSelector - // traversal, why is unixfs-preload making a difference for just matching a - // directory.UnixFSBasicDir. - name: "unixfs: all of large directory with file scope, errors", - blocks: consumedBlocks(unixfsDirBlocks), - roots: []cid.Cid{unixfsDir.Root}, - err: "extraneous block in CAR", + name: "unixfs: all of large directory with file scope, errors", + blocks: consumedBlocks(unixfsDirBlocks), + roots: []cid.Cid{unixfsDir.Root}, + expectErr: "extraneous block in CAR", cfg: verifiedcar.Config{ Root: unixfsDir.Root, Selector: unixfsPreloadSelector, }, }, { - name: "unixfs: all of large sharded directory with file scope, errors", - blocks: consumedBlocks(unixfsShardedDirBlocks), - roots: []cid.Cid{unixfsShardedDir.Root}, - err: "extraneous block in CAR", + name: "unixfs: all of large sharded directory with file scope, errors", + blocks: consumedBlocks(unixfsShardedDirBlocks), + roots: []cid.Cid{unixfsShardedDir.Root}, + expectErr: "unexpected block in CAR:", cfg: verifiedcar.Config{ Root: unixfsShardedDir.Root, Selector: unixfsPreloadSelector, @@ -317,10 +316,10 @@ func TestVerifiedCar(t *testing.T) { }, }, { - name: "unixfs: pathed subset inside large directory with file scope, errors", - blocks: consumedBlocks(unixfsDirBlocks), - roots: []cid.Cid{unixfsDir.Root}, - err: "unexpected block in CAR", + name: "unixfs: pathed subset inside large directory with file scope, errors", + blocks: consumedBlocks(unixfsDirBlocks), + roots: []cid.Cid{unixfsDir.Root}, + expectErr: "unexpected block in CAR", cfg: verifiedcar.Config{ Root: unixfsDir.Root, Selector: unixfsDirSubsetSelector, @@ -337,10 +336,10 @@ func TestVerifiedCar(t *testing.T) { }, { // our wrapped file has additional in the nested directories - name: "unixfs: large sharded file wrapped in directories, pathed, errors", - blocks: consumedBlocks(unixfsWrappedFileBlocks), - roots: []cid.Cid{unixfsWrappedFile.Root}, - err: "unexpected block in CAR", + name: "unixfs: large sharded file wrapped in directories, pathed, errors", + blocks: consumedBlocks(unixfsWrappedFileBlocks), + roots: []cid.Cid{unixfsWrappedFile.Root}, + expectErr: "unexpected block in CAR", cfg: verifiedcar.Config{ Root: unixfsWrappedFile.Root, Selector: unixfsWrappedPathSelector, @@ -356,10 +355,10 @@ func TestVerifiedCar(t *testing.T) { }, }, { - name: "unixfs: large sharded file wrapped in directories, trimmed, all, errors", - blocks: consumedBlocks(unixfsTrimmedWrappedFileBlocks), - roots: []cid.Cid{unixfsWrappedFile.Root}, - err: "unexpected block in CAR", + name: "unixfs: large sharded file wrapped in directories, trimmed, all, errors", + blocks: consumedBlocks(unixfsTrimmedWrappedFileBlocks), + roots: []cid.Cid{unixfsWrappedFile.Root}, + expectErr: "unexpected block in CAR", cfg: verifiedcar.Config{ Root: unixfsWrappedFile.Root, Selector: allSelector, @@ -385,10 +384,10 @@ func TestVerifiedCar(t *testing.T) { }, { // our wrapped dir has additional in the nested directories - name: "unixfs: large sharded dir wrapped in directories, pathed, errors", - blocks: consumedBlocks(unixfsWrappedShardedDirBlocks), - roots: []cid.Cid{unixfsWrappedShardedDir.Root}, - err: "unexpected block in CAR", + name: "unixfs: large sharded dir wrapped in directories, pathed, errors", + blocks: consumedBlocks(unixfsWrappedShardedDirBlocks), + roots: []cid.Cid{unixfsWrappedShardedDir.Root}, + expectErr: "unexpected block in CAR", cfg: verifiedcar.Config{ Root: unixfsWrappedShardedDir.Root, Selector: unixfsWrappedPathSelector, @@ -413,10 +412,10 @@ func TestVerifiedCar(t *testing.T) { }, }, { - name: "unixfs: large sharded dir wrapped in directories, trimmed, all, errors", - blocks: consumedBlocks(unixfsTrimmedWrappedShardedDirBlocks), - roots: []cid.Cid{unixfsWrappedShardedDir.Root}, - err: "unexpected block in CAR", + name: "unixfs: large sharded dir wrapped in directories, trimmed, all, errors", + blocks: consumedBlocks(unixfsTrimmedWrappedShardedDirBlocks), + roots: []cid.Cid{unixfsWrappedShardedDir.Root}, + expectErr: "unexpected block in CAR", cfg: verifiedcar.Config{ Root: unixfsWrappedShardedDir.Root, Selector: allSelector, @@ -450,10 +449,10 @@ func TestVerifiedCar(t *testing.T) { }, }, { - name: "unixfs: file with dups, incoming has dups, not allowed", - blocks: append(append(consumedBlocks(unixfsFileWithDupsBlocks[:2]), skippedBlocks(unixfsFileWithDupsBlocks[2:len(unixfsFileWithDupsBlocks)-1])...), consumedBlocks(unixfsFileWithDupsBlocks[len(unixfsFileWithDupsBlocks)-1:])...), - err: "unexpected block in CAR: " + unixfsFileWithDupsBlocks[2].Cid().String() + " != " + unixfsFileWithDupsBlocks[len(unixfsFileWithDupsBlocks)-1].Cid().String(), - roots: []cid.Cid{unixfsFileWithDups.Root}, + name: "unixfs: file with dups, incoming has dups, not allowed", + blocks: append(append(consumedBlocks(unixfsFileWithDupsBlocks[:2]), skippedBlocks(unixfsFileWithDupsBlocks[2:len(unixfsFileWithDupsBlocks)-1])...), consumedBlocks(unixfsFileWithDupsBlocks[len(unixfsFileWithDupsBlocks)-1:])...), + expectErr: "unexpected block in CAR: " + unixfsFileWithDupsBlocks[2].Cid().String() + " != " + unixfsFileWithDupsBlocks[len(unixfsFileWithDupsBlocks)-1].Cid().String(), + roots: []cid.Cid{unixfsFileWithDups.Root}, cfg: verifiedcar.Config{ Root: unixfsFileWithDups.Root, Selector: allSelector, @@ -493,6 +492,28 @@ func TestVerifiedCar(t *testing.T) { }, incomingHasDups: true, }, + { + name: "premature stream end errors", + blocks: consumedBlocks(allBlocks), + roots: []cid.Cid{root1}, + expectErr: "something wicked this way comes", + streamErr: errors.New("something wicked this way comes"), + cfg: verifiedcar.Config{ + Root: root1, + Selector: allSelector, + }, + }, + { + name: "block write error errors", + blocks: consumedBlocks(allBlocks), + roots: []cid.Cid{root1}, + expectErr: "something wicked this way comes", + blockWriteErr: errors.New("something wicked this way comes"), + cfg: verifiedcar.Config{ + Root: root1, + Selector: allSelector, + }, + }, } for _, testCase := range testCases { @@ -517,6 +538,9 @@ func TestVerifiedCar(t *testing.T) { lsys.StorageWriteOpener = func(lc linking.LinkContext) (io.Writer, linking.BlockWriteCommitter, error) { var buf bytes.Buffer return &buf, func(l datamodel.Link) error { + if testCase.blockWriteErr != nil && writeCounter+skipped == len(testCase.blocks)/2 { + return testCase.blockWriteErr + } for testCase.blocks[writeCounter+skipped].skipped { skipped++ } @@ -532,14 +556,14 @@ func TestVerifiedCar(t *testing.T) { }, nil } - carStream := makeCarStream(t, ctx, testCase.roots, testCase.blocks, testCase.carv2, testCase.err != "", testCase.incomingHasDups) + carStream := makeCarStream(t, ctx, testCase.roots, testCase.blocks, testCase.carv2, testCase.expectErr != "", testCase.incomingHasDups, testCase.streamErr) blockCount, byteCount, err := testCase.cfg.VerifyCar(ctx, carStream, lsys) // read the rest of data io.ReadAll(carStream) - if testCase.err != "" { - req.ErrorContains(err, testCase.err) + if testCase.expectErr != "" { + req.ErrorContains(err, testCase.expectErr) req.Equal(uint64(0), blockCount) req.Equal(uint64(0), byteCount) } else { @@ -560,6 +584,7 @@ func makeCarStream( carv2 bool, expectErrors bool, allowDuplicatePuts bool, + streamError error, ) io.Reader { r, w := io.Pipe() @@ -588,7 +613,11 @@ func makeCarStream( if err != nil { return } - for _, block := range blocks { + for ii, block := range blocks { + if streamError != nil && ii == len(blocks)/2 { + w.CloseWithError(streamError) + return + } err := carWriter.Put(ctx, block.Cid().KeyString(), block.RawData()) if !expectErrors { req.NoError(err)