Skip to content

Commit

Permalink
Merge pull request #8715 from filecoin-project/feat/miner-commp-cmd
Browse files Browse the repository at this point in the history
feat: miner cli: sealing data-cid command
  • Loading branch information
magik6k authored May 24, 2022
2 parents 4b1bfa9 + b53db68 commit 6f2c8d6
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 8 deletions.
99 changes: 99 additions & 0 deletions cmd/lotus-miner/sealing.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,27 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"io"
"math"
"net/http"
"os"
"sort"
"strings"
"text/tabwriter"
"time"

"github.com/dustin/go-humanize"
"github.com/fatih/color"
"github.com/google/uuid"
"github.com/mitchellh/go-homedir"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/lotus/lib/httpreader"

"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
Expand All @@ -31,6 +38,7 @@ var sealingCmd = &cli.Command{
workersCmd(true),
sealingSchedDiagCmd,
sealingAbortCmd,
sealingDataCidCmd,
},
}

Expand Down Expand Up @@ -349,3 +357,94 @@ var sealingAbortCmd = &cli.Command{
return nodeApi.SealingAbort(ctx, job.ID)
},
}

var sealingDataCidCmd = &cli.Command{
Name: "data-cid",
Usage: "Compute data CID using workers",
ArgsUsage: "[file/url] <padded piece size>",
Flags: []cli.Flag{
&cli.Uint64Flag{
Name: "file-size",
Usage: "real file size",
},
},
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() < 1 || cctx.Args().Len() > 2 {
return xerrors.Errorf("expected 1 or 2 arguments")
}

nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()

ctx := lcli.ReqContext(cctx)

var r io.Reader
sz := cctx.Uint64("file-size")

if strings.HasPrefix(cctx.Args().First(), "http://") || strings.HasPrefix(cctx.Args().First(), "https://") {
r = &httpreader.HttpReader{
URL: cctx.Args().First(),
}

if !cctx.IsSet("file-size") {
resp, err := http.Head(cctx.Args().First())
if err != nil {
return xerrors.Errorf("http head: %w", err)
}

if resp.ContentLength < 0 {
return xerrors.Errorf("head response didn't contain content length; specify --file-size")
}
sz = uint64(resp.ContentLength)
}
} else {
p, err := homedir.Expand(cctx.Args().First())
if err != nil {
return xerrors.Errorf("expanding path: %w", err)
}

f, err := os.OpenFile(p, os.O_RDONLY, 0)
if err != nil {
return xerrors.Errorf("opening source file: %w", err)
}

if !cctx.IsSet("file-size") {
st, err := f.Stat()
if err != nil {
return xerrors.Errorf("stat: %w", err)
}
sz = uint64(st.Size())
}

r = f
}

var psize abi.PaddedPieceSize
if cctx.Args().Len() == 2 {
rps, err := humanize.ParseBytes(cctx.Args().Get(1))
if err != nil {
return xerrors.Errorf("parsing piece size: %w", err)
}
psize = abi.PaddedPieceSize(rps)
if err := psize.Validate(); err != nil {
return xerrors.Errorf("checking piece size: %w", err)
}
if sz > uint64(psize.Unpadded()) {
return xerrors.Errorf("file larger than the piece")
}
} else {
psize = padreader.PaddedSize(sz).Padded()
}

pc, err := nodeApi.ComputeDataCid(ctx, psize.Unpadded(), r)
if err != nil {
return xerrors.Errorf("computing data CID: %w", err)
}

fmt.Println(pc.PieceCID, " ", pc.Size)
return nil
},
}
15 changes: 15 additions & 0 deletions documentation/en/cli-lotus-miner.md
Original file line number Diff line number Diff line change
Expand Up @@ -2331,6 +2331,7 @@ COMMANDS:
workers list workers
sched-diag Dump internal scheduler state
abort Abort a running job
data-cid Compute data CID using workers
help, h Shows a list of commands or help for one command
OPTIONS:
Expand Down Expand Up @@ -2393,3 +2394,17 @@ OPTIONS:
--help, -h show help (default: false)
```

### lotus-miner sealing data-cid
```
NAME:
lotus-miner sealing data-cid - Compute data CID using workers
USAGE:
lotus-miner sealing data-cid [command options] [file/url] <padded piece size>
OPTIONS:
--file-size value real file size (default: 0)
--help, -h show help (default: false)
```
7 changes: 7 additions & 0 deletions extern/sector-storage/ffiwrapper/sealer_cgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/partialfile"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
nr "github.com/filecoin-project/lotus/extern/storage-sealing/lib/nullreader"
"github.com/filecoin-project/lotus/lib/nullreader"
)

var _ Storage = &Sealer{}
Expand All @@ -53,6 +54,11 @@ func (sb *Sealer) NewSector(ctx context.Context, sector storage.SectorRef) error
}

func (sb *Sealer) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) {
pieceData = io.LimitReader(io.MultiReader(
pieceData,
nullreader.Reader{},
), int64(pieceSize))

// TODO: allow tuning those:
chunk := abi.PaddedPieceSize(4 << 20)
parallel := runtime.NumCPU()
Expand All @@ -73,6 +79,7 @@ func (sb *Sealer) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize,
for {
var read int
for rbuf := buf; len(rbuf) > 0; {

n, err := pieceData.Read(rbuf)
if err != nil && err != io.EOF {
return abi.PieceInfo{}, xerrors.Errorf("pr read error: %w", err)
Expand Down
20 changes: 13 additions & 7 deletions itests/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,23 @@ func TestWorkerDataCid(t *testing.T) {
e, err := worker.Enabled(ctx)
require.NoError(t, err)
require.True(t, e)
/*
pi, err := miner.ComputeDataCid(ctx, 1016, strings.NewReader(strings.Repeat("a", 1016)))
require.NoError(t, err)
require.Equal(t, abi.PaddedPieceSize(1024), pi.Size)
require.Equal(t, "baga6ea4seaqlhznlutptgfwhffupyer6txswamerq5fc2jlwf2lys2mm5jtiaeq", pi.PieceCID.String())
*/

pi, err := miner.ComputeDataCid(ctx, 1016, strings.NewReader(strings.Repeat("a", 1016)))
require.NoError(t, err)
require.Equal(t, abi.PaddedPieceSize(1024), pi.Size)
require.Equal(t, "baga6ea4seaqlhznlutptgfwhffupyer6txswamerq5fc2jlwf2lys2mm5jtiaeq", pi.PieceCID.String())

bigPiece := abi.PaddedPieceSize(16 << 20).Unpadded()
pi, err := miner.ComputeDataCid(ctx, bigPiece, strings.NewReader(strings.Repeat("a", int(bigPiece))))
pi, err = miner.ComputeDataCid(ctx, bigPiece, strings.NewReader(strings.Repeat("a", int(bigPiece))))
require.NoError(t, err)
require.Equal(t, bigPiece.Padded(), pi.Size)
require.Equal(t, "baga6ea4seaqmhoxl2ybw5m2wyd3pt3h4zmp7j52yumzu2rar26twns3uocq7yfa", pi.PieceCID.String())

nonFullPiece := abi.PaddedPieceSize(10 << 20).Unpadded()
pi, err = miner.ComputeDataCid(ctx, bigPiece, strings.NewReader(strings.Repeat("a", int(nonFullPiece))))
require.NoError(t, err)
require.Equal(t, bigPiece.Padded(), pi.Size)
require.Equal(t, "baga6ea4seaqbxib4pdxs5cqdn3fmtj4rcxk6rx6ztiqmrx7fcpo3ymuxbp2rodi", pi.PieceCID.String())
}

func TestWinningPostWorker(t *testing.T) {
Expand Down
47 changes: 47 additions & 0 deletions lib/httpreader/httpreader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package httpreader

import (
"io"
"net/http"

"golang.org/x/xerrors"
)

// HttpReader is a reader which will read a http resource with a simple get request.
// Before first Read it will be passed over JsonRPC as a URL.
type HttpReader struct {
URL string

reader io.ReadCloser
}

func (h *HttpReader) Close() error {
h.URL = ""
if h.reader != nil {
return h.reader.Close()
}
return nil
}

func (h *HttpReader) Read(p []byte) (n int, err error) {
if h.reader == nil {
res, err := http.Get(h.URL)
if err != nil {
return 0, err
}
if res.StatusCode != http.StatusOK {
return 0, xerrors.Errorf("unexpected http status %d", res.StatusCode)
}

// mark the reader as reading
h.URL = ""
h.reader = res.Body
}
if h.reader == nil {
return 0, xerrors.Errorf("http reader closed")
}

return h.reader.Read(p)
}

var _ io.ReadCloser = &HttpReader{}
10 changes: 9 additions & 1 deletion lib/rpcenc/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/lotus/extern/storage-sealing/lib/nullreader"
"github.com/filecoin-project/lotus/lib/httpreader"
)

var log = logging.Logger("rpcenc")
Expand All @@ -34,6 +35,7 @@ type StreamType string
const (
Null StreamType = "null"
PushStream StreamType = "push"
HTTP StreamType = "http"
// TODO: Data transfer handoff to workers?
)

Expand Down Expand Up @@ -105,6 +107,9 @@ func ReaderParamEncoder(addr string) jsonrpc.Option {
if r, ok := r.(*nullreader.NullReader); ok {
return reflect.ValueOf(ReaderStream{Type: Null, Info: fmt.Sprint(r.N)}), nil
}
if r, ok := r.(*httpreader.HttpReader); ok && r.URL != "" {
return reflect.ValueOf(ReaderStream{Type: HTTP, Info: r.URL}), nil
}

reqID := uuid.New()
u, err := url.Parse(addr)
Expand Down Expand Up @@ -413,13 +418,16 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) {
return reflect.Value{}, xerrors.Errorf("unmarshaling reader id: %w", err)
}

if rs.Type == Null {
switch rs.Type {
case Null:
n, err := strconv.ParseInt(rs.Info, 10, 64)
if err != nil {
return reflect.Value{}, xerrors.Errorf("parsing null byte count: %w", err)
}

return reflect.ValueOf(nullreader.NewNullReader(abi.UnpaddedPieceSize(n))), nil
case HTTP:
return reflect.ValueOf(&httpreader.HttpReader{URL: rs.Info}), nil
}

u, err := uuid.Parse(rs.Info)
Expand Down

0 comments on commit 6f2c8d6

Please sign in to comment.