Skip to content

Commit

Permalink
feat: #7747 sealing: Adding conf variable for capping number of concu…
Browse files Browse the repository at this point in the history
…rrent unsealing jobs (#7884)

* adding the new variables- now time for logic

* putting parameters into right placeS

* adding unsealing throttle

* fixing linter issues

* removing one last thing...
  • Loading branch information
laudiacay authored Jan 13, 2022
1 parent 60fae3a commit da6752e
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 17 deletions.
8 changes: 8 additions & 0 deletions documentation/en/default-lotus-miner-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,14 @@
# env var: LOTUS_DAGSTORE_MAXCONCURRENTREADYFETCHES
#MaxConcurrentReadyFetches = 0

# The maximum amount of unseals that can be processed simultaneously
# from the storage subsystem. 0 means unlimited.
# Default value: 0 (unlimited).
#
# type: int
# env var: LOTUS_DAGSTORE_MAXCONCURRENTUNSEALS
#MaxConcurrentUnseals = 5

# The maximum number of simultaneous inflight API calls to the storage
# subsystem.
# Default value: 100.
Expand Down
38 changes: 26 additions & 12 deletions markets/dagstore/miner_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,28 @@ type SectorAccessor interface {
}

type minerAPI struct {
pieceStore piecestore.PieceStore
sa SectorAccessor
throttle throttle.Throttler
readyMgr *shared.ReadyManager
pieceStore piecestore.PieceStore
sa SectorAccessor
throttle throttle.Throttler
unsealThrottle throttle.Throttler
readyMgr *shared.ReadyManager
}

var _ MinerAPI = (*minerAPI)(nil)

func NewMinerAPI(store piecestore.PieceStore, sa SectorAccessor, concurrency int) MinerAPI {
func NewMinerAPI(store piecestore.PieceStore, sa SectorAccessor, concurrency int, unsealConcurrency int) MinerAPI {
var unsealThrottle throttle.Throttler
if unsealConcurrency == 0 {
unsealThrottle = throttle.Noop()
} else {
unsealThrottle = throttle.Fixed(unsealConcurrency)
}
return &minerAPI{
pieceStore: store,
sa: sa,
throttle: throttle.Fixed(concurrency),
readyMgr: shared.NewReadyManager(),
pieceStore: store,
sa: sa,
throttle: throttle.Fixed(concurrency),
unsealThrottle: unsealThrottle,
readyMgr: shared.NewReadyManager(),
}
}

Expand Down Expand Up @@ -152,13 +160,19 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (mo
}

lastErr := xerrors.New("no sectors found to unseal from")

// if there is no unsealed sector containing the piece, just read the piece from the first sector we are able to unseal.
for _, deal := range pieceInfo.Deals {
// Note that if the deal data is not already unsealed, unsealing may
// block for a long time with the current PoRep
//
// This path is unthrottled.
reader, err := m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
var reader mount.Reader
deal := deal
err := m.throttle.Do(ctx, func(ctx context.Context) (err error) {
// Because we know we have an unsealed copy, this UnsealSector call will actually not perform any unsealing.
reader, err = m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
return err
})

if err != nil {
lastErr = xerrors.Errorf("failed to unseal deal %d: %w", deal.DealID, err)
log.Warn(lastErr.Error())
Expand Down
6 changes: 3 additions & 3 deletions markets/dagstore/miner_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) {
rpn := &mockRPN{
sectors: mockData,
}
api := NewMinerAPI(ps, rpn, 100)
api := NewMinerAPI(ps, rpn, 100, 5)
require.NoError(t, api.Start(ctx))

// Add deals to piece store
Expand Down Expand Up @@ -115,7 +115,7 @@ func TestLotusAccessorGetUnpaddedCARSize(t *testing.T) {

ps := getPieceStore(t)
rpn := &mockRPN{}
api := NewMinerAPI(ps, rpn, 100)
api := NewMinerAPI(ps, rpn, 100, 5)
require.NoError(t, api.Start(ctx))

// Add a deal with data Length 10
Expand All @@ -142,7 +142,7 @@ func TestThrottle(t *testing.T) {
unsealedSectorID: "foo",
},
}
api := NewMinerAPI(ps, rpn, 3)
api := NewMinerAPI(ps, rpn, 3, 5)
require.NoError(t, api.Start(ctx))

// Add a deal with data Length 10
Expand Down
2 changes: 1 addition & 1 deletion markets/dagstore/wrapper_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestShardRegistration(t *testing.T) {
cfg := config.DefaultStorageMiner().DAGStore
cfg.RootDir = t.TempDir()

mapi := NewMinerAPI(ps, &wrappedSA{sa}, 10)
mapi := NewMinerAPI(ps, &wrappedSA{sa}, 10, 5)
dagst, w, err := NewDAGStore(cfg, mapi)
require.NoError(t, err)
require.NotNil(t, dagst)
Expand Down
1 change: 1 addition & 0 deletions node/config/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func DefaultStorageMiner() *StorageMiner {
DAGStore: DAGStoreConfig{
MaxConcurrentIndex: 5,
MaxConcurrencyStorageCalls: 100,
MaxConcurrentUnseals: 5,
GCInterval: Duration(1 * time.Minute),
},
}
Expand Down
8 changes: 8 additions & 0 deletions node/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions node/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ type DAGStoreConfig struct {
// Default value: 0 (unlimited).
MaxConcurrentReadyFetches int

// The maximum amount of unseals that can be processed simultaneously
// from the storage subsystem. 0 means unlimited.
// Default value: 0 (unlimited).
MaxConcurrentUnseals int

// The maximum number of simultaneous inflight API calls to the storage
// subsystem.
// Default value: 100.
Expand Down
2 changes: 1 addition & 1 deletion node/modules/storageminer_dagstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func NewMinerAPI(lc fx.Lifecycle, r repo.LockedRepo, pieceStore dtypes.ProviderP
}
}

mountApi := mdagstore.NewMinerAPI(pieceStore, sa, cfg.MaxConcurrencyStorageCalls)
mountApi := mdagstore.NewMinerAPI(pieceStore, sa, cfg.MaxConcurrencyStorageCalls, cfg.MaxConcurrentUnseals)
ready := make(chan error, 1)
pieceStore.OnReady(func(err error) {
ready <- err
Expand Down

0 comments on commit da6752e

Please sign in to comment.