Skip to content

Commit

Permalink
fix: various traversal and verifier fixes
Browse files Browse the repository at this point in the history
Closes: #336

* don't treat context cancellation as an ErrExtraneousBlock in the CAR verifier
* capture and properly handle block load errors that are missed by the
  go-ipld-prime traverser, ref: ipld/go-ipld-prime#524
* fix flaky case(s) in the verifiedcar test suite where a multi-level sharded
  directory is assumed but only a single-block directory is produced
  • Loading branch information
rvagg committed Jul 4, 2023
1 parent e558f16 commit 59c5880
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 90 deletions.
45 changes: 43 additions & 2 deletions pkg/internal/testutil/gen.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package testutil

import (
"fmt"
"io"
"math/rand"
"net"
"strconv"
Expand All @@ -10,7 +12,10 @@ import (
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
"github.com/ipfs/go-unixfsnode/data"
unixfs "github.com/ipfs/go-unixfsnode/testutil"
dagpb "github.com/ipld/go-codec-dagpb"
"github.com/ipld/go-ipld-prime/linking"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipni/go-libipni/metadata"
crypto "github.com/libp2p/go-libp2p/core/crypto"
Expand Down Expand Up @@ -150,9 +155,11 @@ func (ZeroReader) Read(b []byte) (n int, err error) {
return len(b), nil
}

// TODO: this should probably be an option in unixfsnode/testutil, for
// generators to strictly not return a DAG with duplicates
// TODO: these should probably be in unixfsnode/testutil, or as options to
// the respective functions there.

// GenerateNoDupes runs the unixfsnode/testutil generator function repeatedly
// until it produces a DAG with strictly no duplicate CIDs.
func GenerateNoDupes(gen func() unixfs.DirEntry) unixfs.DirEntry {
var check func(unixfs.DirEntry) bool
var seen map[cid.Cid]struct{}
Expand All @@ -178,3 +185,37 @@ func GenerateNoDupes(gen func() unixfs.DirEntry) unixfs.DirEntry {
}
}
}

// GenerateStrictlyNestedShardedDir is a wrapper around
// unixfsnode/testutil.GenerateDirectory that repeatedly generates a sharded
// directory until the result is strictly nested: the root shard must link to
// at least two child shards, guaranteeing at least one extra level of
// sharding below the root.
//
// A sharded directory can legitimately fit within a single block, so tests
// that require genuine multi-level sharding use this helper rather than
// calling GenerateDirectory directly.
func GenerateStrictlyNestedShardedDir(t *testing.T, linkSys *linking.LinkSystem, randReader io.Reader, targetSize int) unixfs.DirEntry {
	for {
		dir := unixfs.GenerateDirectory(t, linkSys, randReader, targetSize, true)

		root, err := linkSys.Load(linking.LinkContext{}, cidlink.Link{Cid: dir.Root}, dagpb.Type.PBNode)
		require.NoError(t, err)
		pbn := root.(dagpb.PBNode)

		ufsData, err := data.DecodeUnixFSData(pbn.Data.Must().Bytes())
		require.NoError(t, err)

		// A HAMT link whose name is exactly the hex shard-prefix width (with
		// no entry name appended) points at a child shard rather than a leaf
		// entry; the width is the hex-digit count of fanout-1 (e.g. 256 -> 2).
		prefixLen := len(fmt.Sprintf("%X", ufsData.FieldFanout().Must().Int()-1))

		shardLinks := 0
		links := pbn.Links.ListIterator()
		for !links.Done() {
			_, l, err := links.Next()
			require.NoError(t, err)
			if len(l.(dagpb.PBLink).Name.Must().String()) == prefixLen {
				shardLinks++
			}
		}

		if shardLinks >= 2 {
			return dir
		}
	}
}
35 changes: 31 additions & 4 deletions pkg/retriever/bitswapretriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (br *bitswapRetrieval) RetrieveFromAsyncCandidates(ayncCandidates types.Inb
ctx,
cidlink.Link{Cid: br.request.Cid},
selector,
&traversalLinkSys,
traversalLinkSys,
preloader,
br.request.MaxBlocks,
)
Expand Down Expand Up @@ -295,15 +295,20 @@ func loaderForSession(retrievalID types.RetrievalID, inProgressCids InProgressCi
}
}

// noopVisitor satisfies the traversal visit-callback signature but performs
// no work; it exists so the walk loads every block in the selector's scope
// without acting on the visited nodes.
func noopVisitor(_ traversal.Progress, _ datamodel.Node, _ traversal.VisitReason) error {
	return nil
}

