From ebd34f1884bd0846203e6a64ccbbaaba52dc2a11 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Magiera?=
Date: Mon, 28 Mar 2022 21:19:11 -0400
Subject: [PATCH 1/3] feat: storage: Parallel proving checks

---
 cmd/lotus-miner/init.go                      |  2 +-
 .../en/default-lotus-miner-config.toml       | 24 ++++++
 extern/sector-storage/faults.go              | 71 +++++++++++-----
 extern/sector-storage/manager.go             | 11 ++-
 extern/sector-storage/piece_provider_test.go |  6 +-
 itests/kit/ensemble.go                       |  4 +-
 node/builder_miner.go                        |  2 +-
 node/config/def.go                           |  2 +-
 node/config/doc_gen.go                       | 80 ++++++++++++++++++-
 node/config/storage.go                       | 18 +++++
 node/config/types.go                         | 32 +++++++-
 node/modules/storageminer.go                 |  4 +-
 12 files changed, 218 insertions(+), 38 deletions(-)

diff --git a/cmd/lotus-miner/init.go b/cmd/lotus-miner/init.go
index 9582519fd0a..7d81026f2af 100644
--- a/cmd/lotus-miner/init.go
+++ b/cmd/lotus-miner/init.go
@@ -467,7 +467,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode
 	}

 	stor := stores.NewRemote(lstor, si, http.Header(sa), 10, &stores.DefaultPartialFileHandler{})
-	smgr, err := sectorstorage.New(ctx, lstor, stor, lr, si, sectorstorage.SealerConfig{
+	smgr, err := sectorstorage.New(ctx, lstor, stor, lr, si, sectorstorage.Config{
 		ParallelFetchLimit: 10,
 		AllowAddPiece:      true,
 		AllowPreCommit1:    true,
diff --git a/documentation/en/default-lotus-miner-config.toml b/documentation/en/default-lotus-miner-config.toml
index efd03a94c3d..8dd29cc754b 100644
--- a/documentation/en/default-lotus-miner-config.toml
+++ b/documentation/en/default-lotus-miner-config.toml
@@ -306,6 +306,14 @@
   #PurgeCacheOnStart = false


+[Proving]
+  # Maximum number of sector checks to run in parallel. (0 = unlimited)
+  #
+  # type: int
+  # env var: LOTUS_PROVING_PARALLELCHECKLIMIT
+  #ParallelCheckLimit = 0
+
+
 [Sealing]
   # Upper bound on how many sectors can be waiting for more deals to be packed in it before it begins sealing at any given time.
   # If the miner is accepting multiple deals in parallel, up to MaxWaitDealsSectors of new sectors will be created.
@@ -484,33 +492,49 @@


 [Storage]
+  # type: int
   # env var: LOTUS_STORAGE_PARALLELFETCHLIMIT
   #ParallelFetchLimit = 10

+  # Local worker config
+  #
+  # type: bool
   # env var: LOTUS_STORAGE_ALLOWADDPIECE
   #AllowAddPiece = true

+  # type: bool
   # env var: LOTUS_STORAGE_ALLOWPRECOMMIT1
   #AllowPreCommit1 = true

+  # type: bool
   # env var: LOTUS_STORAGE_ALLOWPRECOMMIT2
   #AllowPreCommit2 = true

+  # type: bool
   # env var: LOTUS_STORAGE_ALLOWCOMMIT
   #AllowCommit = true

+  # type: bool
   # env var: LOTUS_STORAGE_ALLOWUNSEAL
   #AllowUnseal = true

+  # type: bool
   # env var: LOTUS_STORAGE_ALLOWREPLICAUPDATE
   #AllowReplicaUpdate = true

+  # type: bool
   # env var: LOTUS_STORAGE_ALLOWPROVEREPLICAUPDATE2
   #AllowProveReplicaUpdate2 = true

+  # type: bool
   # env var: LOTUS_STORAGE_ALLOWREGENSECTORKEY
   #AllowRegenSectorKey = true

+  # ResourceFiltering instructs the system which resource filtering strategy
+  # to use when evaluating tasks against this worker. An empty value defaults
+  # to "hardware".
+  #
+  # type: sectorstorage.ResourceFilteringStrategy
   # env var: LOTUS_STORAGE_RESOURCEFILTERING
   #ResourceFiltering = "hardware"

diff --git a/extern/sector-storage/faults.go b/extern/sector-storage/faults.go
index 1aed55a97c9..f0ebdc99c9e 100644
--- a/extern/sector-storage/faults.go
+++ b/extern/sector-storage/faults.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"crypto/rand"
 	"fmt"
+	"sync"
 	"time"

 	"golang.org/x/xerrors"
@@ -24,22 +25,55 @@ type FaultTracker interface {
 // CheckProvable returns unprovable sectors
 func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []storage.SectorRef, rg storiface.RGetter) (map[abi.SectorID]string, error) {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
 	if rg == nil {
 		return nil, xerrors.Errorf("rg is nil")
 	}

 	var bad = make(map[abi.SectorID]string)
+	var badLk sync.Mutex
+
+	var postRand abi.PoStRandomness = make([]byte, abi.RandomnessLength)
+	_, _ = rand.Read(postRand)
+	postRand[31] &= 0x3f
+
+	limit := m.parallelCheckLimit
+	if limit <= 0 {
+		limit = len(sectors)
+	}
+	throttle := make(chan struct{}, limit)
+
+	addBad := func(s abi.SectorID, reason string) {
+		badLk.Lock()
+		bad[s] = reason
+		badLk.Unlock()
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(len(sectors))

 	for _, sector := range sectors {
-		err := func() error {
+		select {
+		case throttle <- struct{}{}:
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		}
+
+		go func(sector storage.SectorRef) {
+			defer wg.Done()
+			defer func() {
+				<-throttle
+			}()
+
 			ctx, cancel := context.WithCancel(ctx)
 			defer cancel()

 			commr, update, err := rg(ctx, sector.ID)
 			if err != nil {
 				log.Warnw("CheckProvable Sector FAULT: getting commR", "sector", sector, "sealed", "err", err)
-				bad[sector.ID] = fmt.Sprintf("getting commR: %s", err)
-				return nil
+				addBad(sector.ID, fmt.Sprintf("getting commR: %s", err))
+				return
 			}

 			toLock := storiface.FTSealed | storiface.FTCache
@@ -49,31 +83,29 @@ func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof,

 			locked, err := m.index.StorageTryLock(ctx, sector.ID, toLock, storiface.FTNone)
 			if err != nil {
-				return xerrors.Errorf("acquiring sector lock: %w", err)
+				addBad(sector.ID, fmt.Sprintf("tryLock error: %s", err))
+				return
 			}

 			if !locked {
 				log.Warnw("CheckProvable Sector FAULT: can't acquire read lock", "sector", sector)
-				bad[sector.ID] = fmt.Sprint("can't acquire read lock")
-				return nil
+				addBad(sector.ID, fmt.Sprint("can't acquire read lock"))
+				return
 			}

 			wpp, err := sector.ProofType.RegisteredWindowPoStProof()
 			if err != nil {
-				return err
+				addBad(sector.ID, fmt.Sprint("can't get proof type"))
+				return
 			}

-			var pr abi.PoStRandomness = make([]byte, abi.RandomnessLength)
-			_, _ = rand.Read(pr)
-			pr[31] &= 0x3f
-
-			ch, err := ffi.GeneratePoStFallbackSectorChallenges(wpp, sector.ID.Miner, pr, []abi.SectorNumber{
+			ch, err := ffi.GeneratePoStFallbackSectorChallenges(wpp, sector.ID.Miner, postRand, []abi.SectorNumber{
 				sector.ID.Number,
 			})
 			if err != nil {
 				log.Warnw("CheckProvable Sector FAULT: generating challenges", "sector", sector, "err", err)
-				bad[sector.ID] = fmt.Sprintf("generating fallback challenges: %s", err)
-				return nil
+				addBad(sector.ID, fmt.Sprintf("generating fallback challenges: %s", err))
+				return
 			}

 			vctx, cancel2 := context.WithTimeout(ctx, PostCheckTimeout)
@@ -88,15 +120,10 @@ func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof,
 			}, wpp)
 			if err != nil {
 				log.Warnw("CheckProvable Sector FAULT: generating vanilla proof", "sector", sector, "err", err)
-				bad[sector.ID] = fmt.Sprintf("generating vanilla proof: %s", err)
-				return nil
+				addBad(sector.ID, fmt.Sprintf("generating vanilla proof: %s", err))
+				return
 			}
-
-			return nil
-		}()
-		if err != nil {
-			return nil, err
-		}
+		}(sector)
 	}

 	return bad, nil
diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go
index 0e0387f575e..f966b9b6ba5 100644
--- a/extern/sector-storage/manager.go
+++ b/extern/sector-storage/manager.go
@@ -70,6 +70,8 @@ type Manager struct {
 	workLk sync.Mutex
 	work   *statestore.StateStore

+	parallelCheckLimit int
+
 	callToWork map[storiface.CallID]WorkID
 	// used when we get an early return and there's no callToWork mapping
 	callRes map[storiface.CallID]chan result
@@ -99,7 +101,7 @@ const (
 	ResourceFilteringDisabled = ResourceFilteringStrategy("disabled")
 )

-type SealerConfig struct {
+type Config struct {
 	ParallelFetchLimit int

 	// Local worker config
@@ -116,6 +118,9 @@ type SealerConfig struct {
 	// to use when evaluating tasks against this worker. An empty value defaults
 	// to "hardware".
 	ResourceFiltering ResourceFilteringStrategy
+
+	// PoSt config
+	ParallelCheckLimit int
 }

 type StorageAuth http.Header
@@ -123,7 +128,7 @@ type StorageAuth http.Header
 type WorkerStateStore *statestore.StateStore
 type ManagerStateStore *statestore.StateStore

-func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores.LocalStorage, si stores.SectorIndex, sc SealerConfig, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) {
+func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores.LocalStorage, si stores.SectorIndex, sc Config, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) {
 	prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si})
 	if err != nil {
 		return nil, xerrors.Errorf("creating prover instance: %w", err)
@@ -142,6 +147,8 @@ func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores.

 		localProver: prover,

+		parallelCheckLimit: sc.ParallelCheckLimit,
+
 		work:       mss,
 		callToWork: map[storiface.CallID]WorkID{},
 		callRes:    map[storiface.CallID]chan result{},
diff --git a/extern/sector-storage/piece_provider_test.go b/extern/sector-storage/piece_provider_test.go
index 3ace2916ec6..a529a728928 100644
--- a/extern/sector-storage/piece_provider_test.go
+++ b/extern/sector-storage/piece_provider_test.go
@@ -30,7 +30,7 @@ import (
 // only uses miner and does NOT use any remote worker.
 func TestPieceProviderSimpleNoRemoteWorker(t *testing.T) {
 	// Set up sector storage manager
-	sealerCfg := SealerConfig{
+	sealerCfg := Config{
 		ParallelFetchLimit: 10,
 		AllowAddPiece:      true,
 		AllowPreCommit1:    true,
@@ -89,7 +89,7 @@ func TestReadPieceRemoteWorkers(t *testing.T) {
 	logging.SetAllLoggers(logging.LevelDebug)

 	// miner's worker can only add pieces to an unsealed sector.
-	sealerCfg := SealerConfig{
+	sealerCfg := Config{
 		ParallelFetchLimit: 10,
 		AllowAddPiece:      true,
 		AllowPreCommit1:    false,
@@ -198,7 +198,7 @@ func generatePieceData(size uint64) []byte {
 	return bz
 }

-func newPieceProviderTestHarness(t *testing.T, mgrConfig SealerConfig, sectorProofType abi.RegisteredSealProof) *pieceProviderTestHarness {
+func newPieceProviderTestHarness(t *testing.T, mgrConfig Config, sectorProofType abi.RegisteredSealProof) *pieceProviderTestHarness {
 	ctx := context.Background()
 	// listen on tcp socket to create an http server later
 	address := "0.0.0.0:0"
diff --git a/itests/kit/ensemble.go b/itests/kit/ensemble.go
index d40844e8457..8b0128f46ef 100644
--- a/itests/kit/ensemble.go
+++ b/itests/kit/ensemble.go
@@ -585,7 +585,7 @@ func (n *Ensemble) Start() *Ensemble {

 			// disable resource filtering so that local worker gets assigned tasks
 			// regardless of system pressure.
-			node.Override(new(sectorstorage.SealerConfig), func() sectorstorage.SealerConfig {
+			node.Override(new(sectorstorage.Config), func() sectorstorage.Config {
 				scfg := config.DefaultStorageMiner()

 				if noLocal {
@@ -596,7 +596,7 @@ func (n *Ensemble) Start() *Ensemble {
 				}

 				scfg.Storage.ResourceFiltering = sectorstorage.ResourceFilteringDisabled
-				return scfg.Storage
+				return scfg.StorageManager()
 			}),

 			// upgrades
diff --git a/node/builder_miner.go b/node/builder_miner.go
index d260184524c..6e19d36e409 100644
--- a/node/builder_miner.go
+++ b/node/builder_miner.go
@@ -214,7 +214,7 @@ func ConfigStorageMiner(c interface{}) Option {
 			Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(&cfg.Fees, &cfg.Dealmaking)),
 		),

-		Override(new(sectorstorage.SealerConfig), cfg.Storage),
+		Override(new(sectorstorage.Config), cfg.StorageManager()),
 		Override(new(*storage.AddressSelector), modules.AddressSelector(&cfg.Addresses)),
 	)
 }
diff --git a/node/config/def.go b/node/config/def.go
index 94b4c0390ae..29374bd548e 100644
--- a/node/config/def.go
+++ b/node/config/def.go
@@ -138,7 +138,7 @@ func DefaultStorageMiner() *StorageMiner {
 			TerminateBatchWait: Duration(5 * time.Minute),
 		},

-		Storage: sectorstorage.SealerConfig{
+		Storage: SealerConfig{
 			AllowAddPiece:   true,
 			AllowPreCommit1: true,
 			AllowPreCommit2: true,
diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go
index c7d339c92eb..c86cf975abb 100644
--- a/node/config/doc_gen.go
+++ b/node/config/doc_gen.go
@@ -620,6 +620,14 @@ over the worker address if this flag is set.`,
 			Comment: ``,
 		},
 	},
+	"ProvingConfig": []DocField{
+		{
+			Name: "ParallelCheckLimit",
+			Type: "int",
+
+			Comment: `Maximum number of sector checks to run in parallel. (0 = unlimited)`,
+		},
+	},
 	"Pubsub": []DocField{
 		{
 			Name: "Bootstrapper",
@@ -691,6 +699,70 @@ default value is true`,
 This parameter is ONLY applicable if the retrieval pricing policy strategy has been configured to "external".`,
 		},
 	},
+	"SealerConfig": []DocField{
+		{
+			Name: "ParallelFetchLimit",
+			Type: "int",
+
+			Comment: ``,
+		},
+		{
+			Name: "AllowAddPiece",
+			Type: "bool",
+
+			Comment: `Local worker config`,
+		},
+		{
+			Name: "AllowPreCommit1",
+			Type: "bool",
+
+			Comment: ``,
+		},
+		{
+			Name: "AllowPreCommit2",
+			Type: "bool",
+
+			Comment: ``,
+		},
+		{
+			Name: "AllowCommit",
+			Type: "bool",
+
+			Comment: ``,
+		},
+		{
+			Name: "AllowUnseal",
+			Type: "bool",
+
+			Comment: ``,
+		},
+		{
+			Name: "AllowReplicaUpdate",
+			Type: "bool",
+
+			Comment: ``,
+		},
+		{
+			Name: "AllowProveReplicaUpdate2",
+			Type: "bool",
+
+			Comment: ``,
+		},
+		{
+			Name: "AllowRegenSectorKey",
+			Type: "bool",
+
+			Comment: ``,
+		},
+		{
+			Name: "ResourceFiltering",
+			Type: "sectorstorage.ResourceFilteringStrategy",
+
+			Comment: `ResourceFiltering instructs the system which resource filtering strategy
+to use when evaluating tasks against this worker. An empty value defaults
+to "hardware".`,
+		},
+	},
 	"SealingConfig": []DocField{
 		{
 			Name: "MaxWaitDealsSectors",
@@ -933,6 +1005,12 @@ Default is 20 (about once a week).`,

 			Comment: ``,
 		},
+		{
+			Name: "Proving",
+			Type: "ProvingConfig",
+
+			Comment: ``,
+		},
 		{
 			Name: "Sealing",
 			Type: "SealingConfig",
@@ -941,7 +1019,7 @@ Default is 20 (about once a week).`,
 		},
 		{
 			Name: "Storage",
-			Type: "sectorstorage.SealerConfig",
+			Type: "SealerConfig",

 			Comment: ``,
 		},
diff --git a/node/config/storage.go b/node/config/storage.go
index 68170ee2f6a..bf997add2db 100644
--- a/node/config/storage.go
+++ b/node/config/storage.go
@@ -8,6 +8,7 @@ import (

 	"golang.org/x/xerrors"

+	sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
 	"github.com/filecoin-project/lotus/extern/sector-storage/stores"
 )

@@ -49,3 +50,20 @@ func WriteStorageFile(path string, config stores.StorageConfig) error {

 	return nil
 }
+
+func (c *StorageMiner) StorageManager() sectorstorage.Config {
+	return sectorstorage.Config{
+		ParallelFetchLimit:       c.Storage.ParallelFetchLimit,
+		AllowAddPiece:            c.Storage.AllowAddPiece,
+		AllowPreCommit1:          c.Storage.AllowPreCommit1,
+		AllowPreCommit2:          c.Storage.AllowPreCommit2,
+		AllowCommit:              c.Storage.AllowCommit,
+		AllowUnseal:              c.Storage.AllowUnseal,
+		AllowReplicaUpdate:       c.Storage.AllowReplicaUpdate,
+		AllowProveReplicaUpdate2: c.Storage.AllowProveReplicaUpdate2,
+		AllowRegenSectorKey:      c.Storage.AllowRegenSectorKey,
+		ResourceFiltering:        c.Storage.ResourceFiltering,
+
+		ParallelCheckLimit: c.Proving.ParallelCheckLimit,
+	}
+}
diff --git a/node/config/types.go b/node/config/types.go
index 73fe7a61373..9d0d68dd3de 100644
--- a/node/config/types.go
+++ b/node/config/types.go
@@ -1,10 +1,9 @@
 package config

 import (
-	"github.com/ipfs/go-cid"
-
 	"github.com/filecoin-project/lotus/chain/types"
 	sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
+	"github.com/ipfs/go-cid"
 )

 // // NOTE: ONLY PUT STRUCT DEFINITIONS IN THIS FILE
@@ -53,8 +52,9 @@ type StorageMiner struct {
 	Subsystems    MinerSubsystemConfig
 	Dealmaking    DealmakingConfig
 	IndexProvider IndexProviderConfig
+	Proving       ProvingConfig
 	Sealing       SealingConfig
-	Storage       sectorstorage.SealerConfig
+	Storage       SealerConfig
 	Fees          MinerFeeConfig
 	Addresses     MinerAddressConfig
 	DAGStore      DAGStoreConfig
@@ -216,6 +216,13 @@ type RetrievalPricingDefault struct {
 	VerifiedDealsFreeTransfer bool
 }

+type ProvingConfig struct {
+	// Maximum number of sector checks to run in parallel. (0 = unlimited)
+	ParallelCheckLimit int
+
+	// todo disable builtin post
+}
+
 type SealingConfig struct {
 	// Upper bound on how many sectors can be waiting for more deals to be packed in it before it begins sealing at any given time.
 	// If the miner is accepting multiple deals in parallel, up to MaxWaitDealsSectors of new sectors will be created.
@@ -307,6 +314,25 @@ type SealingConfig struct {
 	// todo TargetSectors - stop auto-pleding new sectors after this many sectors are sealed, default CC upgrade for deals sectors if above
 }

+type SealerConfig struct {
+	ParallelFetchLimit int
+
+	// Local worker config
+	AllowAddPiece            bool
+	AllowPreCommit1          bool
+	AllowPreCommit2          bool
+	AllowCommit              bool
+	AllowUnseal              bool
+	AllowReplicaUpdate       bool
+	AllowProveReplicaUpdate2 bool
+	AllowRegenSectorKey      bool
+
+	// ResourceFiltering instructs the system which resource filtering strategy
+	// to use when evaluating tasks against this worker. An empty value defaults
+	// to "hardware".
+	ResourceFiltering sectorstorage.ResourceFilteringStrategy
+}
+
 type BatchFeeConfig struct {
 	Base      types.FIL
 	PerSector types.FIL
diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go
index a337bbbc4a6..ba3fd57e303 100644
--- a/node/modules/storageminer.go
+++ b/node/modules/storageminer.go
@@ -739,11 +739,11 @@ func LocalStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls stores.LocalStora
 	return stores.NewLocal(ctx, ls, si, urls)
 }

-func RemoteStorage(lstor *stores.Local, si stores.SectorIndex, sa sectorstorage.StorageAuth, sc sectorstorage.SealerConfig) *stores.Remote {
+func RemoteStorage(lstor *stores.Local, si stores.SectorIndex, sa sectorstorage.StorageAuth, sc sectorstorage.Config) *stores.Remote {
 	return stores.NewRemote(lstor, si, http.Header(sa), sc.ParallelFetchLimit, &stores.DefaultPartialFileHandler{})
 }

-func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, lstor *stores.Local, stor stores.Store, ls stores.LocalStorage, si stores.SectorIndex, sc sectorstorage.SealerConfig, ds dtypes.MetadataDS) (*sectorstorage.Manager, error) {
+func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, lstor *stores.Local, stor stores.Store, ls stores.LocalStorage, si stores.SectorIndex, sc sectorstorage.Config, ds dtypes.MetadataDS) (*sectorstorage.Manager, error) {
 	ctx := helpers.LifecycleCtx(mctx, lc)

 	wsts := statestore.New(namespace.Wrap(ds, WorkerCallsPrefix))

From d95f24b9d2638522c89bc01da9ef87ca8e845741 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Magiera?=
Date: Tue, 29 Mar 2022 13:22:58 -0400
Subject: [PATCH 2/3] config: Sane default parallel sector read defaults

---
 cmd/lotus-worker/main.go                         | 2 +-
 documentation/en/cli-lotus-worker.md             | 2 +-
 documentation/en/default-lotus-miner-config.toml | 2 +-
 node/config/def.go                               | 4 ++++
 4 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/cmd/lotus-worker/main.go b/cmd/lotus-worker/main.go
index 39de995a535..12c5f8dc880 100644
--- a/cmd/lotus-worker/main.go
+++ b/cmd/lotus-worker/main.go
@@ -196,7 +196,7 @@ var runCmd = &cli.Command{
 		&cli.IntFlag{
 			Name:  "post-parallel-reads",
 			Usage: "maximum number of parallel challenge reads (0 = no limit)",
-			Value: 0,
+			Value: 128,
 		},
 		&cli.DurationFlag{
 			Name: "post-read-timeout",
diff --git a/documentation/en/cli-lotus-worker.md b/documentation/en/cli-lotus-worker.md
index b5551286551..52445093ab6 100644
--- a/documentation/en/cli-lotus-worker.md
+++ b/documentation/en/cli-lotus-worker.md
@@ -50,7 +50,7 @@ OPTIONS:
    --windowpost                 enable window post (default: false)
    --winningpost                enable winning post (default: false)
    --parallel-fetch-limit value maximum fetch operations to run in parallel (default: 5)
-   --post-parallel-reads value  maximum number of parallel challenge reads (0 = no limit) (default: 0)
+   --post-parallel-reads value  maximum number of parallel challenge reads (0 = no limit) (default: 128)
    --post-read-timeout value    time limit for reading PoSt challenges (0 = no limit) (default: 0s)
    --timeout value              used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function (default: "30m")
    --help, -h                   show help (default: false)
diff --git a/documentation/en/default-lotus-miner-config.toml b/documentation/en/default-lotus-miner-config.toml
index 8dd29cc754b..4ddac8e0bbf 100644
--- a/documentation/en/default-lotus-miner-config.toml
+++ b/documentation/en/default-lotus-miner-config.toml
@@ -311,7 +311,7 @@
   #
   # type: int
   # env var: LOTUS_PROVING_PARALLELCHECKLIMIT
-  #ParallelCheckLimit = 0
+  #ParallelCheckLimit = 128


 [Sealing]
diff --git a/node/config/def.go b/node/config/def.go
index 29374bd548e..10e29c17657 100644
--- a/node/config/def.go
+++ b/node/config/def.go
@@ -138,6 +138,10 @@ func DefaultStorageMiner() *StorageMiner {
 			TerminateBatchWait: Duration(5 * time.Minute),
 		},

+		Proving: ProvingConfig{
+			ParallelCheckLimit: 128,
+		},
+
 		Storage: SealerConfig{
 			AllowAddPiece:   true,
 			AllowPreCommit1: true,

From 0710342317b0ba819ccfca95dc07eab5d26227b3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Magiera?=
Date: Tue, 29 Mar 2022 15:28:37 -0400
Subject: [PATCH 3/3] fix waiting in sector proving checks

---
 extern/sector-storage/faults.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/extern/sector-storage/faults.go b/extern/sector-storage/faults.go
index f0ebdc99c9e..04a94e58cdf 100644
--- a/extern/sector-storage/faults.go
+++ b/extern/sector-storage/faults.go
@@ -126,6 +126,8 @@ func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof,
 		}(sector)
 	}

+	wg.Wait()
+
 	return bad, nil
 }
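Note on the concurrency pattern in PATCH 1/3: CheckProvable's sequential per-sector loop becomes a bounded fan-out. A buffered channel acts as a counting semaphore sized by ParallelCheckLimit (zero or less means one goroutine per sector), each check runs in its own goroutine, failures are recorded in a mutex-guarded map via addBad, and the wg.Wait() added in PATCH 3/3 ensures every goroutine has finished before the map is returned. The sketch below is a minimal, self-contained Go illustration of that pattern, not Lotus code: checkSector, checkAll, and the integer sector IDs are hypothetical stand-ins, and the real implementation additionally abandons semaphore acquisition when the context is cancelled.

    package main

    import (
        "context"
        "errors"
        "fmt"
        "sync"
    )

    // checkSector stands in for the per-sector work done inside the goroutine
    // in Manager.CheckProvable (fetch commR, try the storage lock, generate a
    // vanilla proof). Illustrative only.
    func checkSector(ctx context.Context, id int) error {
        if id%2 == 0 {
            return errors.New("can't acquire read lock")
        }
        return nil
    }

    // checkAll mirrors the patch's throttling: at most limit checks run at
    // once, and limit <= 0 means unlimited (one goroutine per sector).
    func checkAll(ctx context.Context, sectors []int, limit int) map[int]string {
        if limit <= 0 {
            limit = len(sectors)
        }
        throttle := make(chan struct{}, limit) // semaphore: one slot per in-flight check

        bad := make(map[int]string) // results, guarded by badLk
        var badLk sync.Mutex

        var wg sync.WaitGroup
        wg.Add(len(sectors))
        for _, id := range sectors {
            throttle <- struct{}{} // acquire a slot; blocks while limit checks are running
            go func(id int) {
                defer wg.Done()
                defer func() { <-throttle }() // release the slot
                if err := checkSector(ctx, id); err != nil {
                    badLk.Lock()
                    bad[id] = err.Error()
                    badLk.Unlock()
                }
            }(id)
        }
        // Without this wait the map could be returned while goroutines are
        // still writing to it; that is the race PATCH 3/3 closes.
        wg.Wait()
        return bad
    }

    func main() {
        fmt.Println(checkAll(context.Background(), []int{1, 2, 3, 4}, 2))
    }

On the miner, this limit is set through the [Proving] section of config.toml (ParallelCheckLimit, default 128 after PATCH 2/3, env var LOTUS_PROVING_PARALLELCHECKLIMIT); on PoSt workers, the analogous knob is the --post-parallel-reads flag.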