Skip to content

Commit

Permalink
feat(commands): --stream option for ls
Browse files Browse the repository at this point in the history
Convert LS Command to use current cmds lib
Update LS Command to support streaming
Rebase fixes

License: MIT
Signed-off-by: hannahhoward <hannah@hannahhoward.net>
  • Loading branch information
hannahhoward committed Oct 30, 2018
1 parent b6e1d38 commit 3b19e7e
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 107 deletions.
287 changes: 182 additions & 105 deletions core/commands/ls.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package commands

import (
"bytes"
"fmt"
"io"
"text/tabwriter"

cmds "github.com/ipfs/go-ipfs/commands"
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
e "github.com/ipfs/go-ipfs/core/commands/e"
iface "github.com/ipfs/go-ipfs/core/coreapi/interface"

cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
ipld "gx/ipfs/QmR7TcHkR9nxkUorfi8XMTAMLUK7GiP64TWWBzY3aacc1o/go-ipld-format"
cmds "gx/ipfs/QmSXUokcP4TJpFfqozT69AVAYRtzXVMUjzQVkYX41R9Svs/go-ipfs-cmds"
merkledag "gx/ipfs/QmSei8kFMfqdJq7Q68d2LMnHbTWKKg2daA29ezUYFAUNgc/go-merkledag"
offline "gx/ipfs/QmT6dHGp3UYd3vUMpy7rzX2CXQv7HLcj42Vtq8qwwjgASb/go-ipfs-exchange-offline"
blockservice "gx/ipfs/QmWfhv1D18DRSiSm73r4QGcByspzPtxxRTcmHW3axFXZo8/go-blockservice"
Expand All @@ -21,24 +21,34 @@ import (
unixfspb "gx/ipfs/QmfB3oNXGGq9S4B2a9YeCajoATms3Zw2VvDm8fK7VeLSV8/go-unixfs/pb"
)

// LsLink contains printable data for a single ipld link in ls output
type LsLink struct {
Name, Hash string
Size uint64
Type unixfspb.Data_DataType
}

// LsObject is an element of LsOutput
// It can represent a whole directory, a directory header, one or more links,
// Or a the end of a directory
type LsObject struct {
Hash string
Links []LsLink
Hash string
Links []LsLink
HasHeader bool
HasLinks bool
HasFooter bool
}

// LsObject is a set of printable data for directories
type LsOutput struct {
Objects []LsObject
MultipleFolders bool
Objects []LsObject
}

const (
lsHeadersOptionNameTime = "headers"
lsResolveTypeOptionName = "resolve-type"
lsStreamOptionName = "stream"
)

var LsCmd = &cmds.Command{
Expand All @@ -60,158 +70,225 @@ The JSON output contains type information.
Options: []cmdkit.Option{
cmdkit.BoolOption(lsHeadersOptionNameTime, "v", "Print table headers (Hash, Size, Name)."),
cmdkit.BoolOption(lsResolveTypeOptionName, "Resolve linked objects to find out their types.").WithDefault(true),
cmdkit.BoolOption(lsStreamOptionName, "s", "Stream directory entries as they are found."),
},
Run: func(req cmds.Request, res cmds.Response) {
nd, err := req.InvocContext().GetNode()
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
nd, err := cmdenv.GetNode(env)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}

api, err := req.InvocContext().GetApi()
api, err := cmdenv.GetApi(env)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

// get options early -> exit early in case of error
if _, _, err := req.Option(lsHeadersOptionNameTime).Bool(); err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

resolve, _, err := req.Option(lsResolveTypeOptionName).Bool()
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}

resolve, _ := req.Options[lsResolveTypeOptionName].(bool)
dserv := nd.DAG
if !resolve {
offlineexch := offline.Exchange(nd.Blockstore)
bserv := blockservice.New(nd.Blockstore, offlineexch)
dserv = merkledag.NewDAGService(bserv)
}

paths := req.Arguments()
err = req.ParseBodyArgs()
if err != nil {
return err
}

paths := req.Arguments

var dagnodes []ipld.Node
for _, fpath := range paths {
p, err := iface.ParsePath(fpath)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}

dagnode, err := api.ResolveNode(req.Context(), p)
dagnode, err := api.ResolveNode(req.Context, p)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}
dagnodes = append(dagnodes, dagnode)
}