func easyTraverse(
ctx context.Context,
root datamodel.Link,
traverseSelector datamodel.Node,
lsys *linking.LinkSystem,
lsys linking.LinkSystem,
preloader preload.Loader,
maxBlocks uint64,
) error {

lsys, ecr := newErrorCapturingReader(lsys)
protoChooser := dagpb.AddSupportToChooser(basicnode.Chooser)

// retrieve first node
Expand All @@ -319,7 +324,7 @@ func easyTraverse(
progress := traversal.Progress{
Cfg: &traversal.Config{
Ctx: ctx,
LinkSystem: *lsys,
LinkSystem: lsys,
LinkTargetNodePrototypeChooser: protoChooser,
Preloader: preloader,
},
Expand All @@ -335,5 +340,27 @@ func easyTraverse(
if err != nil {
return err
}
return progress.WalkAdv(node, compiledSelector, func(prog traversal.Progress, n datamodel.Node, reason traversal.VisitReason) error { return nil })
if err := progress.WalkAdv(node, compiledSelector, noopVisitor); err != nil {
return err
}
return ecr.Error
}

// errorCapturingReader wraps a linking.BlockReadOpener so that block-load
// errors are recorded in addition to being returned. This lets callers
// inspect Error after a traversal completes, catching failures that the
// go-ipld-prime traverser may otherwise swallow (ref: ipld/go-ipld-prime#524).
type errorCapturingReader struct {
	// sro is the original StorageReadOpener being wrapped.
	sro linking.BlockReadOpener
	// Error holds the error from the most recent failed read, or nil if no
	// read has failed.
	Error error
}

// newErrorCapturingReader replaces lsys's StorageReadOpener with a wrapper
// that records any read error on the returned *errorCapturingReader. The
// modified LinkSystem is returned by value alongside the capturer so the
// caller's original LinkSystem is left untouched.
func newErrorCapturingReader(lsys linking.LinkSystem) (linking.LinkSystem, *errorCapturingReader) {
	capturer := &errorCapturingReader{sro: lsys.StorageReadOpener}
	lsys.StorageReadOpener = capturer.StorageReadOpener
	return lsys, capturer
}

// StorageReadOpener delegates to the wrapped BlockReadOpener, remembering
// any error it produces before passing both the reader and the error back
// to the caller unchanged.
func (ecr *errorCapturingReader) StorageReadOpener(lc linking.LinkContext, l datamodel.Link) (io.Reader, error) {
	rdr, rerr := ecr.sro(lc, l)
	if rerr != nil {
		ecr.Error = rerr
	}
	return rdr, rerr
}
2 changes: 1 addition & 1 deletion pkg/retriever/httpretriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func newTimeToFirstByteReader(r io.Reader, cb func()) *timeToFirstByteReader {
}
}

func (t *timeToFirstByteReader) Read(p []byte) (n int, err error) {
func (t *timeToFirstByteReader) Read(p []byte) (int, error) {
if !t.first {
t.first = true
defer t.cb()
Expand Down
3 changes: 2 additions & 1 deletion pkg/server/http/ipfs.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package httpserver

import (
"context"
"errors"
"fmt"
"net/http"
Expand Down Expand Up @@ -188,7 +189,7 @@ func ipfsHandler(lassie *lassie.Lassie, cfg HttpServerConfig) func(http.Response
stats, err := lassie.Fetch(req.Context(), request, servertimingsSubscriber(req))

// force all blocks to flush
if cerr := carWriter.Close(); cerr != nil {
if cerr := carWriter.Close(); cerr != nil && !errors.Is(cerr, context.Canceled) {
logger.Infof("error closing car writer: %s", cerr)
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/duplicateaddercar.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ func (bs *blockStream) Close() {
}

func (bs *blockStream) WriteBlock(blk blocks.Block) error {
if bs.ctx.Err() != nil {
return bs.ctx.Err()
}
bs.mu.Lock()
defer bs.mu.Unlock()
if bs.done {
Expand Down
46 changes: 38 additions & 8 deletions pkg/verifiedcar/verifiedcar.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ func visitNoop(p traversal.Progress, n datamodel.Node, r traversal.VisitReason)
//
// * https://specs.ipfs.tech/http-gateways/path-gateway/
func (cfg Config) VerifyCar(ctx context.Context, rdr io.Reader, lsys linking.LinkSystem) (uint64, uint64, error) {

cbr, err := car.NewBlockReader(rdr, car.WithTrustedCAR(false))
if err != nil {
// TODO: post-1.19: fmt.Errorf("%w: %w", ErrMalformedCar, err)
Expand All @@ -93,7 +92,6 @@ func (cfg Config) VerifyCar(ctx context.Context, rdr io.Reader, lsys linking.Lin
}

func (cfg Config) VerifyBlockStream(ctx context.Context, cbr BlockReader, lsys linking.LinkSystem) (uint64, uint64, error) {

sel, err := selector.CompileSelector(cfg.Selector)
if err != nil {
return 0, 0, err
Expand All @@ -106,7 +104,7 @@ func (cfg Config) VerifyBlockStream(ctx context.Context, cbr BlockReader, lsys l
lsys.TrustedStorage = true // we can rely on the CAR decoder to check CID integrity
unixfsnode.AddUnixFSReificationToLinkSystem(&lsys)

lsys.StorageReadOpener = cfg.nextBlockReadOpener(ctx, cr, bt, lsys)
nbls, lsys := NewNextBlockLinkSystem(ctx, cfg, cr, bt, lsys)

// run traversal in this goroutine
progress := traversal.Progress{
Expand Down Expand Up @@ -136,21 +134,41 @@ func (cfg Config) VerifyBlockStream(ctx context.Context, cbr BlockReader, lsys l
return 0, 0, traversalError(err)
}

if nbls.Error != nil {
// capture any errors not bubbled up through the traversal, i.e. see
// https://github.com/ipld/go-ipld-prime/pull/524
return 0, 0, nbls.Error
}

// make sure we don't have any extraneous data beyond what the traversal needs
_, err = cbr.Next()
if !errors.Is(err, io.EOF) {
if err == nil {
return 0, 0, ErrExtraneousBlock
} else if !errors.Is(err, io.EOF) {
return 0, 0, err
}

// wait for parser to finish and provide errors or stats
return bt.blocks, bt.bytes, nil
}

func (cfg *Config) nextBlockReadOpener(ctx context.Context, cr *carReader, bt *writeTracker, lsys linking.LinkSystem) linking.BlockReadOpener {
// NextBlockLinkSystem captures errors produced by the block-loading
// StorageReadOpener during a verified-CAR traversal, so they can be checked
// after the walk completes even when the traverser does not bubble them up
// (see ipld/go-ipld-prime#524).
type NextBlockLinkSystem struct {
	// Error records the most recent error returned by the wrapped
	// StorageReadOpener, or nil if no read has failed.
	Error error
}

func NewNextBlockLinkSystem(
ctx context.Context,
cfg Config,
cr *carReader,
bt *writeTracker,
lsys linking.LinkSystem,
) (*NextBlockLinkSystem, linking.LinkSystem) {
nbls := &NextBlockLinkSystem{}
seen := make(map[cid.Cid]struct{})
return func(lc linking.LinkContext, l datamodel.Link) (io.Reader, error) {
cid := l.(cidlink.Link).Cid
storageReadOpener := lsys.StorageReadOpener

nextBlockReadOpener := func(lc linking.LinkContext, l datamodel.Link) (io.Reader, error) {
cid := l.(cidlink.Link).Cid
var data []byte
var err error
if _, ok := seen[cid]; ok {
Expand All @@ -165,7 +183,7 @@ func (cfg *Config) nextBlockReadOpener(ctx context.Context, cr *carReader, bt *w
}
} else {
// duplicate block, rely on the supplied LinkSystem to have stored this
rdr, err := lsys.StorageReadOpener(lc, l)
rdr, err := storageReadOpener(lc, l)
if !cfg.WriteDuplicatesOut {
return rdr, err
}
Expand Down Expand Up @@ -198,6 +216,18 @@ func (cfg *Config) nextBlockReadOpener(ctx context.Context, cr *carReader, bt *w
}
return io.NopCloser(rdr), nil
}

// wrap nextBlockReadOpener in one that captures errors on `nbls`
lsys.StorageReadOpener = func(lc linking.LinkContext, l datamodel.Link) (io.Reader, error) {
rdr, err := nextBlockReadOpener(lc, l)
if err != nil {
nbls.Error = err
return nil, err
}
return rdr, nil
}

return nbls, lsys
}

type carReader struct {
Expand Down
Loading

0 comments on commit 59c5880

Please sign in to comment.