-
Notifications
You must be signed in to change notification settings - Fork 52
Tests for unsharding PR #99
Changes from 15 commits
d60be37
c5e0a00
2713c85
8a80e74
94a6082
2f9afe9
cdcb270
2c49dc3
8176abf
9795f4e
805db3b
32ccacd
87d3898
dc7e405
ba5e036
ed5213a
860cb96
0e831ed
ba2d000
f7a8e99
20d45d6
64502b0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,37 +25,55 @@ import ( | |
"fmt" | ||
"os" | ||
|
||
format "github.com/ipfs/go-unixfs" | ||
"github.com/ipfs/go-unixfs/internal" | ||
|
||
bitfield "github.com/ipfs/go-bitfield" | ||
cid "github.com/ipfs/go-cid" | ||
ipld "github.com/ipfs/go-ipld-format" | ||
dag "github.com/ipfs/go-merkledag" | ||
format "github.com/ipfs/go-unixfs" | ||
) | ||
|
||
const ( | ||
// HashMurmur3 is the multiformats identifier for Murmur3 | ||
HashMurmur3 uint64 = 0x22 | ||
) | ||
|
||
func (ds *Shard) isValueNode() bool { | ||
func init() { | ||
internal.HAMTHashFunction = murmur3Hash | ||
} | ||
|
||
func (ds *Shard) IsValueNode() bool { | ||
return ds.key != "" && ds.val != nil | ||
} | ||
|
||
// A Shard represents the HAMT. It should be initialized with NewShard(). | ||
type Shard struct { | ||
childer *childer | ||
|
||
tableSize int | ||
// Entries per node (number of possible childs indexed by the partial key). | ||
schomatis marked this conversation as resolved.
Show resolved
Hide resolved
|
||
tableSize int | ||
// Bits needed to encode child indexes (log2 of number of entries). This is | ||
// the number of bits taken from the hash key on each level of the tree. | ||
tableSizeLg2 int | ||
|
||
builder cid.Builder | ||
hashFunc uint64 | ||
|
||
// String format with number of zeros that will be present in the hexadecimal | ||
// encoding of the child index to always reach the fixed maxpadlen chars. | ||
// Example: maxpadlen = 4 => prefixPadStr: "%04X" (print number in hexadecimal | ||
// format padding with zeros to always reach 4 characters). | ||
prefixPadStr string | ||
maxpadlen int | ||
// Length in chars of string that encodes child indexes. We encode indexes | ||
// as hexadecimal strings to this is log4 of number of entries. | ||
maxpadlen int | ||
|
||
dserv ipld.DAGService | ||
|
||
// FIXME: Remove. We don't actually store "value nodes". This confusing | ||
// abstraction just removes the maxpadlen from the link names to extract | ||
// the actual value link the trie is storing. | ||
Comment on lines
+74
to
+76
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My understanding of your comment here is that Also, can you clarify if this a FIXME intended for this set of PRs? |
||
// leaf node | ||
key string | ||
val *ipld.Link | ||
|
@@ -68,12 +86,13 @@ func NewShard(dserv ipld.DAGService, size int) (*Shard, error) { | |
return nil, err | ||
} | ||
|
||
// FIXME: Make this at least a static configuration for testing. | ||
ds.hashFunc = HashMurmur3 | ||
return ds, nil | ||
} | ||
|
||
func makeShard(ds ipld.DAGService, size int) (*Shard, error) { | ||
lg2s, err := logtwo(size) | ||
lg2s, err := Logtwo(size) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
@@ -211,7 +230,7 @@ func (ds *Shard) Set(ctx context.Context, name string, nd ipld.Node) error { | |
// name key in this Shard or its children. It also returns the previous link | ||
// under that name key (if any). | ||
func (ds *Shard) SetAndPrevious(ctx context.Context, name string, node ipld.Node) (*ipld.Link, error) { | ||
hv := &hashBits{b: hash([]byte(name))} | ||
hv := newHashBits(name) | ||
err := ds.dserv.Add(ctx, node) | ||
if err != nil { | ||
return nil, err | ||
|
@@ -221,6 +240,9 @@ func (ds *Shard) SetAndPrevious(ctx context.Context, name string, node ipld.Node | |
if err != nil { | ||
return nil, err | ||
} | ||
|
||
// FIXME: We don't need to set the name here, it will get overwritten. | ||
// This is confusing, confirm and remove this line. | ||
lnk.Name = ds.linkNamePrefix(0) + name | ||
|
||
return ds.setValue(ctx, hv, name, lnk) | ||
|
@@ -236,13 +258,13 @@ func (ds *Shard) Remove(ctx context.Context, name string) error { | |
// RemoveAndPrevious is similar to the public Remove but also returns the | ||
// old removed link (if it exists). | ||
func (ds *Shard) RemoveAndPrevious(ctx context.Context, name string) (*ipld.Link, error) { | ||
hv := &hashBits{b: hash([]byte(name))} | ||
hv := newHashBits(name) | ||
return ds.setValue(ctx, hv, name, nil) | ||
} | ||
|
||
// Find searches for a child node by 'name' within this hamt | ||
func (ds *Shard) Find(ctx context.Context, name string) (*ipld.Link, error) { | ||
hv := &hashBits{b: hash([]byte(name))} | ||
hv := newHashBits(name) | ||
|
||
var out *ipld.Link | ||
err := ds.getValue(ctx, hv, name, func(sv *Shard) error { | ||
|
@@ -276,7 +298,7 @@ func (ds *Shard) childLinkType(lnk *ipld.Link) (linkType, error) { | |
|
||
// Link returns a merklelink to this shard node | ||
func (ds *Shard) Link() (*ipld.Link, error) { | ||
if ds.isValueNode() { | ||
if ds.IsValueNode() { | ||
return ds.val, nil | ||
} | ||
|
||
|
@@ -305,7 +327,7 @@ func (ds *Shard) getValue(ctx context.Context, hv *hashBits, key string, cb func | |
return err | ||
} | ||
|
||
if child.isValueNode() { | ||
if child.IsValueNode() { | ||
if child.key == key { | ||
return cb(child) | ||
} | ||
|
@@ -332,6 +354,21 @@ func (ds *Shard) EnumLinks(ctx context.Context) ([]*ipld.Link, error) { | |
return links, nil | ||
} | ||
|
||
// FIXME: Check which functions do we need to actually expose. | ||
func (ds *Shard) EnumAll(ctx context.Context) ([]*ipld.Link, error) { | ||
var links []*ipld.Link | ||
|
||
linkResults := ds.EnumAllAsync(ctx) | ||
|
||
for linkResult := range linkResults { | ||
if linkResult.Err != nil { | ||
return links, linkResult.Err | ||
} | ||
links = append(links, linkResult.Link) | ||
} | ||
return links, nil | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like this function is a duplicate of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @schomatis I pushed a commit removing this |
||
// ForEachLink walks the Shard and calls the given function. | ||
func (ds *Shard) ForEachLink(ctx context.Context, f func(*ipld.Link) error) error { | ||
return ds.walkTrie(ctx, func(sv *Shard) error { | ||
|
@@ -345,6 +382,31 @@ func (ds *Shard) ForEachLink(ctx context.Context, f func(*ipld.Link) error) erro | |
// EnumLinksAsync returns a channel which will receive Links in the directory | ||
// as they are enumerated, where order is not guaranteed | ||
func (ds *Shard) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult { | ||
linkResults := make(chan format.LinkResult) | ||
ctx, cancel := context.WithCancel(ctx) | ||
go func() { | ||
defer close(linkResults) | ||
defer cancel() | ||
getLinks := makeAsyncTrieGetLinks(ds.dserv, linkResults) | ||
cset := cid.NewSet() | ||
rootNode, err := ds.Node() | ||
if err != nil { | ||
emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err}) | ||
return | ||
} | ||
// FIXME: Make concurrency an option for testing. | ||
err = dag.Walk(ctx, getLinks, rootNode.Cid(), cset.Visit, dag.Concurrent()) | ||
//err = dag.Walk(ctx, getLinks, rootNode.Cid(), cset.Visit) | ||
if err != nil { | ||
emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err}) | ||
} | ||
}() | ||
return linkResults | ||
} | ||
|
||
// EnumLinksAsync returns a channel which will receive Links in the directory | ||
// as they are enumerated, where order is not guaranteed | ||
func (ds *Shard) EnumAllAsync(ctx context.Context) <-chan format.LinkResult { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This isn't needed anymore given There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems not, and also seems to be unused. Removing |
||
linkResults := make(chan format.LinkResult) | ||
ctx, cancel := context.WithCancel(ctx) | ||
go func() { | ||
|
@@ -423,7 +485,7 @@ func emitResult(ctx context.Context, linkResults chan<- format.LinkResult, r for | |
|
||
func (ds *Shard) walkTrie(ctx context.Context, cb func(*Shard) error) error { | ||
return ds.childer.each(ctx, func(s *Shard) error { | ||
if s.isValueNode() { | ||
if s.IsValueNode() { | ||
if err := cb(s); err != nil { | ||
return err | ||
} | ||
|
@@ -455,7 +517,7 @@ func (ds *Shard) setValue(ctx context.Context, hv *hashBits, key string, value * | |
return | ||
} | ||
|
||
if child.isValueNode() { | ||
if child.IsValueNode() { | ||
// Leaf node. This is the base case of this recursive function. | ||
if child.key == key { | ||
// We are in the correct shard (tree level) so we modify this child | ||
|
@@ -489,10 +551,7 @@ func (ds *Shard) setValue(ctx context.Context, hv *hashBits, key string, value * | |
return nil, err | ||
} | ||
child.builder = ds.builder | ||
chhv := &hashBits{ | ||
b: hash([]byte(grandChild.key)), | ||
consumed: hv.consumed, | ||
} | ||
chhv := newConsumedHashBits(grandChild.key, hv.consumed) | ||
|
||
// We explicitly ignore the oldValue returned by the next two insertions | ||
// (which will be nil) to highlight there is no overwrite here: they are | ||
|
@@ -536,7 +595,7 @@ func (ds *Shard) setValue(ctx context.Context, hv *hashBits, key string, value * | |
// Have we loaded the child? Prefer that. | ||
schild := child.childer.child(0) | ||
if schild != nil { | ||
if schild.isValueNode() { | ||
if schild.IsValueNode() { | ||
ds.childer.set(schild, i) | ||
} | ||
return | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
package internal | ||
|
||
var HAMTHashFunction func(val []byte) []byte |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be unexported again, it's not used anywhere outside this file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@schomatis I pushed a commit for this