Skip to content
This repository has been archived by the owner on Jun 27, 2023. It is now read-only.

Commit

Permalink
Merge pull request #39 from ipfs/features/streaming-ls-5600
Browse files Browse the repository at this point in the history
feat(Directory): Add EnumLinksAsync method
  • Loading branch information
hannahhoward authored Oct 30, 2018
2 parents e8af7a6 + c54d0e4 commit a3eae7f
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 41 deletions.
65 changes: 46 additions & 19 deletions hamt/hamt.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"context"
"fmt"
"os"
"sync"

bitfield "github.com/Stebalien/go-bitfield"
cid "github.com/ipfs/go-cid"
Expand Down Expand Up @@ -400,21 +399,16 @@ func (ds *Shard) getValue(ctx context.Context, hv *hashBits, key string, cb func
// EnumLinks collects all links in the Shard.
func (ds *Shard) EnumLinks(ctx context.Context) ([]*ipld.Link, error) {
var links []*ipld.Link
var setlk sync.Mutex

getLinks := makeAsyncTrieGetLinks(ds.dserv, func(sv *Shard) error {
lnk := sv.val
lnk.Name = sv.key
setlk.Lock()
links = append(links, lnk)
setlk.Unlock()
return nil
})

cset := cid.NewSet()
linkResults := ds.EnumLinksAsync(ctx)

err := dag.EnumerateChildrenAsync(ctx, getLinks, ds.nd.Cid(), cset.Visit)
return links, err
for linkResult := range linkResults {
if linkResult.Err != nil {
return links, linkResult.Err
}
links = append(links, linkResult.Link)
}
return links, nil
}

// ForEachLink walks the Shard and calls the given function.
Expand All @@ -427,10 +421,28 @@ func (ds *Shard) ForEachLink(ctx context.Context, f func(*ipld.Link) error) erro
})
}

// EnumLinksAsync returns a channel which will receive Links in the directory
// as they are enumerated, where order is not guaranteed. The channel is closed
// once the enumeration finishes; if the enumeration fails, the error is
// emitted as a final LinkResult unless the context has already been cancelled.
func (ds *Shard) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult {
	out := make(chan format.LinkResult)
	// Derive a cancellable context so that any getLinks calls still in flight
	// are told to stop before the output channel is closed.
	walkCtx, stop := context.WithCancel(ctx)
	go func() {
		// Deferred calls run in reverse order: stop() fires first, then close(out).
		defer close(out)
		defer stop()

		fetch := makeAsyncTrieGetLinks(ds.dserv, out)
		visited := cid.NewSet()
		if err := dag.EnumerateChildrenAsync(walkCtx, fetch, ds.nd.Cid(), visited.Visit); err != nil {
			emitResult(walkCtx, out, format.LinkResult{Link: nil, Err: err})
		}
	}()
	return out
}

