From 59c58800a0c3a7dbd7b9e9280f421ca90823205f Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Thu, 29 Jun 2023 14:56:15 +1000 Subject: [PATCH] fix: various traversal and verifier fixes Closes: https://github.com/filecoin-project/lassie/issues/336 * don't treat context cancellation as an ErrExtraneousBlock in the CAR verifier * capture and properly handle block load errors that are missed by the go-ipld-prime traverser, ref: https://github.com/ipld/go-ipld-prime/pull/524 * fix flaky case(s) in verifiedcar test suite where multi-level sharded directory is assumed but only a single block dir is produced --- pkg/internal/testutil/gen.go | 45 ++++++- pkg/retriever/bitswapretriever.go | 35 +++++- pkg/retriever/httpretriever.go | 2 +- pkg/server/http/ipfs.go | 3 +- pkg/storage/duplicateaddercar.go | 3 + pkg/verifiedcar/verifiedcar.go | 46 ++++++-- pkg/verifiedcar/verifiedcar_test.go | 177 ++++++++++++++++------------ 7 files changed, 221 insertions(+), 90 deletions(-) diff --git a/pkg/internal/testutil/gen.go b/pkg/internal/testutil/gen.go index a3f83940..fca9093a 100644 --- a/pkg/internal/testutil/gen.go +++ b/pkg/internal/testutil/gen.go @@ -1,6 +1,8 @@ package testutil import ( + "fmt" + "io" "math/rand" "net" "strconv" @@ -10,7 +12,10 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" blocksutil "github.com/ipfs/go-ipfs-blocksutil" + "github.com/ipfs/go-unixfsnode/data" unixfs "github.com/ipfs/go-unixfsnode/testutil" + dagpb "github.com/ipld/go-codec-dagpb" + "github.com/ipld/go-ipld-prime/linking" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/ipni/go-libipni/metadata" crypto "github.com/libp2p/go-libp2p/core/crypto" @@ -150,9 +155,11 @@ func (ZeroReader) Read(b []byte) (n int, err error) { return len(b), nil } -// TODO: this should probably be an option in unixfsnode/testutil, for -// generators to strictly not return a DAG with duplicates +// TODO: these should probably be in unixfsnode/testutil, or as options to +// the respective functions there. +// GenerateNoDupes runs the unixfsnode/testutil generator function repeatedly +// until it produces a DAG with strictly no duplicate CIDs. func GenerateNoDupes(gen func() unixfs.DirEntry) unixfs.DirEntry { var check func(unixfs.DirEntry) bool var seen map[cid.Cid]struct{} @@ -178,3 +185,37 @@ func GenerateNoDupes(gen func() unixfs.DirEntry) unixfs.DirEntry { } } } + +// GenerateStrictlyNestedShardedDir is a wrapper around +// unixfsnode/testutil.GenerateDirectory that uses dark magic to repeatedly +// generate a sharded directory until it produces one that is strictly nested. +// That is, it produces a sharded directory structure with strictly at least one +// level of sharding with at least two child shards. +// +// Since it is possible to produce a sharded directory that is +// contained in a single block, this function provides a way to generate a +// sharded directory for cases where we need to test multi-level sharding. 
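+// As an illustration only (these values are an example, not something this
+// helper asserts): with a HAMT fanout of 256 the shard prefix is two hex
+// characters, so a child link named just "1A" points at a nested child shard,
+// while a link named "1Afile.txt" is a leaf entry; the loop below accepts a
+// generated root only once it finds at least two links of the former,
+// prefix-only kind.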
+func GenerateStrictlyNestedShardedDir(t *testing.T, linkSys *linking.LinkSystem, randReader io.Reader, targetSize int) unixfs.DirEntry { + for { + de := unixfs.GenerateDirectory(t, linkSys, randReader, targetSize, true) + nd, err := linkSys.Load(linking.LinkContext{}, cidlink.Link{Cid: de.Root}, dagpb.Type.PBNode) + require.NoError(t, err) + ufsd, err := data.DecodeUnixFSData(nd.(dagpb.PBNode).Data.Must().Bytes()) + require.NoError(t, err) + pfxLen := len(fmt.Sprintf("%X", ufsd.FieldFanout().Must().Int()-1)) + iter := nd.(dagpb.PBNode).Links.ListIterator() + childShards := 0 + for !iter.Done() { + _, lnk, err := iter.Next() + require.NoError(t, err) + nameLen := len(lnk.(dagpb.PBLink).Name.Must().String()) + if nameLen == pfxLen { + // name is just a shard prefix, so we have at least one level of nesting + childShards++ + } + } + if childShards >= 2 { + return de + } + } +} diff --git a/pkg/retriever/bitswapretriever.go b/pkg/retriever/bitswapretriever.go index ab19b8b9..f7224dba 100644 --- a/pkg/retriever/bitswapretriever.go +++ b/pkg/retriever/bitswapretriever.go @@ -229,7 +229,7 @@ func (br *bitswapRetrieval) RetrieveFromAsyncCandidates(ayncCandidates types.Inb ctx, cidlink.Link{Cid: br.request.Cid}, selector, - &traversalLinkSys, + traversalLinkSys, preloader, br.request.MaxBlocks, ) @@ -295,15 +295,20 @@ func loaderForSession(retrievalID types.RetrievalID, inProgressCids InProgressCi } } +func noopVisitor(prog traversal.Progress, n datamodel.Node, reason traversal.VisitReason) error { + return nil +} + func easyTraverse( ctx context.Context, root datamodel.Link, traverseSelector datamodel.Node, - lsys *linking.LinkSystem, + lsys linking.LinkSystem, preloader preload.Loader, maxBlocks uint64, ) error { + lsys, ecr := newErrorCapturingReader(lsys) protoChooser := dagpb.AddSupportToChooser(basicnode.Chooser) // retrieve first node @@ -319,7 +324,7 @@ func easyTraverse( progress := traversal.Progress{ Cfg: &traversal.Config{ Ctx: ctx, - LinkSystem: *lsys, + LinkSystem: lsys, LinkTargetNodePrototypeChooser: protoChooser, Preloader: preloader, }, @@ -335,5 +340,27 @@ func easyTraverse( if err != nil { return err } - return progress.WalkAdv(node, compiledSelector, func(prog traversal.Progress, n datamodel.Node, reason traversal.VisitReason) error { return nil }) + if err := progress.WalkAdv(node, compiledSelector, noopVisitor); err != nil { + return err + } + return ecr.Error +} + +type errorCapturingReader struct { + sro linking.BlockReadOpener + Error error +} + +func newErrorCapturingReader(lsys linking.LinkSystem) (linking.LinkSystem, *errorCapturingReader) { + ecr := &errorCapturingReader{sro: lsys.StorageReadOpener} + lsys.StorageReadOpener = ecr.StorageReadOpener + return lsys, ecr +} + +func (ecr *errorCapturingReader) StorageReadOpener(lc linking.LinkContext, l datamodel.Link) (io.Reader, error) { + r, err := ecr.sro(lc, l) + if err != nil { + ecr.Error = err + } + return r, err } diff --git a/pkg/retriever/httpretriever.go b/pkg/retriever/httpretriever.go index 3cd046e2..2ef1e893 100644 --- a/pkg/retriever/httpretriever.go +++ b/pkg/retriever/httpretriever.go @@ -198,7 +198,7 @@ func newTimeToFirstByteReader(r io.Reader, cb func()) *timeToFirstByteReader { } } -func (t *timeToFirstByteReader) Read(p []byte) (n int, err error) { +func (t *timeToFirstByteReader) Read(p []byte) (int, error) { if !t.first { t.first = true defer t.cb() diff --git a/pkg/server/http/ipfs.go b/pkg/server/http/ipfs.go index f11a6841..f805c6a1 100644 --- a/pkg/server/http/ipfs.go +++ 
b/pkg/server/http/ipfs.go @@ -1,6 +1,7 @@ package httpserver import ( + "context" "errors" "fmt" "net/http" @@ -188,7 +189,7 @@ func ipfsHandler(lassie *lassie.Lassie, cfg HttpServerConfig) func(http.Response stats, err := lassie.Fetch(req.Context(), request, servertimingsSubscriber(req)) // force all blocks to flush - if cerr := carWriter.Close(); cerr != nil { + if cerr := carWriter.Close(); cerr != nil && !errors.Is(cerr, context.Canceled) { logger.Infof("error closing car writer: %s", cerr) } diff --git a/pkg/storage/duplicateaddercar.go b/pkg/storage/duplicateaddercar.go index f30e1880..df11ef17 100644 --- a/pkg/storage/duplicateaddercar.go +++ b/pkg/storage/duplicateaddercar.go @@ -131,6 +131,9 @@ func (bs *blockStream) Close() { } func (bs *blockStream) WriteBlock(blk blocks.Block) error { + if bs.ctx.Err() != nil { + return bs.ctx.Err() + } bs.mu.Lock() defer bs.mu.Unlock() if bs.done { diff --git a/pkg/verifiedcar/verifiedcar.go b/pkg/verifiedcar/verifiedcar.go index cec78ddf..03a4c4e6 100644 --- a/pkg/verifiedcar/verifiedcar.go +++ b/pkg/verifiedcar/verifiedcar.go @@ -69,7 +69,6 @@ func visitNoop(p traversal.Progress, n datamodel.Node, r traversal.VisitReason) // // * https://specs.ipfs.tech/http-gateways/path-gateway/ func (cfg Config) VerifyCar(ctx context.Context, rdr io.Reader, lsys linking.LinkSystem) (uint64, uint64, error) { - cbr, err := car.NewBlockReader(rdr, car.WithTrustedCAR(false)) if err != nil { // TODO: post-1.19: fmt.Errorf("%w: %w", ErrMalformedCar, err) @@ -93,7 +92,6 @@ func (cfg Config) VerifyCar(ctx context.Context, rdr io.Reader, lsys linking.Lin } func (cfg Config) VerifyBlockStream(ctx context.Context, cbr BlockReader, lsys linking.LinkSystem) (uint64, uint64, error) { - sel, err := selector.CompileSelector(cfg.Selector) if err != nil { return 0, 0, err @@ -106,7 +104,7 @@ func (cfg Config) VerifyBlockStream(ctx context.Context, cbr BlockReader, lsys l lsys.TrustedStorage = true // we can rely on the CAR decoder to check CID integrity unixfsnode.AddUnixFSReificationToLinkSystem(&lsys) - lsys.StorageReadOpener = cfg.nextBlockReadOpener(ctx, cr, bt, lsys) + nbls, lsys := NewNextBlockLinkSystem(ctx, cfg, cr, bt, lsys) // run traversal in this goroutine progress := traversal.Progress{ @@ -136,21 +134,41 @@ func (cfg Config) VerifyBlockStream(ctx context.Context, cbr BlockReader, lsys l return 0, 0, traversalError(err) } + if nbls.Error != nil { + // capture any errors not bubbled up through the traversal, i.e. 
see + // https://github.com/ipld/go-ipld-prime/pull/524 + return 0, 0, nbls.Error + } + // make sure we don't have any extraneous data beyond what the traversal needs _, err = cbr.Next() - if !errors.Is(err, io.EOF) { + if err == nil { return 0, 0, ErrExtraneousBlock + } else if !errors.Is(err, io.EOF) { + return 0, 0, err } // wait for parser to finish and provide errors or stats return bt.blocks, bt.bytes, nil } -func (cfg *Config) nextBlockReadOpener(ctx context.Context, cr *carReader, bt *writeTracker, lsys linking.LinkSystem) linking.BlockReadOpener { +type NextBlockLinkSystem struct { + Error error +} + +func NewNextBlockLinkSystem( + ctx context.Context, + cfg Config, + cr *carReader, + bt *writeTracker, + lsys linking.LinkSystem, +) (*NextBlockLinkSystem, linking.LinkSystem) { + nbls := &NextBlockLinkSystem{} seen := make(map[cid.Cid]struct{}) - return func(lc linking.LinkContext, l datamodel.Link) (io.Reader, error) { - cid := l.(cidlink.Link).Cid + storageReadOpener := lsys.StorageReadOpener + nextBlockReadOpener := func(lc linking.LinkContext, l datamodel.Link) (io.Reader, error) { + cid := l.(cidlink.Link).Cid var data []byte var err error if _, ok := seen[cid]; ok { @@ -165,7 +183,7 @@ func (cfg *Config) nextBlockReadOpener(ctx context.Context, cr *carReader, bt *w } } else { // duplicate block, rely on the supplied LinkSystem to have stored this - rdr, err := lsys.StorageReadOpener(lc, l) + rdr, err := storageReadOpener(lc, l) if !cfg.WriteDuplicatesOut { return rdr, err } @@ -198,6 +216,18 @@ func (cfg *Config) nextBlockReadOpener(ctx context.Context, cr *carReader, bt *w } return io.NopCloser(rdr), nil } + + // wrap nextBlockReadOpener in one that captures errors on `nbls` + lsys.StorageReadOpener = func(lc linking.LinkContext, l datamodel.Link) (io.Reader, error) { + rdr, err := nextBlockReadOpener(lc, l) + if err != nil { + nbls.Error = err + return nil, err + } + return rdr, nil + } + + return nbls, lsys } type carReader struct { diff --git a/pkg/verifiedcar/verifiedcar_test.go b/pkg/verifiedcar/verifiedcar_test.go index 3dc5f06e..40698131 100644 --- a/pkg/verifiedcar/verifiedcar_test.go +++ b/pkg/verifiedcar/verifiedcar_test.go @@ -3,6 +3,7 @@ package verifiedcar_test import ( "bytes" "context" + "errors" "io" "math/rand" "os" @@ -74,7 +75,9 @@ func TestVerifiedCar(t *testing.T) { } } - unixfsShardedDir := testutil.GenerateNoDupes(func() unixfs.DirEntry { return unixfs.GenerateDirectory(t, &lsys, rndReader, 8<<20, true) }) + unixfsShardedDir := testutil.GenerateNoDupes(func() unixfs.DirEntry { + return testutil.GenerateStrictlyNestedShardedDir(t, &lsys, rndReader, 8<<20) + }) unixfsShardedDirBlocks := testutil.ToBlocks(t, lsys, unixfsShardedDir.Root, allSelector) unixfsPreloadSelector := unixfsnode.MatchUnixFSPreloadSelector.Node() @@ -115,7 +118,9 @@ func TestVerifiedCar(t *testing.T) { blocks []expectedBlock roots []cid.Cid carv2 bool - err string + expectErr string + streamErr error + blockWriteErr error cfg verifiedcar.Config incomingHasDups bool }{ @@ -129,11 +134,11 @@ func TestVerifiedCar(t *testing.T) { }, }, { - name: "carv2 without AllowCARv2 errors", - blocks: consumedBlocks(allBlocks), - roots: []cid.Cid{root1}, - carv2: true, - err: "bad CAR version", + name: "carv2 without AllowCARv2 errors", + blocks: consumedBlocks(allBlocks), + roots: []cid.Cid{root1}, + carv2: true, + expectErr: "bad CAR version", cfg: verifiedcar.Config{ Root: root1, Selector: allSelector, @@ -151,10 +156,10 @@ func TestVerifiedCar(t *testing.T) { }, }, { - name: "carv1 with 
multiple roots errors", - blocks: consumedBlocks(allBlocks), - roots: []cid.Cid{root1, root1}, - err: "root CID mismatch", + name: "carv1 with multiple roots errors", + blocks: consumedBlocks(allBlocks), + roots: []cid.Cid{root1, root1}, + expectErr: "root CID mismatch", cfg: verifiedcar.Config{ Root: root1, Selector: allSelector, @@ -171,10 +176,10 @@ func TestVerifiedCar(t *testing.T) { }, }, { - name: "carv1 with wrong root errors", - blocks: consumedBlocks(allBlocks), - roots: []cid.Cid{tbc1.AllBlocks()[1].Cid()}, - err: "root CID mismatch", + name: "carv1 with wrong root errors", + blocks: consumedBlocks(allBlocks), + roots: []cid.Cid{tbc1.AllBlocks()[1].Cid()}, + expectErr: "root CID mismatch", cfg: verifiedcar.Config{ Root: root1, Selector: allSelector, @@ -182,40 +187,40 @@ func TestVerifiedCar(t *testing.T) { }, }, { - name: "carv1 with extraneous trailing block errors", - blocks: append(consumedBlocks(append([]blocks.Block{}, allBlocks...)), expectedBlock{extraneousBlk, true}), - roots: []cid.Cid{root1}, - err: "extraneous block in CAR", + name: "carv1 with extraneous trailing block errors", + blocks: append(consumedBlocks(append([]blocks.Block{}, allBlocks...)), expectedBlock{extraneousBlk, true}), + roots: []cid.Cid{root1}, + expectErr: "extraneous block in CAR", cfg: verifiedcar.Config{ Root: root1, Selector: allSelector, }, }, { - name: "carv1 with extraneous leading block errors", - blocks: append(consumedBlocks([]blocks.Block{extraneousBlk}), consumedBlocks(allBlocks)...), - roots: []cid.Cid{root1}, - err: "unexpected block in CAR: " + extraneousLnk.(cidlink.Link).Cid.String() + " != " + allBlocks[0].Cid().String(), + name: "carv1 with extraneous leading block errors", + blocks: append(consumedBlocks([]blocks.Block{extraneousBlk}), consumedBlocks(allBlocks)...), + roots: []cid.Cid{root1}, + expectErr: "unexpected block in CAR: " + extraneousLnk.(cidlink.Link).Cid.String() + " != " + allBlocks[0].Cid().String(), cfg: verifiedcar.Config{ Root: root1, Selector: allSelector, }, }, { - name: "carv1 with out-of-order blocks errors", - blocks: consumedBlocks(append(append([]blocks.Block{}, allBlocks[50:]...), allBlocks[0:50]...)), - roots: []cid.Cid{root1}, - err: "unexpected block in CAR: " + allBlocks[50].Cid().String() + " != " + allBlocks[0].Cid().String(), + name: "carv1 with out-of-order blocks errors", + blocks: consumedBlocks(append(append([]blocks.Block{}, allBlocks[50:]...), allBlocks[0:50]...)), + roots: []cid.Cid{root1}, + expectErr: "unexpected block in CAR: " + allBlocks[50].Cid().String() + " != " + allBlocks[0].Cid().String(), cfg: verifiedcar.Config{ Root: root1, Selector: allSelector, }, }, { - name: "carv1 with mismatching CID errors", - blocks: consumedBlocks(append(append([]blocks.Block{}, allBlocks[0:99]...), mismatchedCidBlk)), - roots: []cid.Cid{root1}, - err: "mismatch in content integrity", + name: "carv1 with mismatching CID errors", + blocks: consumedBlocks(append(append([]blocks.Block{}, allBlocks[0:99]...), mismatchedCidBlk)), + roots: []cid.Cid{root1}, + expectErr: "mismatch in content integrity", cfg: verifiedcar.Config{ Root: root1, Selector: allSelector, @@ -225,7 +230,7 @@ func TestVerifiedCar(t *testing.T) { name: "carv1 over budget errors", blocks: consumedBlocks(allBlocks), roots: []cid.Cid{root1}, - err: (&traversal.ErrBudgetExceeded{ + expectErr: (&traversal.ErrBudgetExceeded{ BudgetKind: "link", Path: datamodel.ParsePath("Parents/0/Parents/0/Parents/0"), Link: tbc1.LinkTipIndex(3), @@ -273,26 +278,20 @@ func TestVerifiedCar(t *testing.T) { 
}, }, { - // TODO: this is flaky, why? - // Error "extraneous block in CAR" does not contain "unexpected block in CAR" - // it's always a directory.UnixFSBasicDir, we use preload match `.` which should - // only want the first block. unixfsDirBlocks is created from an allSelector - // traversal, why is unixfs-preload making a difference for just matching a - // directory.UnixFSBasicDir. - name: "unixfs: all of large directory with file scope, errors", - blocks: consumedBlocks(unixfsDirBlocks), - roots: []cid.Cid{unixfsDir.Root}, - err: "extraneous block in CAR", + name: "unixfs: all of large directory with file scope, errors", + blocks: consumedBlocks(unixfsDirBlocks), + roots: []cid.Cid{unixfsDir.Root}, + expectErr: "extraneous block in CAR", cfg: verifiedcar.Config{ Root: unixfsDir.Root, Selector: unixfsPreloadSelector, }, }, { - name: "unixfs: all of large sharded directory with file scope, errors", - blocks: consumedBlocks(unixfsShardedDirBlocks), - roots: []cid.Cid{unixfsShardedDir.Root}, - err: "extraneous block in CAR", + name: "unixfs: all of large sharded directory with file scope, errors", + blocks: consumedBlocks(unixfsShardedDirBlocks), + roots: []cid.Cid{unixfsShardedDir.Root}, + expectErr: "unexpected block in CAR:", cfg: verifiedcar.Config{ Root: unixfsShardedDir.Root, Selector: unixfsPreloadSelector, @@ -317,10 +316,10 @@ func TestVerifiedCar(t *testing.T) { }, }, { - name: "unixfs: pathed subset inside large directory with file scope, errors", - blocks: consumedBlocks(unixfsDirBlocks), - roots: []cid.Cid{unixfsDir.Root}, - err: "unexpected block in CAR", + name: "unixfs: pathed subset inside large directory with file scope, errors", + blocks: consumedBlocks(unixfsDirBlocks), + roots: []cid.Cid{unixfsDir.Root}, + expectErr: "unexpected block in CAR", cfg: verifiedcar.Config{ Root: unixfsDir.Root, Selector: unixfsDirSubsetSelector, @@ -337,10 +336,10 @@ func TestVerifiedCar(t *testing.T) { }, { // our wrapped file has additional in the nested directories - name: "unixfs: large sharded file wrapped in directories, pathed, errors", - blocks: consumedBlocks(unixfsWrappedFileBlocks), - roots: []cid.Cid{unixfsWrappedFile.Root}, - err: "unexpected block in CAR", + name: "unixfs: large sharded file wrapped in directories, pathed, errors", + blocks: consumedBlocks(unixfsWrappedFileBlocks), + roots: []cid.Cid{unixfsWrappedFile.Root}, + expectErr: "unexpected block in CAR", cfg: verifiedcar.Config{ Root: unixfsWrappedFile.Root, Selector: unixfsWrappedPathSelector, @@ -356,10 +355,10 @@ func TestVerifiedCar(t *testing.T) { }, }, { - name: "unixfs: large sharded file wrapped in directories, trimmed, all, errors", - blocks: consumedBlocks(unixfsTrimmedWrappedFileBlocks), - roots: []cid.Cid{unixfsWrappedFile.Root}, - err: "unexpected block in CAR", + name: "unixfs: large sharded file wrapped in directories, trimmed, all, errors", + blocks: consumedBlocks(unixfsTrimmedWrappedFileBlocks), + roots: []cid.Cid{unixfsWrappedFile.Root}, + expectErr: "unexpected block in CAR", cfg: verifiedcar.Config{ Root: unixfsWrappedFile.Root, Selector: allSelector, @@ -385,10 +384,10 @@ func TestVerifiedCar(t *testing.T) { }, { // our wrapped dir has additional in the nested directories - name: "unixfs: large sharded dir wrapped in directories, pathed, errors", - blocks: consumedBlocks(unixfsWrappedShardedDirBlocks), - roots: []cid.Cid{unixfsWrappedShardedDir.Root}, - err: "unexpected block in CAR", + name: "unixfs: large sharded dir wrapped in directories, pathed, errors", + blocks: 
consumedBlocks(unixfsWrappedShardedDirBlocks), + roots: []cid.Cid{unixfsWrappedShardedDir.Root}, + expectErr: "unexpected block in CAR", cfg: verifiedcar.Config{ Root: unixfsWrappedShardedDir.Root, Selector: unixfsWrappedPathSelector, @@ -413,10 +412,10 @@ func TestVerifiedCar(t *testing.T) { }, }, { - name: "unixfs: large sharded dir wrapped in directories, trimmed, all, errors", - blocks: consumedBlocks(unixfsTrimmedWrappedShardedDirBlocks), - roots: []cid.Cid{unixfsWrappedShardedDir.Root}, - err: "unexpected block in CAR", + name: "unixfs: large sharded dir wrapped in directories, trimmed, all, errors", + blocks: consumedBlocks(unixfsTrimmedWrappedShardedDirBlocks), + roots: []cid.Cid{unixfsWrappedShardedDir.Root}, + expectErr: "unexpected block in CAR", cfg: verifiedcar.Config{ Root: unixfsWrappedShardedDir.Root, Selector: allSelector, @@ -450,10 +449,10 @@ func TestVerifiedCar(t *testing.T) { }, }, { - name: "unixfs: file with dups, incoming has dups, not allowed", - blocks: append(append(consumedBlocks(unixfsFileWithDupsBlocks[:2]), skippedBlocks(unixfsFileWithDupsBlocks[2:len(unixfsFileWithDupsBlocks)-1])...), consumedBlocks(unixfsFileWithDupsBlocks[len(unixfsFileWithDupsBlocks)-1:])...), - err: "unexpected block in CAR: " + unixfsFileWithDupsBlocks[2].Cid().String() + " != " + unixfsFileWithDupsBlocks[len(unixfsFileWithDupsBlocks)-1].Cid().String(), - roots: []cid.Cid{unixfsFileWithDups.Root}, + name: "unixfs: file with dups, incoming has dups, not allowed", + blocks: append(append(consumedBlocks(unixfsFileWithDupsBlocks[:2]), skippedBlocks(unixfsFileWithDupsBlocks[2:len(unixfsFileWithDupsBlocks)-1])...), consumedBlocks(unixfsFileWithDupsBlocks[len(unixfsFileWithDupsBlocks)-1:])...), + expectErr: "unexpected block in CAR: " + unixfsFileWithDupsBlocks[2].Cid().String() + " != " + unixfsFileWithDupsBlocks[len(unixfsFileWithDupsBlocks)-1].Cid().String(), + roots: []cid.Cid{unixfsFileWithDups.Root}, cfg: verifiedcar.Config{ Root: unixfsFileWithDups.Root, Selector: allSelector, @@ -493,6 +492,28 @@ func TestVerifiedCar(t *testing.T) { }, incomingHasDups: true, }, + { + name: "premature stream end errors", + blocks: consumedBlocks(allBlocks), + roots: []cid.Cid{root1}, + expectErr: "something wicked this way comes", + streamErr: errors.New("something wicked this way comes"), + cfg: verifiedcar.Config{ + Root: root1, + Selector: allSelector, + }, + }, + { + name: "block write error errors", + blocks: consumedBlocks(allBlocks), + roots: []cid.Cid{root1}, + expectErr: "something wicked this way comes", + blockWriteErr: errors.New("something wicked this way comes"), + cfg: verifiedcar.Config{ + Root: root1, + Selector: allSelector, + }, + }, } for _, testCase := range testCases { @@ -517,6 +538,9 @@ func TestVerifiedCar(t *testing.T) { lsys.StorageWriteOpener = func(lc linking.LinkContext) (io.Writer, linking.BlockWriteCommitter, error) { var buf bytes.Buffer return &buf, func(l datamodel.Link) error { + if testCase.blockWriteErr != nil && writeCounter+skipped == len(testCase.blocks)/2 { + return testCase.blockWriteErr + } for testCase.blocks[writeCounter+skipped].skipped { skipped++ } @@ -532,14 +556,14 @@ func TestVerifiedCar(t *testing.T) { }, nil } - carStream := makeCarStream(t, ctx, testCase.roots, testCase.blocks, testCase.carv2, testCase.err != "", testCase.incomingHasDups) + carStream := makeCarStream(t, ctx, testCase.roots, testCase.blocks, testCase.carv2, testCase.expectErr != "", testCase.incomingHasDups, testCase.streamErr) blockCount, byteCount, err := 
testCase.cfg.VerifyCar(ctx, carStream, lsys) // read the rest of data io.ReadAll(carStream) - if testCase.err != "" { - req.ErrorContains(err, testCase.err) + if testCase.expectErr != "" { + req.ErrorContains(err, testCase.expectErr) req.Equal(uint64(0), blockCount) req.Equal(uint64(0), byteCount) } else { @@ -560,6 +584,7 @@ func makeCarStream( carv2 bool, expectErrors bool, allowDuplicatePuts bool, + streamError error, ) io.Reader { r, w := io.Pipe() @@ -588,7 +613,11 @@ func makeCarStream( if err != nil { return } - for _, block := range blocks { + for ii, block := range blocks { + if streamError != nil && ii == len(blocks)/2 { + w.CloseWithError(streamError) + return + } err := carWriter.Put(ctx, block.Cid().KeyString(), block.RawData()) if !expectErrors { req.NoError(err)
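
---

Note on the recurring pattern: the StorageReadOpener wrapping added in both
pkg/retriever/bitswapretriever.go and pkg/verifiedcar/verifiedcar.go reduces to
the minimal, standalone sketch below. The names used here (errcapture,
errorCapturingOpener, captureReadErrors) are illustrative only and are not the
identifiers in the patch.

package errcapture

import (
	"io"

	"github.com/ipld/go-ipld-prime/datamodel"
	"github.com/ipld/go-ipld-prime/linking"
)

// errorCapturingOpener wraps a linking.BlockReadOpener and remembers the first
// error it returns, so the caller can surface block-load failures that the
// go-ipld-prime traverser may not propagate.
type errorCapturingOpener struct {
	inner linking.BlockReadOpener
	err   error
}

func (e *errorCapturingOpener) open(lc linking.LinkContext, l datamodel.Link) (io.Reader, error) {
	r, err := e.inner(lc, l)
	if err != nil && e.err == nil {
		e.err = err // remember only the first failure
	}
	return r, err
}

// captureReadErrors returns a copy of lsys whose StorageReadOpener records
// read errors on the returned *errorCapturingOpener.
func captureReadErrors(lsys linking.LinkSystem) (linking.LinkSystem, *errorCapturingOpener) {
	eco := &errorCapturingOpener{inner: lsys.StorageReadOpener}
	lsys.StorageReadOpener = eco.open
	return lsys, eco
}

After traversal.Progress.WalkAdv returns, the caller checks the captured error
and returns it if the walk itself reported none, which is what both call sites
in this patch do, because go-ipld-prime can drop loader errors (see the linked
go-ipld-prime pull request #524).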