diff --git a/hamt/hamt.go b/hamt/hamt.go
index 2f50a6f7e..7e770c9d6 100644
--- a/hamt/hamt.go
+++ b/hamt/hamt.go
@@ -25,11 +25,13 @@ import (
 	"fmt"
 	"os"
 
+	format "github.com/ipfs/go-unixfs"
+	"github.com/ipfs/go-unixfs/internal"
+
 	bitfield "github.com/ipfs/go-bitfield"
 	cid "github.com/ipfs/go-cid"
 	ipld "github.com/ipfs/go-ipld-format"
 	dag "github.com/ipfs/go-merkledag"
-	format "github.com/ipfs/go-unixfs"
 )
 
 const (
@@ -37,6 +39,10 @@ const (
 	HashMurmur3 uint64 = 0x22
 )
 
+func init() {
+	internal.HAMTHashFunction = murmur3Hash
+}
+
 func (ds *Shard) isValueNode() bool {
 	return ds.key != "" && ds.val != nil
 }
@@ -45,17 +51,29 @@ func (ds *Shard) isValueNode() bool {
 type Shard struct {
 	childer *childer
 
-	tableSize    int
+	// Entries per node (number of possible children indexed by the partial key).
+	tableSize int
+	// Bits needed to encode child indexes (log2 of the number of entries). This is
+	// the number of bits taken from the hash key on each level of the tree.
 	tableSizeLg2 int
 
 	builder  cid.Builder
 	hashFunc uint64
 
+	// Format string with the number of zeros that will be present in the
+	// hexadecimal encoding of the child index so it always reaches the fixed
+	// maxpadlen chars. Example: maxpadlen = 4 => prefixPadStr: "%04X" (print
+	// the number in hexadecimal format, padding with zeros to always reach 4
+	// characters).
 	prefixPadStr string
-	maxpadlen    int
+	// Length in chars of the string that encodes child indexes. We encode
+	// indexes as hexadecimal strings, so this is log4 of the number of entries.
+	maxpadlen int
 
 	dserv ipld.DAGService
 
+	// FIXME: Remove. We don't actually store "value nodes". This confusing
+	// abstraction just removes the maxpadlen from the link names to extract
+	// the actual value link the trie is storing.
 	// leaf node
 	key string
 	val *ipld.Link
@@ -68,12 +86,13 @@ func NewShard(dserv ipld.DAGService, size int) (*Shard, error) {
 		return nil, err
 	}
 
+	// FIXME: Make this at least a static configuration for testing.
 	ds.hashFunc = HashMurmur3
 	return ds, nil
 }
 
 func makeShard(ds ipld.DAGService, size int) (*Shard, error) {
-	lg2s, err := logtwo(size)
+	lg2s, err := Logtwo(size)
 	if err != nil {
 		return nil, err
 	}
@@ -211,7 +230,7 @@ func (ds *Shard) Set(ctx context.Context, name string, nd ipld.Node) error {
 // name key in this Shard or its children. It also returns the previous link
 // under that name key (if any).
 func (ds *Shard) SetAndPrevious(ctx context.Context, name string, node ipld.Node) (*ipld.Link, error) {
-	hv := &hashBits{b: hash([]byte(name))}
+	hv := newHashBits(name)
 	err := ds.dserv.Add(ctx, node)
 	if err != nil {
 		return nil, err
@@ -221,6 +240,9 @@ func (ds *Shard) SetAndPrevious(ctx context.Context, name string, node ipld.Node
 	if err != nil {
 		return nil, err
 	}
+
+	// FIXME: We don't need to set the name here, it will get overwritten.
+	// This is confusing, confirm and remove this line.
 	lnk.Name = ds.linkNamePrefix(0) + name
 
 	return ds.setValue(ctx, hv, name, lnk)
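The padding fields documented above can be made concrete with a short sketch of how they are derived from the shard width (a minimal, self-contained illustration; `padding` is a hypothetical helper, the real fields live on `Shard` and are set in `makeShard`):

```go
package main

import "fmt"

// Sketch: derive the child-index padding described in the Shard comments
// above for a shard of a given width.
func padding(size int) (prefixPadStr string, maxpadlen int) {
	// The widest possible index (size-1) printed in hex determines how
	// many characters every link-name prefix needs.
	maxpadlen = len(fmt.Sprintf("%X", size-1))
	// e.g. maxpadlen = 2 => "%02X" (zero-pad up to 2 hex chars).
	prefixPadStr = fmt.Sprintf("%%0%dX", maxpadlen)
	return prefixPadStr, maxpadlen
}

func main() {
	pad, n := padding(256)
	fmt.Println(pad, n) // %02X 2
	// A link to entry "foo" stored under child index 10 gets the
	// prefixed name "0Afoo"; lookups strip the prefix again, which is
	// what the FIXME about "value nodes" above refers to.
	fmt.Printf(pad+"%s\n", 10, "foo") // 0Afoo
}
```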
@@ -236,13 +258,13 @@
 // RemoveAndPrevious is similar to the public Remove but also returns the
 // old removed link (if it exists).
 func (ds *Shard) RemoveAndPrevious(ctx context.Context, name string) (*ipld.Link, error) {
-	hv := &hashBits{b: hash([]byte(name))}
+	hv := newHashBits(name)
 	return ds.setValue(ctx, hv, name, nil)
 }
 
 // Find searches for a child node by 'name' within this hamt
 func (ds *Shard) Find(ctx context.Context, name string) (*ipld.Link, error) {
-	hv := &hashBits{b: hash([]byte(name))}
+	hv := newHashBits(name)
 
 	var out *ipld.Link
 	err := ds.getValue(ctx, hv, name, func(sv *Shard) error {
@@ -489,10 +511,7 @@ func (ds *Shard) setValue(ctx context.Context, hv *hashBits, key string, value *
 		return nil, err
 	}
 	child.builder = ds.builder
-	chhv := &hashBits{
-		b:        hash([]byte(grandChild.key)),
-		consumed: hv.consumed,
-	}
+	chhv := newConsumedHashBits(grandChild.key, hv.consumed)
 
 	// We explicitly ignore the oldValue returned by the next two insertions
 	// (which will be nil) to highlight there is no overwrite here: they are
diff --git a/hamt/util.go b/hamt/util.go
index 7ae02dfb3..29f59435e 100644
--- a/hamt/util.go
+++ b/hamt/util.go
@@ -2,9 +2,11 @@ package hamt
 
 import (
 	"fmt"
+	"math/bits"
+
+	"github.com/ipfs/go-unixfs/internal"
 
 	"github.com/spaolacci/murmur3"
-	"math/bits"
 )
 
 // hashBits is a helper that allows the reading of the 'next n bits' as an integer.
@@ -13,6 +15,16 @@ type hashBits struct {
 	consumed int
 }
 
+func newHashBits(val string) *hashBits {
+	return &hashBits{b: internal.HAMTHashFunction([]byte(val))}
+}
+
+func newConsumedHashBits(val string, consumed int) *hashBits {
+	hv := &hashBits{b: internal.HAMTHashFunction([]byte(val))}
+	hv.consumed = consumed
+	return hv
+}
+
 func mkmask(n int) byte {
 	return (1 << uint(n)) - 1
 }
@@ -50,7 +62,7 @@ func (hb *hashBits) next(i int) int {
 	}
 }
 
-func logtwo(v int) (int, error) {
+func Logtwo(v int) (int, error) {
 	if v <= 0 {
 		return 0, fmt.Errorf("hamt size should be a power of two")
 	}
@@ -61,7 +73,7 @@ func logtwo(v int) (int, error) {
 	return lg2, nil
 }
 
-func hash(val []byte) []byte {
+func murmur3Hash(val []byte) []byte {
 	h := murmur3.New64()
 	h.Write(val)
 	return h.Sum(nil)
diff --git a/internal/config.go b/internal/config.go
new file mode 100644
index 000000000..9250ae2ae
--- /dev/null
+++ b/internal/config.go
@@ -0,0 +1,3 @@
+package internal
+
+var HAMTHashFunction func(val []byte) []byte
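The `hashBits` reader is the core of HAMT traversal: each tree level consumes `tableSizeLg2` bits of the (now pluggable) key hash to pick a child slot. Below is a minimal sketch of the idea, simplified to reads that fit in one byte (the real `next` also recurses across byte boundaries):

```go
package main

import "fmt"

// hashBits sketch: read the key hash log2(width) bits at a time; each
// read yields the child index for one level of the tree.
type hashBits struct {
	b        []byte
	consumed int
}

func mkmask(n int) byte { return (1 << uint(n)) - 1 }

// next returns the next i bits of the hash as an int. For brevity this
// sketch only handles reads that stay within the current byte.
func (hb *hashBits) next(i int) int {
	frompos := hb.consumed % 8
	left := 8 - frompos // bits still unread in the current byte
	if i > left {
		panic("sketch: cross-byte reads not handled here")
	}
	b := hb.b[hb.consumed/8]
	out := int(mkmask(left)&b) >> uint(left-i)
	hb.consumed += i
	return out
}

func main() {
	// With a 16-wide shard (4 bits per level), hash byte 0xAB selects
	// child 0xA at level 0 and child 0xB at level 1.
	hb := &hashBits{b: []byte{0xAB}}
	fmt.Println(hb.next(4), hb.next(4)) // 10 11
}
```

This is also why `internal.HAMTHashFunction` matters for the tests below: with the identity hash installed, the key bytes *are* the per-level child indexes.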
diff --git a/io/completehamt_test.go b/io/completehamt_test.go
new file mode 100644
index 000000000..1bb3d8720
--- /dev/null
+++ b/io/completehamt_test.go
@@ -0,0 +1,95 @@
+package io
+
+import (
+	"context"
+	"encoding/binary"
+	"fmt"
+	"math"
+	"testing"
+
+	mdtest "github.com/ipfs/go-merkledag/test"
+	"github.com/stretchr/testify/assert"
+
+	"github.com/ipfs/go-unixfs"
+	"github.com/ipfs/go-unixfs/hamt"
+
+	ipld "github.com/ipfs/go-ipld-format"
+)
+
+// CreateCompleteHAMT creates a HAMT with the following properties:
+// * its height (distance/edges from root to deepest node) is specified by treeHeight.
+// * all leaf Shard nodes have the same depth (and have only 'value' links).
+// * all internal Shard nodes point only to other Shards (and hence have zero 'value' links).
+// * the total number of 'value' links (directory entries) is:
+//   childsPerNode ^ treeHeight.
+// FIXME: HAMTHashFunction needs to be set to idHash by the caller. We depend on
+// this simplification for the current logic to work. (HAMTHashFunction is a
+// global setting of the package, it is hard-coded in the serialized Shard node
+// and not allowed to be changed on a per HAMT/Shard basis.)
+// (If we didn't rehash inside setValue then we could just generate
+// the fake hash as in io.SetAndPrevious through `newHashBits()` and pass
+// it as an argument, making the hash independent of tree manipulation; that
+// sounds like the correct way to go in general and we wouldn't need this.)
+func CreateCompleteHAMT(ds ipld.DAGService, treeHeight int, childsPerNode int) (ipld.Node, error) {
+	if treeHeight < 1 {
+		panic("treeHeight < 1")
+	}
+	if treeHeight > 8 {
+		panic("treeHeight > 8: we don't allow a key larger than what can be encoded in a 64-bit word")
+	}
+
+	rootShard, err := hamt.NewShard(ds, childsPerNode)
+	if err != nil {
+		return nil, err
+	}
+
+	// Assuming we are using the ID hash function we can just insert all
+	// the combinations of a byte slice that will reach the desired height.
+	totalChildren := int(math.Pow(float64(childsPerNode), float64(treeHeight)))
+	log2ofChilds, err := hamt.Logtwo(childsPerNode)
+	if err != nil {
+		return nil, err
+	}
+	if log2ofChilds*treeHeight%8 != 0 {
+		return nil, fmt.Errorf("log2(childsPerNode) * treeHeight should be a multiple of 8")
+	}
+	bytesInKey := log2ofChilds * treeHeight / 8
+	for i := 0; i < totalChildren; i++ {
+		var hashbuf [8]byte
+		binary.LittleEndian.PutUint64(hashbuf[:], uint64(i))
+		var oldLink *ipld.Link
+		oldLink, err = rootShard.SetAndPrevious(context.Background(), string(hashbuf[:bytesInKey]), unixfs.EmptyFileNode())
+		if err != nil {
+			return nil, err
+		}
+		if oldLink != nil {
+			// We shouldn't be overwriting any value, otherwise the tree
+			// won't be complete.
+			return nil, fmt.Errorf("we have overwritten entry %s",
+				oldLink.Cid)
+		}
+	}
+
+	return rootShard.Node()
+}
+
+// idHash is the identity hash function: it returns its input as the "hash".
+func idHash(val []byte) []byte {
+	return val
+}
+
+func TestCreateCompleteShard(t *testing.T) {
+	ds := mdtest.Mock()
+	childsPerNode := 16
+	treeHeight := 2
+	node, err := CreateCompleteHAMT(ds, treeHeight, childsPerNode)
+	assert.NoError(t, err)
+
+	shard, err := hamt.NewHamtFromDag(ds, node)
+	assert.NoError(t, err)
+	links, err := shard.EnumLinks(context.Background())
+	assert.NoError(t, err)
+
+	childNodes := int(math.Pow(float64(childsPerNode), float64(treeHeight)))
+	assert.Equal(t, childNodes, len(links))
+}
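To make the construction concrete, these are the numbers for `TestCreateCompleteShard` above (childsPerNode = 16, treeHeight = 2), assuming the identity hash is installed:

```go
// Worked example for childsPerNode = 16, treeHeight = 2 with idHash:
//
//   log2ofChilds  = 4             // bits consumed per tree level
//   bytesInKey    = 4*2/8 = 1     // one-byte keys
//   totalChildren = 16^2  = 256   // keys 0x00 .. 0xFF
//
// Because the "hash" of a key is the key itself, key 0xAB descends to
// child 0xA of the root and then child 0xB of that shard: the 256 keys
// fill every leaf slot exactly once, yielding a complete tree of height 2.
```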
diff --git a/io/directory.go b/io/directory.go
index 5db1b3c61..b8915a3b0 100644
--- a/io/directory.go
+++ b/io/directory.go
@@ -3,14 +3,16 @@ package io
 import (
 	"context"
 	"fmt"
-	mdag "github.com/ipfs/go-merkledag"
-	format "github.com/ipfs/go-unixfs"
-	"github.com/ipfs/go-unixfs/hamt"
 	"os"
 
+	"github.com/ipfs/go-unixfs/hamt"
+	"github.com/ipfs/go-unixfs/private/linksize"
+
 	"github.com/ipfs/go-cid"
 	ipld "github.com/ipfs/go-ipld-format"
 	logging "github.com/ipfs/go-log"
+	mdag "github.com/ipfs/go-merkledag"
+	format "github.com/ipfs/go-unixfs"
 )
 
 var log = logging.Logger("unixfs")
@@ -24,6 +26,7 @@ var log = logging.Logger("unixfs")
 var HAMTShardingSize = 0
 
 // DefaultShardWidth is the default value used for hamt sharding width.
+// Needs to be a power of two (shard entry size) and a multiple of 8 (bitfield size).
 var DefaultShardWidth = 256
 
 // Directory defines a UnixFS directory. It is used for creating, reading and
@@ -78,7 +81,9 @@ func productionLinkSize(linkName string, linkCid cid.Cid) int {
 	return len(linkName) + linkCid.ByteLen()
 }
 
-var estimatedLinkSize = productionLinkSize
+func init() {
+	linksize.LinkSizeFunction = productionLinkSize
+}
 
 // BasicDirectory is the basic implementation of `Directory`. All the entries
 // are stored in a single node.
@@ -167,11 +172,11 @@ func (d *BasicDirectory) computeEstimatedSize() {
 }
 
 func (d *BasicDirectory) addToEstimatedSize(name string, linkCid cid.Cid) {
-	d.estimatedSize += estimatedLinkSize(name, linkCid)
+	d.estimatedSize += linksize.LinkSizeFunction(name, linkCid)
 }
 
 func (d *BasicDirectory) removeFromEstimatedSize(name string, linkCid cid.Cid) {
-	d.estimatedSize -= estimatedLinkSize(name, linkCid)
+	d.estimatedSize -= linksize.LinkSizeFunction(name, linkCid)
 	if d.estimatedSize < 0 {
 		// Something has gone very wrong. Log an error and recompute the
 		// size from scratch.
@@ -208,10 +213,10 @@ func (d *BasicDirectory) needsToSwitchToHAMTDir(name string, nodeToAdd ipld.Node
 		if err != nil {
 			return false, err
 		}
-		operationSizeChange -= estimatedLinkSize(name, entryToRemove.Cid)
+		operationSizeChange -= linksize.LinkSizeFunction(name, entryToRemove.Cid)
 	}
 	if nodeToAdd != nil {
-		operationSizeChange += estimatedLinkSize(name, nodeToAdd.Cid())
+		operationSizeChange += linksize.LinkSizeFunction(name, nodeToAdd.Cid())
 	}
 
 	return d.estimatedSize+operationSizeChange >= HAMTShardingSize, nil
@@ -437,11 +442,11 @@ func (d *HAMTDirectory) switchToBasic(ctx context.Context) (*BasicDirectory, err
 }
 
 func (d *HAMTDirectory) addToSizeChange(name string, linkCid cid.Cid) {
-	d.sizeChange += estimatedLinkSize(name, linkCid)
+	d.sizeChange += linksize.LinkSizeFunction(name, linkCid)
 }
 
 func (d *HAMTDirectory) removeFromSizeChange(name string, linkCid cid.Cid) {
-	d.sizeChange -= estimatedLinkSize(name, linkCid)
+	d.sizeChange -= linksize.LinkSizeFunction(name, linkCid)
 }
 
 // Evaluate a switch from HAMTDirectory to BasicDirectory in case the size will
@@ -464,12 +469,12 @@ func (d *HAMTDirectory) needsToSwitchToBasicDir(ctx context.Context, name string
 		if err != nil {
 			return false, err
 		}
-		operationSizeChange -= estimatedLinkSize(name, entryToRemove.Cid)
+		operationSizeChange -= linksize.LinkSizeFunction(name, entryToRemove.Cid)
 	}
 
 	// For the AddEntry case compute the size addition of the new entry.
 	if nodeToAdd != nil {
-		operationSizeChange += estimatedLinkSize(name, nodeToAdd.Cid())
+		operationSizeChange += linksize.LinkSizeFunction(name, nodeToAdd.Cid())
 	}
 
 	if d.sizeChange+operationSizeChange >= 0 {
@@ -506,7 +511,7 @@ func (d *HAMTDirectory) sizeBelowThreshold(ctx context.Context, sizeChange int)
 			return false, linkResult.Err
 		}
 
-		partialSize += estimatedLinkSize(linkResult.Link.Name, linkResult.Link.Cid)
+		partialSize += linksize.LinkSizeFunction(linkResult.Link.Name, linkResult.Link.Cid)
 		if partialSize+sizeChange >= HAMTShardingSize {
 			// We have already fetched enough shards to assert we are
 			// above the threshold, so no need to keep fetching.
@@ -581,17 +586,6 @@ func (d *UpgradeableDirectory) AddChild(ctx context.Context, name string, nd ipl
 	return nil
 }
 
-func (d *UpgradeableDirectory) getDagService() ipld.DAGService {
-	switch v := d.Directory.(type) {
-	case *BasicDirectory:
-		return v.dserv
-	case *HAMTDirectory:
-		return v.dserv
-	default:
-		panic("unknown directory type")
-	}
-}
-
 // RemoveChild implements the `Directory` interface. Used in the case where we wrap
 // a HAMTDirectory that might need to be downgraded to a BasicDirectory. The
 // upgrade path is in AddChild.
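The early-exit logic in `sizeBelowThreshold` is what lets the test further down fetch only as many shards as needed. A condensed sketch of the same control flow, with the asynchronous link stream reduced to a plain slice for illustration:

```go
// Sketch of sizeBelowThreshold above: accumulate link sizes and stop as
// soon as the threshold is provably reached. linkSizes stands in for the
// links enumerated (and fetched) from the shard network.
func sizeBelowThreshold(linkSizes []int, sizeChange, threshold int) bool {
	partialSize := 0
	for _, s := range linkSizes {
		partialSize += s
		if partialSize+sizeChange >= threshold {
			// Early exit: enough shards fetched to prove the directory
			// stays at or above the threshold; stop enumerating.
			return false
		}
	}
	// Full enumeration finished below the threshold.
	return true
}

// Caller side, mirroring needsToSwitchToBasicDir: a switch down is only
// considered when the running delta since load plus this operation's
// delta is negative, since a HAMTDirectory starts above the threshold.
func needsToSwitchToBasicDir(linkSizes []int, sizeChange, opChange, threshold int) bool {
	if sizeChange+opChange >= 0 {
		return false // the directory can only have grown; stay HAMT
	}
	return sizeBelowThreshold(linkSizes, opChange, threshold)
}
```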
diff --git a/io/directory_test.go b/io/directory_test.go
index 218f25822..909f9b4fd 100644
--- a/io/directory_test.go
+++ b/io/directory_test.go
@@ -5,8 +5,11 @@ import (
 	"fmt"
 	"math"
 	"sort"
+	"strconv"
 	"strings"
+	"sync"
 	"testing"
+	"time"
 
 	cid "github.com/ipfs/go-cid"
 	ipld "github.com/ipfs/go-ipld-format"
@@ -14,6 +17,9 @@ import (
 	mdtest "github.com/ipfs/go-merkledag/test"
 
 	ft "github.com/ipfs/go-unixfs"
+	"github.com/ipfs/go-unixfs/hamt"
+	"github.com/ipfs/go-unixfs/internal"
+	"github.com/ipfs/go-unixfs/private/linksize"
 
 	"github.com/stretchr/testify/assert"
 )
@@ -105,161 +111,298 @@ func TestDuplicateAddDir(t *testing.T) {
 	}
 }
 
-// FIXME: Nothing blocking but nice to have:
-// * Check estimated size against link enumeration (indirectly done in the
-//   restored node check from NewDirectoryFromNode).
-// * Check estimated size against encoded node (the difference should only be
-//   a small percentage for a directory with 10s of entries).
-// FIXME: Add a test for the HAMT sizeChange abstracting some of the code from
-// this one.
 func TestBasicDirectory_estimatedSize(t *testing.T) {
 	ds := mdtest.Mock()
+	basicDir := newEmptyBasicDirectory(ds)
+
+	testDirectorySizeEstimation(t, basicDir, ds, func(dir Directory) int {
+		return dir.(*BasicDirectory).estimatedSize
+	})
+}
+
+func TestHAMTDirectory_sizeChange(t *testing.T) {
+	ds := mdtest.Mock()
+	hamtDir, err := newEmptyHAMTDirectory(ds, DefaultShardWidth)
+	assert.NoError(t, err)
+
+	testDirectorySizeEstimation(t, hamtDir, ds, func(dir Directory) int {
+		// Since we created a HAMTDirectory from scratch with size 0, its
+		// internal sizeChange delta will in fact track the directory size
+		// throughout this run.
+		return dir.(*HAMTDirectory).sizeChange
+	})
+}
+
+func fullSizeEnumeration(dir Directory) int {
+	size := 0
+	dir.ForEachLink(context.Background(), func(l *ipld.Link) error {
+		size += linksize.LinkSizeFunction(l.Name, l.Cid)
+		return nil
+	})
+	return size
+}
+
+func testDirectorySizeEstimation(t *testing.T, dir Directory, ds ipld.DAGService, size func(Directory) int) {
+	linksize.LinkSizeFunction = mockLinkSizeFunc(1)
+	defer func() { linksize.LinkSizeFunction = productionLinkSize }()
+
 	ctx := context.Background()
 	child := ft.EmptyFileNode()
-	err := ds.Add(ctx, child)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	basicDir := newEmptyBasicDirectory(ds)
+	assert.NoError(t, ds.Add(ctx, child))
 
 	// Several overwrites should not corrupt the size estimation.
-	basicDir.AddChild(ctx, "child", child)
-	basicDir.AddChild(ctx, "child", child)
-	basicDir.AddChild(ctx, "child", child)
-	basicDir.RemoveChild(ctx, "child")
-	basicDir.AddChild(ctx, "child", child)
-	basicDir.RemoveChild(ctx, "child")
-	// FIXME: Check errors above (abstract adds/removals in iteration).
-	if basicDir.estimatedSize != 0 {
-		t.Fatal("estimated size is not zero after removing all entries")
-	}
-
-	for i := 0; i < 100; i++ {
-		basicDir.AddChild(ctx, fmt.Sprintf("child-%03d", i), child) // e.g., "child-045"
-	}
-	// Estimated entry size: name (9) + CID (32 from hash and 2 extra for header)
-	entrySize := 9 + 32 + 2
-	expectedSize := 100 * entrySize
-	if basicDir.estimatedSize != expectedSize {
-		t.Fatalf("estimated size (%d) inaccurate after adding many entries (expected %d)",
-			basicDir.estimatedSize, expectedSize)
-	}
-
-	basicDir.RemoveChild(ctx, "child-045") // just random values
-	basicDir.RemoveChild(ctx, "child-063")
-	basicDir.RemoveChild(ctx, "child-011")
-	basicDir.RemoveChild(ctx, "child-000")
-	basicDir.RemoveChild(ctx, "child-099")
-
-	basicDir.RemoveChild(ctx, "child-045")        // already removed, won't impact size
-	basicDir.RemoveChild(ctx, "nonexistent-name") // also doesn't count
-	basicDir.RemoveChild(ctx, "child-100")        // same
-	expectedSize -= 5 * entrySize
-	if basicDir.estimatedSize != expectedSize {
-		t.Fatalf("estimated size (%d) inaccurate after removing some entries (expected %d)",
-			basicDir.estimatedSize, expectedSize)
-	}
+	assert.NoError(t, dir.AddChild(ctx, "child", child))
+	assert.NoError(t, dir.AddChild(ctx, "child", child))
+	assert.NoError(t, dir.AddChild(ctx, "child", child))
+	assert.NoError(t, dir.RemoveChild(ctx, "child"))
+	assert.NoError(t, dir.AddChild(ctx, "child", child))
+	assert.NoError(t, dir.RemoveChild(ctx, "child"))
+	assert.Equal(t, 0, size(dir), "estimated size is not zero after removing all entries")
+
+	dirEntries := 100
+	for i := 0; i < dirEntries; i++ {
+		assert.NoError(t, dir.AddChild(ctx, fmt.Sprintf("child-%03d", i), child))
+	}
+	assert.Equal(t, dirEntries, size(dir), "estimated size inaccurate after adding many entries")
+
+	assert.NoError(t, dir.RemoveChild(ctx, "child-045")) // just random values
+	assert.NoError(t, dir.RemoveChild(ctx, "child-063"))
+	assert.NoError(t, dir.RemoveChild(ctx, "child-011"))
+	assert.NoError(t, dir.RemoveChild(ctx, "child-000"))
+	assert.NoError(t, dir.RemoveChild(ctx, "child-099"))
+	dirEntries -= 5
+	assert.Equal(t, dirEntries, size(dir), "estimated size inaccurate after removing some entries")
+
+	// All of the following remove operations will fail (won't impact dirEntries):
+	assert.Error(t, dir.RemoveChild(ctx, "nonexistent-name"))
+	assert.Error(t, dir.RemoveChild(ctx, "child-045")) // already removed
+	assert.Error(t, dir.RemoveChild(ctx, "child-100"))
+	assert.Equal(t, dirEntries, size(dir), "estimated size inaccurate after failed remove attempts")
 
 	// Restore a directory from original's node and check estimated size consistency.
-	basicDirSingleNode, _ := basicDir.GetNode() // no possible error
-	restoredBasicDir := newBasicDirectoryFromNode(ds, basicDirSingleNode.(*mdag.ProtoNode))
-	if basicDir.estimatedSize != restoredBasicDir.estimatedSize {
-		t.Fatalf("restored basic directory size (%d) doesn't match original estimate (%d)",
-			basicDir.estimatedSize, restoredBasicDir.estimatedSize)
-	}
+	dirNode, err := dir.GetNode()
+	assert.NoError(t, err)
+	restoredDir, err := NewDirectoryFromNode(ds, dirNode.(*mdag.ProtoNode))
+	assert.NoError(t, err)
+	assert.Equal(t, size(dir), fullSizeEnumeration(restoredDir), "restored directory's size doesn't match original's")
+	// We don't use the estimation size function for the restored directory
+	// because in the HAMT case this function depends on the sizeChange variable
+	// that will be cleared when loading the directory from the node.
+	// This also covers comparing the size estimation `size()` against the
+	// full enumeration function `fullSizeEnumeration()` to make sure it's
+	// correct.
 }
 
-// FIXME: Add a similar one for HAMT directory, stressing particularly the
-// deleted/overwritten entries and their computation in the size variation.
-
+// Any entry link will have the fixedSize passed as its size.
 func mockLinkSizeFunc(fixedSize int) func(linkName string, linkCid cid.Cid) int {
 	return func(_ string, _ cid.Cid) int {
 		return fixedSize
 	}
 }
 
-// Basic test on extreme threshold to trigger switch. More fine-grained sizes
-// are checked in TestBasicDirectory_estimatedSize (without the swtich itself
-// but focusing on the size computation).
-// FIXME: Ideally, instead of checking size computation on one test and directory
-// upgrade on another a better structured test should test both dimensions
-// simultaneously.
-func TestUpgradeableDirectory(t *testing.T) {
-	// FIXME: Modifying these static configuraitons is probably not
-	// concurrent-friendly.
+func checkBasicDirectory(t *testing.T, dir Directory, errorMessage string) {
+	if _, ok := dir.(*UpgradeableDirectory).Directory.(*BasicDirectory); !ok {
+		t.Fatal(errorMessage)
+	}
+}
+
+func checkHAMTDirectory(t *testing.T, dir Directory, errorMessage string) {
+	if _, ok := dir.(*UpgradeableDirectory).Directory.(*HAMTDirectory); !ok {
+		t.Fatal(errorMessage)
+	}
+}
+
+func TestProductionLinkSize(t *testing.T) {
+	link, err := ipld.MakeLink(ft.EmptyDirNode())
+	assert.NoError(t, err)
+	link.Name = "directory_link_name"
+	assert.Equal(t, 53, productionLinkSize(link.Name, link.Cid))
+
+	link, err = ipld.MakeLink(ft.EmptyFileNode())
+	assert.NoError(t, err)
+	link.Name = "file_link_name"
+	assert.Equal(t, 48, productionLinkSize(link.Name, link.Cid))
+
+	ds := mdtest.Mock()
+	basicDir := newEmptyBasicDirectory(ds)
+	assert.NoError(t, err)
+	for i := 0; i < 10; i++ {
+		basicDir.AddChild(context.Background(), strconv.FormatUint(uint64(i), 10), ft.EmptyFileNode())
+	}
+	basicDirNode, err := basicDir.GetNode()
+	assert.NoError(t, err)
+	link, err = ipld.MakeLink(basicDirNode)
+	assert.NoError(t, err)
+	link.Name = "basic_dir"
+	assert.Equal(t, 43, productionLinkSize(link.Name, link.Cid))
+}
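The constants asserted in `TestProductionLinkSize` follow directly from `productionLinkSize` being `len(linkName) + linkCid.ByteLen()`, with `ipld.MakeLink` producing a 34-byte CIDv0 (sha2-256 multihash) for these nodes:

```go
// "directory_link_name" -> 19 chars + 34-byte CID = 53
// "file_link_name"      -> 14 chars + 34-byte CID = 48
// "basic_dir"           ->  9 chars + 34-byte CID = 43
```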
+
+// Test HAMTDirectory <-> BasicDirectory switch based on directory size. The
+// switch is managed by the UpgradeableDirectory abstraction.
+func TestUpgradeableDirectorySwitch(t *testing.T) {
 	oldHamtOption := HAMTShardingSize
 	defer func() { HAMTShardingSize = oldHamtOption }()
-	estimatedLinkSize = mockLinkSizeFunc(1)
-	defer func() { estimatedLinkSize = productionLinkSize }()
+	HAMTShardingSize = 0 // Disable automatic switch at the start.
+	linksize.LinkSizeFunction = mockLinkSizeFunc(1)
+	defer func() { linksize.LinkSizeFunction = productionLinkSize }()
 
 	ds := mdtest.Mock()
 	dir := NewDirectory(ds)
+	checkBasicDirectory(t, dir, "new dir is not BasicDirectory")
+
 	ctx := context.Background()
 	child := ft.EmptyDirNode()
 	err := ds.Add(ctx, child)
-	if err != nil {
-		t.Fatal(err)
-	}
+	assert.NoError(t, err)
 
-	HAMTShardingSize = 0 // Create a BasicDirectory.
-	if _, ok := dir.(*UpgradeableDirectory).Directory.(*BasicDirectory); !ok {
-		t.Fatal("UpgradeableDirectory doesn't contain BasicDirectory")
-	}
+	err = dir.AddChild(ctx, "1", child)
+	assert.NoError(t, err)
+	checkBasicDirectory(t, dir, "added child, option still disabled")
 
 	// Set a threshold so big that a new entry won't trigger the change.
 	HAMTShardingSize = math.MaxInt32
-	err = dir.AddChild(ctx, "test", child)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	if _, ok := dir.(*UpgradeableDirectory).Directory.(*HAMTDirectory); ok {
-		t.Fatal("UpgradeableDirectory was upgraded to HAMTDirectory for a large threshold")
-	}
+	err = dir.AddChild(ctx, "2", child)
+	assert.NoError(t, err)
+	checkBasicDirectory(t, dir, "added child, option now enabled but at max")
 
 	// Now set it so low to make sure any new entry will trigger the upgrade.
 	HAMTShardingSize = 1
-	err = dir.AddChild(ctx, "test", child) // overwriting an entry should also trigger the switch
-	if err != nil {
-		t.Fatal(err)
-	}
+	// We are already above the threshold, so we trigger the switch with an
+	// overwrite (any AddChild() should reevaluate the size).
+	err = dir.AddChild(ctx, "2", child)
+	assert.NoError(t, err)
+	checkHAMTDirectory(t, dir, "added child, option at min, should switch up")
 
-	if _, ok := dir.(*UpgradeableDirectory).Directory.(*HAMTDirectory); !ok {
-		t.Fatal("UpgradeableDirectory wasn't upgraded to HAMTDirectory for a low threshold")
-	}
-	upgradedDir := copyDir(t, dir)
+	// Set the threshold at the number of current entries and delete the last
+	// one to trigger a switch and evaluate if the rest of the entries are
+	// preserved.
+	HAMTShardingSize = 2
+	err = dir.RemoveChild(ctx, "2")
+	assert.NoError(t, err)
+	checkBasicDirectory(t, dir, "removed threshold entry, option at min, should switch down")
+}
 
-	// Remove the single entry triggering the switch back to BasicDirectory
-	err = dir.RemoveChild(ctx, "test")
-	if err != nil {
-		t.Fatal(err)
+func TestIntegrityOfDirectorySwitch(t *testing.T) {
+	ds := mdtest.Mock()
+	dir := NewDirectory(ds)
+	checkBasicDirectory(t, dir, "new dir is not BasicDirectory")
+
+	ctx := context.Background()
+	child := ft.EmptyDirNode()
+	err := ds.Add(ctx, child)
+	assert.NoError(t, err)
+
+	basicDir := newEmptyBasicDirectory(ds)
+	hamtDir, err := newEmptyHAMTDirectory(ds, DefaultShardWidth)
+	assert.NoError(t, err)
+	for i := 0; i < 1000; i++ {
+		basicDir.AddChild(ctx, strconv.FormatUint(uint64(i), 10), child)
+		hamtDir.AddChild(ctx, strconv.FormatUint(uint64(i), 10), child)
 	}
-	if _, ok := dir.(*UpgradeableDirectory).Directory.(*BasicDirectory); !ok {
-		t.Fatal("UpgradeableDirectory wasn't downgraded to BasicDirectory after removal of the single entry")
+	compareDirectoryEntries(t, basicDir, hamtDir)
+
+	hamtDirFromSwitch, err := basicDir.SwitchToSharding(ctx)
+	assert.NoError(t, err)
+	basicDirFromSwitch, err := hamtDir.switchToBasic(ctx)
+	assert.NoError(t, err)
+	compareDirectoryEntries(t, basicDir, basicDirFromSwitch)
+	compareDirectoryEntries(t, hamtDir, hamtDirFromSwitch)
+}
+
+// This is the value of concurrent fetches during dag.Walk. Used in the
+// test to better predict how many nodes will be fetched.
+var defaultConcurrentFetch = 32
+
+// FIXME: Taken from private github.com/ipfs/go-merkledag@v0.2.3/merkledag.go.
+// (We could also pass an explicit concurrency value in `(*Shard).EnumLinksAsync()`
+// and take ownership of this configuration, but that departs from the more
+// standard and reliable one in `go-merkledag`.)
+
+// Test that we fetch as few nodes as needed to reach the HAMTShardingSize
+// during the sizeBelowThreshold computation.
+func TestHAMTEnumerationWhenComputingSize(t *testing.T) {
+	// Adjust HAMT global/static options for the test to simplify its logic.
+	// FIXME: These variables weren't designed to be modified and we should
+	// review in depth side effects.
+
+	// Set all link sizes to a uniform 1 so the estimated directory size
+	// is just the count of its entry links (in HAMT/Shard terminology these
+	// are the "value" links pointing to anything that is *not* another Shard).
+	linksize.LinkSizeFunction = mockLinkSizeFunc(1)
+	defer func() { linksize.LinkSizeFunction = productionLinkSize }()
+
+	// Use an identity hash function to ease the construction of "complete" HAMTs
+	// (see CreateCompleteHAMT above for more details). (Ideally this should be
+	// a parameter we pass and not a global option we modify in the caller.)
+	oldHashFunc := internal.HAMTHashFunction
+	defer func() { internal.HAMTHashFunction = oldHashFunc }()
+	internal.HAMTHashFunction = idHash
+
+	oldHamtOption := HAMTShardingSize
+	defer func() { HAMTShardingSize = oldHamtOption }()
+
+	// --- End of test static configuration adjustments. ---
+
+	// Some arbitrary values below that make this test not that expensive.
+	treeHeight := 4
+	// How many leaf shard nodes (with value links, i.e., directory entries)
+	// we need to reach the threshold.
+	thresholdToWidthRatio := 4
+	// Departing from DefaultShardWidth of 256 to reduce the HAMT size in
+	// CreateCompleteHAMT.
+	shardWidth := 16
+	HAMTShardingSize = shardWidth * thresholdToWidthRatio
+
+	// We create a "complete" HAMT (see CreateCompleteHAMT for more details)
+	// with a regular structure to be able to predict how many Shard nodes we
+	// will need to fetch in order to reach the HAMTShardingSize threshold in
+	// sizeBelowThreshold (assuming a sequential DAG walk function).
+	ds := mdtest.Mock()
+	completeHAMTRoot, err := CreateCompleteHAMT(ds, treeHeight, shardWidth)
+	assert.NoError(t, err)
+
+	// With this structure and a BFS traversal (from `parallelWalkDepth`) we
+	// will roughly fetch the following nodes:
+	nodesToFetch := 0
+	// * all layers up to (but not including) the last one with leaf nodes
+	//   (because it's a BFS)
+	for i := 0; i < treeHeight-1; i++ {
+		nodesToFetch += int(math.Pow(float64(shardWidth), float64(i)))
+	}
+	// * `thresholdToWidthRatio` leaf Shards with enough value links to reach
+	//   the HAMTShardingSize threshold.
+	nodesToFetch += thresholdToWidthRatio
+
+	countGetsDS := newCountGetsDS(ds)
+	hamtDir, err := newHAMTDirectoryFromNode(countGetsDS, completeHAMTRoot)
+	assert.NoError(t, err)
+
+	countGetsDS.resetCounter()
+	countGetsDS.setRequestDelay(10 * time.Millisecond)
+	// (Without the `setRequestDelay` above the number of nodes fetched
+	// drops dramatically and unpredictably as the BFS starts to behave
+	// more like a DFS because some search paths are fetched faster than
+	// others.)
+	below, err := hamtDir.sizeBelowThreshold(context.TODO(), 0)
+	assert.NoError(t, err)
+	assert.False(t, below)
+	t.Logf("fetched %d nodes (predicted range: %d-%d)",
+		countGetsDS.uniqueCidsFetched(), nodesToFetch, nodesToFetch+defaultConcurrentFetch)
+	// Check that the actual number of nodes fetched is within the margin of the
+	// estimated `nodesToFetch` plus an extra of `defaultConcurrentFetch` since
+	// we are fetching in parallel.
+	assert.True(t, countGetsDS.uniqueCidsFetched() <= nodesToFetch+defaultConcurrentFetch)
+	assert.True(t, countGetsDS.uniqueCidsFetched() >= nodesToFetch)
+}
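For the parameters above the predicted fetch count works out as follows:

```go
// treeHeight = 4, shardWidth = 16, thresholdToWidthRatio = 4
// (so HAMTShardingSize = 16 * 4 = 64, with every link costing 1):
//
//   internal BFS layers: 16^0 + 16^1 + 16^2 = 1 + 16 + 256 = 273 shards
//   leaf shards needed:  64 threshold / 16 links per leaf   =   4 shards
//                                                nodesToFetch = 277
//
// The asserted window is therefore [277, 277+32], the upper margin being
// go-merkledag's number of in-flight Gets (defaultConcurrentFetch).
```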
 
-// Compare entries in the leftDir against the rightDir and possibly
-// missingEntries in the second.
+// Compare entries in the leftDir against the rightDir.
-func compareDirectoryEntries(t *testing.T, leftDir Directory, rightDir Directory, missingEntries []*ipld.Link) {
+func compareDirectoryEntries(t *testing.T, leftDir Directory, rightDir Directory) {
 	leftLinks, err := getAllLinksSortedByName(leftDir)
 	assert.NoError(t, err)
 	rightLinks, err := getAllLinksSortedByName(rightDir)
 	assert.NoError(t, err)
-	rightLinks = append(rightLinks, missingEntries...)
-	sortLinksByName(rightLinks)
 
 	assert.Equal(t, len(leftLinks), len(rightLinks))
 
@@ -283,28 +426,6 @@ func sortLinksByName(l []*ipld.Link) {
 	})
 }
 
-func copyDir(t *testing.T, d Directory) Directory {
-	dirNode, err := d.GetNode()
-	assert.NoError(t, err)
-	// Extract the DAG service from the directory (i.e., its link entries saved
-	// in it). This is not exposed in the interface and we won't change that now.
-	// FIXME: Still, this isn't nice.
-	var ds ipld.DAGService
-	switch v := d.(type) {
-	case *BasicDirectory:
-		ds = v.dserv
-	case *HAMTDirectory:
-		ds = v.dserv
-	case *UpgradeableDirectory:
-		ds = v.getDagService()
-	default:
-		panic("unknown directory type")
-	}
-	copiedDir, err := NewDirectoryFromNode(ds, dirNode)
-	assert.NoError(t, err)
-	return copiedDir
-}
-
 func TestDirBuilder(t *testing.T) {
 	ds := mdtest.Mock()
 	dir := NewDirectory(ds)
@@ -389,3 +510,89 @@ func TestDirBuilder(t *testing.T) {
 		t.Fatal("wrong number of links", len(asyncLinks), count)
 	}
 }
+
+func newHAMTDirectoryFromNode(dserv ipld.DAGService, node ipld.Node) (*HAMTDirectory, error) {
+	shard, err := hamt.NewHamtFromDag(dserv, node)
+	if err != nil {
+		return nil, err
+	}
+	return &HAMTDirectory{
+		dserv: dserv,
+		shard: shard,
+	}, nil
+}
+
+func newEmptyHAMTDirectory(dserv ipld.DAGService, shardWidth int) (*HAMTDirectory, error) {
+	shard, err := hamt.NewShard(dserv, shardWidth)
+	if err != nil {
+		return nil, err
+	}
+
+	return &HAMTDirectory{
+		dserv: dserv,
+		shard: shard,
+	}, nil
+}
+
+// countGetsDS is a DAG service that keeps track of the number of
+// unique CIDs fetched.
+type countGetsDS struct {
+	ipld.DAGService
+
+	cidsFetched map[cid.Cid]struct{}
+	mapLock     sync.Mutex
+
+	getRequestDelay time.Duration
+}
+
+var _ ipld.DAGService = (*countGetsDS)(nil)
+
+func newCountGetsDS(ds ipld.DAGService) *countGetsDS {
+	return &countGetsDS{
+		ds,
+		make(map[cid.Cid]struct{}),
+		sync.Mutex{},
+		0,
+	}
+}
+
+func (d *countGetsDS) resetCounter() {
+	d.mapLock.Lock()
+	defer d.mapLock.Unlock()
+	d.cidsFetched = make(map[cid.Cid]struct{})
+}
+
+func (d *countGetsDS) uniqueCidsFetched() int {
+	d.mapLock.Lock()
+	defer d.mapLock.Unlock()
+	return len(d.cidsFetched)
+}
+
+func (d *countGetsDS) setRequestDelay(timeout time.Duration) {
+	d.getRequestDelay = timeout
+}
+
+func (d *countGetsDS) Get(ctx context.Context, c cid.Cid) (ipld.Node, error) {
+	node, err := d.DAGService.Get(ctx, c)
+	if err != nil {
+		return nil, err
+	}
+
+	d.mapLock.Lock()
+	_, cidRequestedBefore := d.cidsFetched[c]
+	d.cidsFetched[c] = struct{}{}
+	d.mapLock.Unlock()
+
+	if d.getRequestDelay != 0 && !cidRequestedBefore {
+		// The first request gets a delay to simulate a network fetch.
+		// Subsequent requests get no delay, simulating an on-disk cache.
+		time.Sleep(d.getRequestDelay)
+	}
+
+	return node, nil
+}
+
+// GetMany is not supported here; it panics so that every fetch is forced
+// through Get, which is the method that tracks requests.
+func (d *countGetsDS) GetMany(ctx context.Context, cids []cid.Cid) <-chan *ipld.NodeOption {
+	panic("GetMany not supported")
+}
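`countGetsDS` relies on Go's embedded-interface decorator pattern: only the methods it cares about are overridden, and everything else is promoted from the wrapped service. A minimal sketch of the same idea (`loggingDS` is hypothetical, not part of this PR):

```go
package main

import (
	"context"
	"fmt"

	cid "github.com/ipfs/go-cid"
	ipld "github.com/ipfs/go-ipld-format"
)

// loggingDS wraps any DAGService; the embedded interface already
// satisfies all of ipld.DAGService, so only Get needs redefining.
type loggingDS struct {
	ipld.DAGService
}

func (d *loggingDS) Get(ctx context.Context, c cid.Cid) (ipld.Node, error) {
	fmt.Printf("fetching %s\n", c)
	return d.DAGService.Get(ctx, c) // delegate to the wrapped service
}
```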
diff --git a/private/linksize/linksize.go b/private/linksize/linksize.go
new file mode 100644
index 000000000..e7ae098b6
--- /dev/null
+++ b/private/linksize/linksize.go
@@ -0,0 +1,5 @@
+package linksize
+
+import "github.com/ipfs/go-cid"
+
+var LinkSizeFunction func(linkName string, linkCid cid.Cid) int
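Taken together, the PR introduces two injection points that the tests swap out and restore: `internal.HAMTHashFunction` and `linksize.LinkSizeFunction`. The pattern used throughout is a plain save/override/defer-restore, sketched here with the names from this diff (a condensed sketch living in package `io` alongside the tests; `TestWithSimplifiedGlobals` is hypothetical):

```go
package io

import (
	"testing"

	"github.com/ipfs/go-unixfs/internal"
	"github.com/ipfs/go-unixfs/private/linksize"
)

// Sketch: the save/override/restore pattern the tests above use for the
// package-level hooks introduced in this PR.
func TestWithSimplifiedGlobals(t *testing.T) {
	// Make every link cost 1 so directory "size" equals entry count.
	linksize.LinkSizeFunction = mockLinkSizeFunc(1)
	defer func() { linksize.LinkSizeFunction = productionLinkSize }()

	// Make hashing the identity so the HAMT layout follows raw keys.
	oldHashFunc := internal.HAMTHashFunction
	internal.HAMTHashFunction = idHash
	defer func() { internal.HAMTHashFunction = oldHashFunc }()

	// ... exercise BasicDirectory/HAMTDirectory with deterministic sizes.
}
```

As the FIXMEs in the diff themselves note, these globals are not concurrency-safe, so tests that modify them cannot run in parallel.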