diff --git a/documentation/en/default-lotus-miner-config.toml b/documentation/en/default-lotus-miner-config.toml index 55ddbb0543e..47ac9f1e7ed 100644 --- a/documentation/en/default-lotus-miner-config.toml +++ b/documentation/en/default-lotus-miner-config.toml @@ -550,6 +550,14 @@ # env var: LOTUS_DAGSTORE_MAXCONCURRENTREADYFETCHES #MaxConcurrentReadyFetches = 0 + # The maximum amount of unseals that can be processed simultaneously + # from the storage subsystem. 0 means unlimited. + # Default value: 0 (unlimited). + # + # type: int + # env var: LOTUS_DAGSTORE_MAXCONCURRENTUNSEALS + #MaxConcurrentUnseals = 5 + # The maximum number of simultaneous inflight API calls to the storage # subsystem. # Default value: 100. diff --git a/markets/dagstore/miner_api.go b/markets/dagstore/miner_api.go index 77b4b97bf4e..8a12097d5f2 100644 --- a/markets/dagstore/miner_api.go +++ b/markets/dagstore/miner_api.go @@ -31,20 +31,28 @@ type SectorAccessor interface { } type minerAPI struct { - pieceStore piecestore.PieceStore - sa SectorAccessor - throttle throttle.Throttler - readyMgr *shared.ReadyManager + pieceStore piecestore.PieceStore + sa SectorAccessor + throttle throttle.Throttler + unsealThrottle throttle.Throttler + readyMgr *shared.ReadyManager } var _ MinerAPI = (*minerAPI)(nil) -func NewMinerAPI(store piecestore.PieceStore, sa SectorAccessor, concurrency int) MinerAPI { +func NewMinerAPI(store piecestore.PieceStore, sa SectorAccessor, concurrency int, unsealConcurrency int) MinerAPI { + var unsealThrottle throttle.Throttler + if unsealConcurrency == 0 { + unsealThrottle = throttle.Noop() + } else { + unsealThrottle = throttle.Fixed(unsealConcurrency) + } return &minerAPI{ - pieceStore: store, - sa: sa, - throttle: throttle.Fixed(concurrency), - readyMgr: shared.NewReadyManager(), + pieceStore: store, + sa: sa, + throttle: throttle.Fixed(concurrency), + unsealThrottle: unsealThrottle, + readyMgr: shared.NewReadyManager(), } } @@ -152,13 +160,19 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (mo } lastErr := xerrors.New("no sectors found to unseal from") + // if there is no unsealed sector containing the piece, just read the piece from the first sector we are able to unseal. for _, deal := range pieceInfo.Deals { // Note that if the deal data is not already unsealed, unsealing may // block for a long time with the current PoRep - // - // This path is unthrottled. - reader, err := m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded()) + var reader mount.Reader + deal := deal + err := m.throttle.Do(ctx, func(ctx context.Context) (err error) { + // Because we know we have an unsealed copy, this UnsealSector call will actually not perform any unsealing. + reader, err = m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded()) + return err + }) + if err != nil { lastErr = xerrors.Errorf("failed to unseal deal %d: %w", deal.DealID, err) log.Warn(lastErr.Error()) diff --git a/markets/dagstore/miner_api_test.go b/markets/dagstore/miner_api_test.go index 45cbf24610d..ee2f0cdce85 100644 --- a/markets/dagstore/miner_api_test.go +++ b/markets/dagstore/miner_api_test.go @@ -75,7 +75,7 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) { rpn := &mockRPN{ sectors: mockData, } - api := NewMinerAPI(ps, rpn, 100) + api := NewMinerAPI(ps, rpn, 100, 5) require.NoError(t, api.Start(ctx)) // Add deals to piece store @@ -115,7 +115,7 @@ func TestLotusAccessorGetUnpaddedCARSize(t *testing.T) { ps := getPieceStore(t) rpn := &mockRPN{} - api := NewMinerAPI(ps, rpn, 100) + api := NewMinerAPI(ps, rpn, 100, 5) require.NoError(t, api.Start(ctx)) // Add a deal with data Length 10 @@ -142,7 +142,7 @@ func TestThrottle(t *testing.T) { unsealedSectorID: "foo", }, } - api := NewMinerAPI(ps, rpn, 3) + api := NewMinerAPI(ps, rpn, 3, 5) require.NoError(t, api.Start(ctx)) // Add a deal with data Length 10 diff --git a/markets/dagstore/wrapper_migration_test.go b/markets/dagstore/wrapper_migration_test.go index e46f8779beb..437032da943 100644 --- a/markets/dagstore/wrapper_migration_test.go +++ b/markets/dagstore/wrapper_migration_test.go @@ -96,7 +96,7 @@ func TestShardRegistration(t *testing.T) { cfg := config.DefaultStorageMiner().DAGStore cfg.RootDir = t.TempDir() - mapi := NewMinerAPI(ps, &wrappedSA{sa}, 10) + mapi := NewMinerAPI(ps, &wrappedSA{sa}, 10, 5) dagst, w, err := NewDAGStore(cfg, mapi) require.NoError(t, err) require.NotNil(t, dagst) diff --git a/node/config/def.go b/node/config/def.go index 9c39c197ce5..644c28bea69 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -216,6 +216,7 @@ func DefaultStorageMiner() *StorageMiner { DAGStore: DAGStoreConfig{ MaxConcurrentIndex: 5, MaxConcurrencyStorageCalls: 100, + MaxConcurrentUnseals: 5, GCInterval: Duration(1 * time.Minute), }, } diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index eded0b1fed0..c3730cbace6 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -162,6 +162,14 @@ Default value: 5.`, Comment: `The maximum amount of unsealed deals that can be fetched simultaneously from the storage subsystem. 0 means unlimited. +Default value: 0 (unlimited).`, + }, + { + Name: "MaxConcurrentUnseals", + Type: "int", + + Comment: `The maximum amount of unseals that can be processed simultaneously +from the storage subsystem. 0 means unlimited. Default value: 0 (unlimited).`, }, { diff --git a/node/config/types.go b/node/config/types.go index 2ae2d8eee32..715f4824861 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -75,6 +75,11 @@ type DAGStoreConfig struct { // Default value: 0 (unlimited). MaxConcurrentReadyFetches int + // The maximum amount of unseals that can be processed simultaneously + // from the storage subsystem. 0 means unlimited. + // Default value: 0 (unlimited). + MaxConcurrentUnseals int + // The maximum number of simultaneous inflight API calls to the storage // subsystem. // Default value: 100. diff --git a/node/modules/storageminer_dagstore.go b/node/modules/storageminer_dagstore.go index b4f5d353554..513acaad110 100644 --- a/node/modules/storageminer_dagstore.go +++ b/node/modules/storageminer_dagstore.go @@ -38,7 +38,7 @@ func NewMinerAPI(lc fx.Lifecycle, r repo.LockedRepo, pieceStore dtypes.ProviderP } } - mountApi := mdagstore.NewMinerAPI(pieceStore, sa, cfg.MaxConcurrencyStorageCalls) + mountApi := mdagstore.NewMinerAPI(pieceStore, sa, cfg.MaxConcurrencyStorageCalls, cfg.MaxConcurrentUnseals) ready := make(chan error, 1) pieceStore.OnReady(func(err error) { ready <- err