Skip to content

Commit

Permalink
sector import: Sector data download
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Sep 2, 2022
1 parent a0c736e commit 376614e
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 71 deletions.
104 changes: 104 additions & 0 deletions storage/paths/fetch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package paths

import (
"context"
"io"
"mime"
"net/http"
"os"
"time"

"github.com/hashicorp/go-multierror"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/storage/sealer/tarutil"
)

func fetch(ctx context.Context, url, outname string, header http.Header) (rerr error) {
log.Infof("Fetch %s -> %s", url, outname)

req, err := http.NewRequest("GET", url, nil)
if err != nil {
return xerrors.Errorf("request: %w", err)
}
req.Header = header
req = req.WithContext(ctx)

resp, err := http.DefaultClient.Do(req)
if err != nil {
return xerrors.Errorf("do request: %w", err)
}
defer resp.Body.Close() // nolint

if resp.StatusCode != 200 {
return xerrors.Errorf("non-200 code: %d", resp.StatusCode)
}

start := time.Now()
var bytes int64
defer func() {
took := time.Now().Sub(start)
mibps := float64(bytes) / 1024 / 1024 * float64(time.Second) / float64(took)
log.Infow("Fetch done", "url", url, "out", outname, "took", took.Round(time.Millisecond), "bytes", bytes, "MiB/s", mibps, "err", rerr)
}()

mediatype, _, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
if err != nil {
return xerrors.Errorf("parse media type: %w", err)
}

if err := os.RemoveAll(outname); err != nil {
return xerrors.Errorf("removing dest: %w", err)
}

switch mediatype {
case "application/x-tar":
bytes, err = tarutil.ExtractTar(resp.Body, outname, make([]byte, CopyBuf))
return err
case "application/octet-stream":
f, err := os.Create(outname)
if err != nil {
return err
}
bytes, err = io.CopyBuffer(f, resp.Body, make([]byte, CopyBuf))
if err != nil {
f.Close() // nolint
return err
}
return f.Close()
default:
return xerrors.Errorf("unknown content type: '%s'", mediatype)
}
}

// FetchWithTemp fetches data into a temp 'fetching' directory, then moves the file to destination
func FetchWithTemp(ctx context.Context, urls []string, dest string, header http.Header) (string, error) {
var merr error
for _, url := range urls {
tempDest, err := tempFetchDest(dest, true)
if err != nil {
return "", err
}

if err := os.RemoveAll(dest); err != nil {
return "", xerrors.Errorf("removing dest: %w", err)
}

err = fetch(ctx, url, tempDest, header)
if err != nil {
merr = multierror.Append(merr, xerrors.Errorf("fetch error %s -> %s: %w", url, tempDest, err))
continue
}

if err := move(tempDest, dest); err != nil {
return "", xerrors.Errorf("fetch move error %s -> %s: %w", tempDest, dest, err)
}

if merr != nil {
log.Warnw("acquireFromRemote encountered errors when fetching sector from remote", "errors", merr)
}
return url, nil
}

return "", xerrors.Errorf("failed to fetch sector file (tried %v): %w", urls, merr)
}
67 changes: 5 additions & 62 deletions storage/paths/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/hashicorp/go-multierror"
"golang.org/x/xerrors"
"io"
"io/ioutil"
"math/bits"
"mime"
"net/http"
"net/url"
"os"
Expand All @@ -17,14 +18,10 @@ import (
"strings"
"sync"

"github.com/hashicorp/go-multierror"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/lotus/storage/sealer/fsutil"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
"github.com/filecoin-project/lotus/storage/sealer/tarutil"
)

var FetchTempSubdir = "fetching"
Expand Down Expand Up @@ -236,7 +233,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
return "", xerrors.Errorf("removing dest: %w", err)
}

err = r.fetch(ctx, url, tempDest)
err = r.fetchThrottled(ctx, url, tempDest)
if err != nil {
merr = multierror.Append(merr, xerrors.Errorf("fetch error %s (storage %s) -> %s: %w", url, info.ID, tempDest, err))
continue
Expand All @@ -256,9 +253,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
return "", xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr)
}