// makeAsyncTrieGetLinks builds a getLinks function that can be used with EnumerateChildrenAsync
// to iterate a HAMT shard. It takes an IPLD DAG Service to fetch nodes, and a channel on which a
// LinkResult is emitted for every link to a leaf node in the HAMT tree, so they can be collected for an EnumLinks operation
func makeAsyncTrieGetLinks(dagService ipld.DAGService, onShardValue func(shard *Shard) error) dag.GetLinks {
func makeAsyncTrieGetLinks(dagService ipld.DAGService, linkResults chan<- format.LinkResult) dag.GetLinks {

return func(ctx context.Context, currentCid cid.Cid) ([]*ipld.Link, error) {
node, err := dagService.Get(ctx, currentCid)
Expand Down Expand Up @@ -458,16 +470,31 @@ func makeAsyncTrieGetLinks(dagService ipld.DAGService, onShardValue func(shard *
if err != nil {
return nil, err
}
err = onShardValue(sv)
if err != nil {
return nil, err
}
formattedLink := sv.val
formattedLink.Name = sv.key
emitResult(ctx, linkResults, format.LinkResult{Link: formattedLink, Err: nil})
}
}
return childShards, nil
}
}

// emitResult sends r on linkResults, giving up as soon as ctx is cancelled.
func emitResult(ctx context.Context, linkResults chan<- format.LinkResult, r format.LinkResult) {
	// Give cancellation priority over the send. Because of the concurrency of
	// EnumerateChildrenAsync, EnumLinksAsync may already have completed and
	// closed linkResults by the time this code runs.
	if ctx.Err() != nil {
		return
	}
	// Otherwise block until either the receiver takes the result or the
	// context is cancelled.
	select {
	case <-ctx.Done():
	case linkResults <- r:
	}
}

func (ds *Shard) walkTrie(ctx context.Context, cb func(*Shard) error) error {
for idx := range ds.children {
c, err := ds.getChild(ctx, idx)
Expand Down
89 changes: 67 additions & 22 deletions hamt/hamt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,28 +74,7 @@ func assertLink(s *Shard, name string, found bool) error {
}
}

func assertSerializationWorks(ds ipld.DAGService, s *Shard) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nd, err := s.Node()
if err != nil {
return err
}

nds, err := NewHamtFromDag(ds, nd)
if err != nil {
return err
}

linksA, err := s.EnumLinks(ctx)
if err != nil {
return err
}

linksB, err := nds.EnumLinks(ctx)
if err != nil {
return err
}
func assertLinksEqual(linksA []*ipld.Link, linksB []*ipld.Link) error {

if len(linksA) != len(linksB) {
return fmt.Errorf("links arrays are different sizes")
Expand All @@ -121,6 +100,32 @@ func assertSerializationWorks(ds ipld.DAGService, s *Shard) error {
return nil
}

// assertSerializationWorks converts s to a node, rebuilds a HAMT from that
// node via NewHamtFromDag, and returns an error unless both shards enumerate
// links that assertLinksEqual accepts as equal.
func assertSerializationWorks(ds ipld.DAGService, s *Shard) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	serialized, err := s.Node()
	if err != nil {
		return err
	}

	reloaded, err := NewHamtFromDag(ds, serialized)
	if err != nil {
		return err
	}

	original, err := s.EnumLinks(ctx)
	if err != nil {
		return err
	}

	roundTripped, err := reloaded.EnumLinks(ctx)
	if err != nil {
		return err
	}

	return assertLinksEqual(original, roundTripped)
}

func TestBasicSet(t *testing.T) {
ds := mdtest.Mock()
for _, w := range []int{128, 256, 512, 1024, 2048, 4096} {
Expand Down Expand Up @@ -309,6 +314,46 @@ func TestSetAfterMarshal(t *testing.T) {
}
}

// TestEnumLinksAsync checks that streaming a HAMT shard's links through
// EnumLinksAsync yields the same set of links as the blocking EnumLinks call
// on a shard rebuilt from its DAG node.
func TestEnumLinksAsync(t *testing.T) {
	ds := mdtest.Mock()
	_, s, err := makeDir(ds, 300)
	if err != nil {
		t.Fatal(err)
	}

	// Use a cancellable context: if the test bails out with t.Fatal before the
	// result channel is drained, the deferred cancel releases the producer
	// goroutine instead of leaving it blocked on a send.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	nd, err := s.Node()
	if err != nil {
		t.Fatal(err)
	}

	nds, err := NewHamtFromDag(ds, nd)
	if err != nil {
		t.Fatal(err)
	}

	linksA, err := nds.EnumLinks(ctx)
	if err != nil {
		t.Fatal(err)
	}

	linkResults := nds.EnumLinksAsync(ctx)

	var linksB []*ipld.Link

	for linkResult := range linkResults {
		if linkResult.Err != nil {
			t.Fatal(linkResult.Err)
		}
		linksB = append(linksB, linkResult.Link)
	}

	err = assertLinksEqual(linksA, linksB)
	if err != nil {
		t.Fatal(err)
	}
}

func TestDuplicateAddShard(t *testing.T) {
ds := mdtest.Mock()
dir, _ := NewShard(ds, 256)
Expand Down
31 changes: 31 additions & 0 deletions io/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"

mdag "github.com/ipfs/go-merkledag"

format "github.com/ipfs/go-unixfs"
hamt "github.com/ipfs/go-unixfs/hamt"

Expand Down Expand Up @@ -38,6 +39,10 @@ type Directory interface {
// ForEachLink applies the given function to Links in the directory.
ForEachLink(context.Context, func(*ipld.Link) error) error

// EnumLinksAsync returns a channel which will receive Links in the directory
// as they are enumerated, where order is not guaranteed
EnumLinksAsync(context.Context) <-chan format.LinkResult

// Links returns the all the links in the directory node.
Links(context.Context) ([]*ipld.Link, error)

Expand Down Expand Up @@ -141,6 +146,26 @@ func (d *BasicDirectory) AddChild(ctx context.Context, name string, node ipld.No
return d.node.AddNodeLink(name, node)
}

// EnumLinksAsync returns a channel which will receive Links in the directory
// as they are enumerated, where order is not guaranteed. The channel is closed
// once every link of the underlying node has been sent or ctx is cancelled.
func (d *BasicDirectory) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult {
	out := make(chan format.LinkResult)
	go func() {
		defer close(out)
		for _, link := range d.node.Links() {
			result := format.LinkResult{Link: link, Err: nil}
			select {
			case <-ctx.Done():
				// The consumer gave up; stop without sending the remaining links.
				return
			case out <- result:
			}
		}
	}()
	return out
}

// ForEachLink implements the `Directory` interface.
func (d *BasicDirectory) ForEachLink(ctx context.Context, f func(*ipld.Link) error) error {
for _, l := range d.node.Links() {
Expand Down Expand Up @@ -226,6 +251,12 @@ func (d *HAMTDirectory) ForEachLink(ctx context.Context, f func(*ipld.Link) erro
return d.shard.ForEachLink(ctx, f)
}

// EnumLinksAsync returns a channel which will receive Links in the directory
// as they are enumerated, where order is not guaranteed. It delegates
// directly to d.shard.EnumLinksAsync with the same context.
func (d *HAMTDirectory) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult {
return d.shard.EnumLinksAsync(ctx)
}

// Links implements the `Directory` interface.
func (d *HAMTDirectory) Links(ctx context.Context) ([]*ipld.Link, error) {
return d.shard.EnumLinks(ctx)
Expand Down
26 changes: 26 additions & 0 deletions io/directory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"fmt"
"testing"

ipld "github.com/ipfs/go-ipld-format"
mdtest "github.com/ipfs/go-merkledag/test"

ft "github.com/ipfs/go-unixfs"
)

Expand Down Expand Up @@ -155,4 +157,28 @@ func TestDirBuilder(t *testing.T) {
if len(links) != count {
t.Fatal("wrong number of links", len(links), count)
}

linkResults := dir.EnumLinksAsync(ctx)

asyncNames := make(map[string]bool)
var asyncLinks []*ipld.Link

for linkResult := range linkResults {
if linkResult.Err != nil {
t.Fatal(linkResult.Err)
}
asyncNames[linkResult.Link.Name] = true
asyncLinks = append(asyncLinks, linkResult.Link)
}

for i := 0; i < count; i++ {
n := fmt.Sprintf("entry %d", i)
if !asyncNames[n] {
t.Fatal("COULDNT FIND: ", n)
}
}

if len(asyncLinks) != count {
t.Fatal("wrong number of links", len(asyncLinks), count)
}
}
9 changes: 9 additions & 0 deletions unixfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,18 @@ import (
proto "github.com/gogo/protobuf/proto"

dag "github.com/ipfs/go-merkledag"

ipld "github.com/ipfs/go-ipld-format"
pb "github.com/ipfs/go-unixfs/pb"
)

// LinkResult is a single result from any parallel enumeration of links:
// it carries either an enumerated link or the error that occurred.
// TODO: Should this live in go-ipld-format?
type LinkResult struct {
// Link is the enumerated link; the EnumLinksAsync producers leave it nil
// when they report an error.
Link *ipld.Link
// Err is the enumeration error, or nil when Link was produced successfully.
Err error
}

// Shorthands for protobuffer types
const (
TRaw = pb.Data_Raw
Expand Down

0 comments on commit a3eae7f

Please sign in to comment.