output := make([]LsObject, len(req.Arguments()))
ng := merkledag.NewSession(req.Context(), nd.DAG)
ng := merkledag.NewSession(req.Context, nd.DAG)
ro := merkledag.NewReadOnlyDagService(ng)

stream, _ := req.Options[lsStreamOptionName].(bool)
multipleFolders := len(req.Arguments) > 1
if !stream {
output := make([]LsObject, len(req.Arguments))

for i, dagnode := range dagnodes {
dir, err := uio.NewDirectoryFromNode(ro, dagnode)
if err != nil && err != uio.ErrNotADir {
return fmt.Errorf("the data in %s (at %q) is not a UnixFS directory: %s", dagnode.Cid(), paths[i], err)
}

var links []*ipld.Link
if dir == nil {
links = dagnode.Links()
} else {
links, err = dir.Links(req.Context)
if err != nil {
return err
}
}
outputLinks := make([]LsLink, len(links))
for j, link := range links {
lsLink, err := makeLsLink(req, dserv, resolve, link)
if err != nil {
return err
}
outputLinks[j] = *lsLink
}
output[i] = newFullDirectoryLsObject(paths[i], outputLinks)
}

return cmds.EmitOnce(res, &LsOutput{multipleFolders, output})
}

for i, dagnode := range dagnodes {
dir, err := uio.NewDirectoryFromNode(ro, dagnode)
if err != nil && err != uio.ErrNotADir {
res.SetError(fmt.Errorf("the data in %s (at %q) is not a UnixFS directory: %s", dagnode.Cid(), paths[i], err), cmdkit.ErrNormal)
return
return fmt.Errorf("the data in %s (at %q) is not a UnixFS directory: %s", dagnode.Cid(), paths[i], err)
}

var links []*ipld.Link
var linkResults <-chan unixfs.LinkResult
if dir == nil {
links = dagnode.Links()
linkResults = makeDagNodeLinkResults(req, dagnode)
} else {
links, err = dir.Links(req.Context())
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
linkResults = dir.EnumLinksAsync(req.Context)
}

output[i] = LsObject{
Hash: paths[i],
Links: make([]LsLink, len(links)),
}

for j, link := range links {
t := unixfspb.Data_DataType(-1)

switch link.Cid.Type() {
case cid.Raw:
// No need to check with raw leaves
t = unixfs.TFile
case cid.DagProtobuf:
linkNode, err := link.GetNode(req.Context(), dserv)
if err == ipld.ErrNotFound && !resolve {
// not an error
linkNode = nil
} else if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
output := make([]LsObject, 1)
outputLinks := make([]LsLink, 1)

if pn, ok := linkNode.(*merkledag.ProtoNode); ok {
d, err := unixfs.FSNodeFromBytes(pn.Data())
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
t = d.Type()
}
output[0] = newDirectoryHeaderLsObject(paths[i])
if err = res.Emit(&LsOutput{multipleFolders, output}); err != nil {
return nil
}
for linkResult := range linkResults {
if linkResult.Err != nil {
return linkResult.Err
}
link := linkResult.Link
lsLink, err := makeLsLink(req, dserv, resolve, link)
if err != nil {
return err
}
output[i].Links[j] = LsLink{
Name: link.Name,
Hash: link.Cid.String(),
Size: link.Size,
Type: t,
outputLinks[0] = *lsLink
output[0] = newDirectoryLinksLsObject(outputLinks)
if err = res.Emit(&LsOutput{multipleFolders, output}); err != nil {
return err
}
}
output[0] = newDirectoryFooterLsObject()
if err = res.Emit(&LsOutput{multipleFolders, output}); err != nil {
return err
}
}

res.SetOutput(&LsOutput{output})
return nil
},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {

v, err := unwrapOutput(res.Output())
if err != nil {
return nil, err
}

headers, _, _ := res.Request().Option(lsHeadersOptionNameTime).Bool()
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
headers, _ := req.Options[lsHeadersOptionNameTime].(bool)
output, ok := v.(*LsOutput)
if !ok {
return nil, e.TypeErr(output, v)
return e.TypeErr(output, v)
}

buf := new(bytes.Buffer)
w := tabwriter.NewWriter(buf, 1, 2, 1, ' ', 0)
tw := tabwriter.NewWriter(w, 1, 2, 1, ' ', 0)
for _, object := range output.Objects {
if len(output.Objects) > 1 {
fmt.Fprintf(w, "%s:\n", object.Hash)
}
if headers {
fmt.Fprintln(w, "Hash\tSize\tName")
if object.HasHeader {
if output.MultipleFolders {
fmt.Fprintf(tw, "%s:\n", object.Hash)
}
if headers {
fmt.Fprintln(tw, "Hash\tSize\tName")
}
}
for _, link := range object.Links {
if link.Type == unixfs.TDirectory {
link.Name += "/"
if object.HasLinks {
for _, link := range object.Links {
if link.Type == unixfs.TDirectory {
link.Name += "/"
}

fmt.Fprintf(tw, "%s\t%v\t%s\n", link.Hash, link.Size, link.Name)
}
fmt.Fprintf(w, "%s\t%v\t%s\n", link.Hash, link.Size, link.Name)
}
if len(output.Objects) > 1 {
fmt.Fprintln(w)
if object.HasFooter {
if output.MultipleFolders {
fmt.Fprintln(tw)
}
}
}
w.Flush()

return buf, nil
},
tw.Flush()
return nil
}),
},
Type: LsOutput{},
}

func makeDagNodeLinkResults(req *cmds.Request, dagnode ipld.Node) <-chan unixfs.LinkResult {
linkResults := make(chan unixfs.LinkResult)
go func() {
defer close(linkResults)
for _, l := range dagnode.Links() {
select {
case linkResults <- unixfs.LinkResult{
Link: l,
Err: nil,
}:
case <-req.Context.Done():
return
}
}
}()
return linkResults
}

func newFullDirectoryLsObject(hash string, links []LsLink) LsObject {
return LsObject{hash, links, true, true, true}
}
func newDirectoryHeaderLsObject(hash string) LsObject {
return LsObject{hash, nil, true, false, false}
}
func newDirectoryLinksLsObject(links []LsLink) LsObject {
return LsObject{"", links, false, true, false}
}
func newDirectoryFooterLsObject() LsObject {
return LsObject{"", nil, false, false, true}
}

func makeLsLink(req *cmds.Request, dserv ipld.DAGService, resolve bool, link *ipld.Link) (*LsLink, error) {
t := unixfspb.Data_DataType(-1)

switch link.Cid.Type() {
case cid.Raw:
// No need to check with raw leaves
t = unixfs.TFile
case cid.DagProtobuf:
linkNode, err := link.GetNode(req.Context, dserv)
if err == ipld.ErrNotFound && !resolve {
// not an error
linkNode = nil
} else if err != nil {
return nil, err
}

if pn, ok := linkNode.(*merkledag.ProtoNode); ok {
d, err := unixfs.FSNodeFromBytes(pn.Data())
if err != nil {
return nil, err
}
t = d.Type()
}
}
return &LsLink{
Name: link.Name,
Hash: link.Cid.String(),
Size: link.Size,
Type: t,
}, nil
}
Loading

0 comments on commit 3b19e7e

Please sign in to comment.