Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make small retrieval 200x faster #7693

Merged
merged 9 commits into from
Nov 29, 2021
6 changes: 3 additions & 3 deletions extern/sector-storage/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,12 +384,12 @@ func generateFakePoSt(sectorInfo []proof5.SectorInfo, rpt func(abi.RegisteredSea
}
}

func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
if offset != 0 {
func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, startOffset uint64, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
if uint64(offset) != 0 {
panic("implme")
}

return ioutil.NopCloser(bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][:size])), false, nil
return ioutil.NopCloser(bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][startOffset:size])), false, nil
}

func (mgr *SectorMgr) StageFakeData(mid abi.ActorID, spt abi.RegisteredSealProof) (storage.SectorRef, []abi.PieceInfo, error) {
Expand Down
43 changes: 28 additions & 15 deletions extern/sector-storage/piece_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ type Unsealer interface {

type PieceProvider interface {
// ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector
ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error)
// pieceOffset + pieceSize specify piece bounds for unsealing (note: with SDR the entire sector will be unsealed by
// default in most cases, but this might matter with future PoRep)
// startOffset is added to the pieceOffset to get the starting reader offset.
// The number of bytes that can be read is pieceSize-startOffset
ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, startOffset uint64, pieceSize abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error)
IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error)
}

Expand Down Expand Up @@ -67,7 +71,7 @@ func (p *pieceProvider) IsUnsealed(ctx context.Context, sector storage.SectorRef
// It will NOT try to schedule an Unseal of a sealed sector file for the read.
//
// Returns a nil reader if the piece does NOT exist in any unsealed file or there is no unsealed file for the given sector on any of the workers.
func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (io.ReadCloser, context.CancelFunc, error) {
func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage.SectorRef, pieceOffset, startOffset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (io.ReadCloser, context.CancelFunc, error) {
// acquire a lock purely for reading unsealed sectors
ctx, cancel := context.WithCancel(ctx)
if err := p.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil {
Expand All @@ -78,7 +82,7 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage
// Reader returns a reader for an unsealed piece at the given offset in the given sector.
// The returned reader will be nil if none of the workers has an unsealed sector file containing
// the unsealed piece.
r, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(offset.Padded()), size.Padded())
r, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()+startOffset.Padded()), size.Padded()-abi.PaddedPieceSize(startOffset.Padded()))
if err != nil {
log.Debugf("did not get storage reader;sector=%+v, err:%s", sector.ID, err)
cancel()
Expand All @@ -97,20 +101,22 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage
// If we do NOT have an existing unsealed file containing the given piece thus causing us to schedule an Unseal,
// the returned boolean parameter will be set to true.
// If we have an existing unsealed file containing the given piece, the returned boolean will be set to false.
func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
if err := offset.Valid(); err != nil {
return nil, false, xerrors.Errorf("offset is not valid: %w", err)
func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, startOffset uint64, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
if err := pieceOffset.Valid(); err != nil {
return nil, false, xerrors.Errorf("pieceOffset is not valid: %w", err)
}
if err := size.Validate(); err != nil {
return nil, false, xerrors.Errorf("size is not a valid piece size: %w", err)
}

r, unlock, err := p.tryReadUnsealedPiece(ctx, sector, offset, size)
startOffsetAligned := storiface.UnpaddedByteIndex(startOffset / 127 * 127) // floor to multiple of 127

r, unlock, err := p.tryReadUnsealedPiece(ctx, sector, pieceOffset, startOffsetAligned, size)

log.Debugf("result of first tryReadUnsealedPiece: r=%+v, err=%s", r, err)

if xerrors.Is(err, storiface.ErrSectorNotFound) {
log.Debugf("no unsealed sector file with unsealed piece, sector=%+v, offset=%d, size=%d", sector, offset, size)
log.Debugf("no unsealed sector file with unsealed piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
err = nil
}
if err != nil {
Expand All @@ -129,14 +135,14 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
if unsealed == cid.Undef {
commd = nil
}
if err := p.uns.SectorsUnsealPiece(ctx, sector, offset, size, ticket, commd); err != nil {
if err := p.uns.SectorsUnsealPiece(ctx, sector, pieceOffset, size, ticket, commd); err != nil {
log.Errorf("failed to SectorsUnsealPiece: %s", err)
return nil, false, xerrors.Errorf("unsealing piece: %w", err)
}

log.Debugf("unsealed a sector file to read the piece, sector=%+v, offset=%d, size=%d", sector, offset, size)
log.Debugf("unsealed a sector file to read the piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)

r, unlock, err = p.tryReadUnsealedPiece(ctx, sector, offset, size)
r, unlock, err = p.tryReadUnsealedPiece(ctx, sector, pieceOffset, startOffsetAligned, size)
if err != nil {
log.Errorf("failed to tryReadUnsealedPiece after SectorsUnsealPiece: %s", err)
return nil, true, xerrors.Errorf("read after unsealing: %w", err)
Expand All @@ -145,9 +151,9 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
log.Errorf("got no reader after unsealing piece")
return nil, true, xerrors.Errorf("got no reader after unsealing piece")
}
log.Debugf("got a reader to read unsealed piece, sector=%+v, offset=%d, size=%d", sector, offset, size)
log.Debugf("got a reader to read unsealed piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
} else {
log.Debugf("unsealed piece already exists, no need to unseal, sector=%+v, offset=%d, size=%d", sector, offset, size)
log.Debugf("unsealed piece already exists, no need to unseal, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
}

upr, err := fr32.NewUnpadReader(r, size.Padded())
Expand All @@ -156,10 +162,17 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
return nil, uns, xerrors.Errorf("creating unpadded reader: %w", err)
}

log.Debugf("returning reader to read unsealed piece, sector=%+v, offset=%d, size=%d", sector, offset, size)
log.Debugf("returning reader to read unsealed piece, sector=%+v, pieceOffset=%d, startOffset=%d, size=%d", sector, pieceOffset, startOffset, size)

bir := bufio.NewReaderSize(upr, 127)
if startOffset > uint64(startOffsetAligned) {
if _, err := bir.Discard(int(startOffset - uint64(startOffsetAligned))); err != nil {
return nil, false, xerrors.Errorf("discarding bytes for startOffset: %w", err)
}
}

return &funcCloser{
Reader: bufio.NewReaderSize(upr, 127),
Reader: bir,
close: func() error {
err = r.Close()
unlock()
Expand Down
2 changes: 1 addition & 1 deletion extern/sector-storage/piece_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func (p *pieceProviderTestHarness) isUnsealed(t *testing.T, offset storiface.Unp

func (p *pieceProviderTestHarness) readPiece(t *testing.T, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize,
expectedHadToUnseal bool, expectedBytes []byte) {
rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, size, p.ticket, p.commD)
rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, 0, size, p.ticket, p.commD)
require.NoError(t, err)
require.NotNil(t, rd)
require.Equal(t, expectedHadToUnseal, isUnsealed)
Expand Down
33 changes: 21 additions & 12 deletions markets/dagstore/miner_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"

"github.com/filecoin-project/dagstore/throttle"
"github.com/filecoin-project/go-state-types/abi"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"

Expand All @@ -14,23 +15,31 @@ import (
"github.com/filecoin-project/go-fil-markets/shared"
)

//go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_lotus_accessor.go -package=mock_dagstore . MinerAPI

type MinerAPI interface {
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error)
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, offset uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error)
GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error)
IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error)
Start(ctx context.Context) error
}

type SectorAccessor interface {
retrievalmarket.SectorAccessor

UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error)
}

type minerAPI struct {
pieceStore piecestore.PieceStore
sa retrievalmarket.SectorAccessor
sa SectorAccessor
throttle throttle.Throttler
readyMgr *shared.ReadyManager
}

var _ MinerAPI = (*minerAPI)(nil)

func NewMinerAPI(store piecestore.PieceStore, sa retrievalmarket.SectorAccessor, concurrency int) MinerAPI {
func NewMinerAPI(store piecestore.PieceStore, sa SectorAccessor, concurrency int) MinerAPI {
return &minerAPI{
pieceStore: store,
sa: sa,
Expand Down Expand Up @@ -91,10 +100,10 @@ func (m *minerAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, erro
return false, nil
}

func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) {
func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, offset uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error) {
err := m.readyMgr.AwaitReady()
if err != nil {
return nil, err
return nil, 0, err
}

// Throttle this path to avoid flooding the storage subsystem.
Expand All @@ -105,11 +114,11 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io
})

if err != nil {
return nil, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
return nil, 0, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
}

if len(pieceInfo.Deals) == 0 {
return nil, xerrors.Errorf("no storage deals found for piece %s", pieceCid)
return nil, 0, xerrors.Errorf("no storage deals found for piece %s", pieceCid)
}

// prefer an unsealed sector containing the piece if one exists
Expand All @@ -127,7 +136,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io
return nil
}
// Because we know we have an unsealed copy, this UnsealSector call will actually not perform any unsealing.
reader, err = m.sa.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
reader, err = m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), offset, deal.Length.Unpadded())
return err
})