func (r *Remote) fetch(ctx context.Context, url, outname string) error {
log.Infof("Fetch %s -> %s", url, outname)

func (r *Remote) fetchThrottled(ctx context.Context, url, outname string) (rerr error) {
if len(r.limit) >= cap(r.limit) {
log.Infof("Throttling fetch, %d already running", len(r.limit))
}
Expand All @@ -274,59 +269,7 @@ func (r *Remote) fetch(ctx context.Context, url, outname string) error {
return xerrors.Errorf("context error while waiting for fetch limiter: %w", ctx.Err())
}

req, err := http.NewRequest("GET", url, nil)
if err != nil {
return xerrors.Errorf("request: %w", err)
}
req.Header = r.auth
req = req.WithContext(ctx)

resp, err := http.DefaultClient.Do(req)
if err != nil {
return xerrors.Errorf("do request: %w", err)
}
defer resp.Body.Close() // nolint

if resp.StatusCode != 200 {
return xerrors.Errorf("non-200 code: %d", resp.StatusCode)
}

/*bar := pb.New64(w.sizeForType(typ))
bar.ShowPercent = true
bar.ShowSpeed = true
bar.Units = pb.U_BYTES
barreader := bar.NewProxyReader(resp.Body)
bar.Start()
defer bar.Finish()*/

mediatype, _, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
if err != nil {
return xerrors.Errorf("parse media type: %w", err)
}

if err := os.RemoveAll(outname); err != nil {
return xerrors.Errorf("removing dest: %w", err)
}

switch mediatype {
case "application/x-tar":
return tarutil.ExtractTar(resp.Body, outname, make([]byte, CopyBuf))
case "application/octet-stream":
f, err := os.Create(outname)
if err != nil {
return err
}
_, err = io.CopyBuffer(f, resp.Body, make([]byte, CopyBuf))
if err != nil {
f.Close() // nolint
return err
}
return f.Close()
default:
return xerrors.Errorf("unknown content type: '%s'", mediatype)
}
return fetch(ctx, url, outname, r.auth)
}

func (r *Remote) checkAllocated(ctx context.Context, url string, spt abi.RegisteredSealProof, offset, size abi.PaddedPieceSize) (bool, error) {
Expand Down
32 changes: 31 additions & 1 deletion storage/sealer/ffiwrapper/sealer_cgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/filecoin-project/go-state-types/proof"

"github.com/filecoin-project/lotus/lib/nullreader"
spaths "github.com/filecoin-project/lotus/storage/paths"
nr "github.com/filecoin-project/lotus/storage/pipeline/lib/nullreader"
"github.com/filecoin-project/lotus/storage/sealer/fr32"
"github.com/filecoin-project/lotus/storage/sealer/partialfile"
Expand Down Expand Up @@ -1118,7 +1119,36 @@ func (sb *Sealer) Remove(ctx context.Context, sector storiface.SectorRef) error
}

func (sb *Sealer) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorData) error {
panic("todo")
var todo storiface.SectorFileType
for fileType := range src {
todo |= fileType
}

ptype := storiface.PathSealing
if finalized {
ptype = storiface.PathStorage
}

paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTNone, todo, ptype)
if err != nil {
return xerrors.Errorf("failed to acquire sector paths: %w", err)
}
defer done()

for fileType, data := range src {
out := storiface.PathByType(paths, fileType)

if data.Local {
return xerrors.Errorf("sector(%v) with local data (%#v) requested in DownloadSectorData", sector, data)
}

_, err := spaths.FetchWithTemp(ctx, []string{data.URL}, out, data.Headers)
if err != nil {
return xerrors.Errorf("downloading sector data: %w", err)
}
}

return nil
}

func GetRequiredPadding(oldLength abi.PaddedPieceSize, newPieceLength abi.PaddedPieceSize) ([]abi.PaddedPieceSize, abi.PaddedPieceSize) {
Expand Down
20 changes: 12 additions & 8 deletions storage/sealer/tarutil/systar.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,20 @@ import (

var log = logging.Logger("tarutil") // nolint

func ExtractTar(body io.Reader, dir string, buf []byte) error {
func ExtractTar(body io.Reader, dir string, buf []byte) (int64, error) {
if err := os.MkdirAll(dir, 0755); err != nil { // nolint
return xerrors.Errorf("mkdir: %w", err)
return 0, xerrors.Errorf("mkdir: %w", err)
}

tr := tar.NewReader(body)
var read int64
for {
header, err := tr.Next()
switch err {
default:
return err
return read, err
case io.EOF:
return nil
return read, nil

case nil:
}
Expand All @@ -34,17 +35,20 @@ func ExtractTar(body io.Reader, dir string, buf []byte) error {
f, err := os.Create(filepath.Join(dir, header.Name))
if err != nil {
//nolint:gosec
return xerrors.Errorf("creating file %s: %w", filepath.Join(dir, header.Name), err)
return read, xerrors.Errorf("creating file %s: %w", filepath.Join(dir, header.Name), err)
}

// This data is coming from a trusted source, no need to check the size.
// TODO: now it's actually not coming from a trusted source, check size / paths
//nolint:gosec
if _, err := io.CopyBuffer(f, tr, buf); err != nil {
return err
r, err := io.CopyBuffer(f, tr, buf)
read += r
if err != nil {
return read, err
}

if err := f.Close(); err != nil {
return err
return read, err
}
}
}
Expand Down

0 comments on commit 376614e

Please sign in to comment.