Expand All @@ -138,7 +147,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io

if reader != nil {
// we were able to obtain a reader for an already unsealed piece
return reader, nil
return reader, deal.Length.Unpadded(), nil
}
}

Expand All @@ -149,18 +158,18 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io
// block for a long time with the current PoRep
//
// This path is unthrottled.
reader, err := m.sa.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
reader, err := m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), offset, deal.Length.Unpadded())
if err != nil {
lastErr = xerrors.Errorf("failed to unseal deal %d: %w", deal.DealID, err)
log.Warn(lastErr.Error())
continue
}

// Successfully fetched the deal data so return a reader over the data
return reader, nil
return reader, deal.Length.Unpadded(), nil
}

return nil, lastErr
return nil, 0, lastErr
}

func (m *minerAPI) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) {
Expand Down
10 changes: 7 additions & 3 deletions markets/dagstore/miner_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) {
}

// Fetch the piece
r, err := api.FetchUnsealedPiece(ctx, cid1)
r, _, err := api.FetchUnsealedPiece(ctx, cid1, 0)
if tc.expectErr {
require.Error(t, err)
return
Expand Down Expand Up @@ -159,7 +159,7 @@ func TestThrottle(t *testing.T) {
errgrp, ctx := errgroup.WithContext(context.Background())
for i := 0; i < 10; i++ {
errgrp.Go(func() error {
r, err := api.FetchUnsealedPiece(ctx, cid1)
r, _, err := api.FetchUnsealedPiece(ctx, cid1, 0)
if err == nil {
_ = r.Close()
}
Expand Down Expand Up @@ -203,6 +203,10 @@ type mockRPN struct {
}

func (m *mockRPN) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
return m.UnsealSectorAt(ctx, sectorID, offset, 0, length)
}

func (m *mockRPN) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
atomic.AddInt32(&m.calls, 1)
m.lk.RLock()
defer m.lk.RUnlock()
Expand All @@ -211,7 +215,7 @@ func (m *mockRPN) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, o
if !ok {
panic("sector not found")
}
return io.NopCloser(bytes.NewBuffer([]byte(data))), nil
return io.NopCloser(bytes.NewBuffer([]byte(data[startOffset:]))), nil
}

func (m *mockRPN) IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) {
Expand Down
Loading