Merge pull request #8391 from filecoin-project/feat/parallel-fault-check
feat: storage: Parallel proving checks
magik6k authored Mar 29, 2022
2 parents c456836 + 0710342 commit 3ce467a
Showing 14 changed files with 226 additions and 40 deletions.
2 changes: 1 addition & 1 deletion cmd/lotus-miner/init.go
@@ -467,7 +467,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode
}
stor := stores.NewRemote(lstor, si, http.Header(sa), 10, &stores.DefaultPartialFileHandler{})

smgr, err := sectorstorage.New(ctx, lstor, stor, lr, si, sectorstorage.SealerConfig{
smgr, err := sectorstorage.New(ctx, lstor, stor, lr, si, sectorstorage.Config{
ParallelFetchLimit: 10,
AllowAddPiece: true,
AllowPreCommit1: true,
2 changes: 1 addition & 1 deletion cmd/lotus-worker/main.go
@@ -196,7 +196,7 @@ var runCmd = &cli.Command{
&cli.IntFlag{
Name: "post-parallel-reads",
Usage: "maximum number of parallel challenge reads (0 = no limit)",
Value: 0,
Value: 128,
},
&cli.DurationFlag{
Name: "post-read-timeout",
2 changes: 1 addition & 1 deletion documentation/en/cli-lotus-worker.md
@@ -50,7 +50,7 @@ OPTIONS:
--windowpost enable window post (default: false)
--winningpost enable winning post (default: false)
--parallel-fetch-limit value maximum fetch operations to run in parallel (default: 5)
--post-parallel-reads value maximum number of parallel challenge reads (0 = no limit) (default: 0)
--post-parallel-reads value maximum number of parallel challenge reads (0 = no limit) (default: 128)
--post-read-timeout value time limit for reading PoSt challenges (0 = no limit) (default: 0s)
--timeout value used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function (default: "30m")
--help, -h show help (default: false)
24 changes: 24 additions & 0 deletions documentation/en/default-lotus-miner-config.toml
@@ -306,6 +306,14 @@
#PurgeCacheOnStart = false


[Proving]
# Maximum number of sector checks to run in parallel. (0 = unlimited)
#
# type: int
# env var: LOTUS_PROVING_PARALLELCHECKLIMIT
#ParallelCheckLimit = 128


[Sealing]
# Upper bound on how many sectors can be waiting for more deals to be packed in it before it begins sealing at any given time.
# If the miner is accepting multiple deals in parallel, up to MaxWaitDealsSectors of new sectors will be created.
@@ -484,33 +492,49 @@


[Storage]
# type: int
# env var: LOTUS_STORAGE_PARALLELFETCHLIMIT
#ParallelFetchLimit = 10

# Local worker config
#
# type: bool
# env var: LOTUS_STORAGE_ALLOWADDPIECE
#AllowAddPiece = true

# type: bool
# env var: LOTUS_STORAGE_ALLOWPRECOMMIT1
#AllowPreCommit1 = true

# type: bool
# env var: LOTUS_STORAGE_ALLOWPRECOMMIT2
#AllowPreCommit2 = true

# type: bool
# env var: LOTUS_STORAGE_ALLOWCOMMIT
#AllowCommit = true

# type: bool
# env var: LOTUS_STORAGE_ALLOWUNSEAL
#AllowUnseal = true

# type: bool
# env var: LOTUS_STORAGE_ALLOWREPLICAUPDATE
#AllowReplicaUpdate = true

# type: bool
# env var: LOTUS_STORAGE_ALLOWPROVEREPLICAUPDATE2
#AllowProveReplicaUpdate2 = true

# type: bool
# env var: LOTUS_STORAGE_ALLOWREGENSECTORKEY
#AllowRegenSectorKey = true

# ResourceFiltering instructs the system which resource filtering strategy
# to use when evaluating tasks against this worker. An empty value defaults
# to "hardware".
#
# type: sectorstorage.ResourceFilteringStrategy
# env var: LOTUS_STORAGE_RESOURCEFILTERING
#ResourceFiltering = "hardware"

73 changes: 51 additions & 22 deletions extern/sector-storage/faults.go
@@ -4,6 +4,7 @@ import (
"context"
"crypto/rand"
"fmt"
"sync"
"time"

"golang.org/x/xerrors"
@@ -24,22 +25,55 @@ type FaultTracker interface {

// CheckProvable returns unprovable sectors
func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []storage.SectorRef, rg storiface.RGetter) (map[abi.SectorID]string, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

if rg == nil {
return nil, xerrors.Errorf("rg is nil")
}

var bad = make(map[abi.SectorID]string)
var badLk sync.Mutex

var postRand abi.PoStRandomness = make([]byte, abi.RandomnessLength)
_, _ = rand.Read(postRand)
postRand[31] &= 0x3f

limit := m.parallelCheckLimit
if limit <= 0 {
limit = len(sectors)
}
throttle := make(chan struct{}, limit)

addBad := func(s abi.SectorID, reason string) {
badLk.Lock()
bad[s] = reason
badLk.Unlock()
}

var wg sync.WaitGroup
wg.Add(len(sectors))

for _, sector := range sectors {
err := func() error {
select {
case throttle <- struct{}{}:
case <-ctx.Done():
return nil, ctx.Err()
}

go func(sector storage.SectorRef) {
defer wg.Done()
defer func() {
<-throttle
}()
ctx, cancel := context.WithCancel(ctx)
defer cancel()

commr, update, err := rg(ctx, sector.ID)
if err != nil {
log.Warnw("CheckProvable Sector FAULT: getting commR", "sector", sector, "sealed", "err", err)
bad[sector.ID] = fmt.Sprintf("getting commR: %s", err)
return nil
addBad(sector.ID, fmt.Sprintf("getting commR: %s", err))
return
}

toLock := storiface.FTSealed | storiface.FTCache
@@ -49,31 +83,29 @@ func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof,

locked, err := m.index.StorageTryLock(ctx, sector.ID, toLock, storiface.FTNone)
if err != nil {
return xerrors.Errorf("acquiring sector lock: %w", err)
addBad(sector.ID, fmt.Sprintf("tryLock error: %s", err))
return
}

if !locked {
log.Warnw("CheckProvable Sector FAULT: can't acquire read lock", "sector", sector)
bad[sector.ID] = fmt.Sprint("can't acquire read lock")
return nil
addBad(sector.ID, fmt.Sprint("can't acquire read lock"))
return
}

wpp, err := sector.ProofType.RegisteredWindowPoStProof()
if err != nil {
return err
addBad(sector.ID, fmt.Sprint("can't get proof type"))
return
}

var pr abi.PoStRandomness = make([]byte, abi.RandomnessLength)
_, _ = rand.Read(pr)
pr[31] &= 0x3f

ch, err := ffi.GeneratePoStFallbackSectorChallenges(wpp, sector.ID.Miner, pr, []abi.SectorNumber{
ch, err := ffi.GeneratePoStFallbackSectorChallenges(wpp, sector.ID.Miner, postRand, []abi.SectorNumber{
sector.ID.Number,
})
if err != nil {
log.Warnw("CheckProvable Sector FAULT: generating challenges", "sector", sector, "err", err)
bad[sector.ID] = fmt.Sprintf("generating fallback challenges: %s", err)
return nil
addBad(sector.ID, fmt.Sprintf("generating fallback challenges: %s", err))
return
}

vctx, cancel2 := context.WithTimeout(ctx, PostCheckTimeout)
@@ -88,17 +120,14 @@ func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof,
}, wpp)
if err != nil {
log.Warnw("CheckProvable Sector FAULT: generating vanilla proof", "sector", sector, "err", err)
bad[sector.ID] = fmt.Sprintf("generating vanilla proof: %s", err)
return nil
addBad(sector.ID, fmt.Sprintf("generating vanilla proof: %s", err))
return
}

return nil
}()
if err != nil {
return nil, err
}
}(sector)
}

wg.Wait()

return bad, nil
}
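
The core of this change is the rewrite of CheckProvable above: instead of checking sectors one after another, each sector check now runs in its own goroutine, with concurrency bounded by a buffered channel used as a semaphore (sized by the new ParallelCheckLimit; 0 means unlimited) and failure reasons collected into a mutex-guarded map. The snippet below is a minimal, self-contained sketch of that throttling idiom only; the item type, the check callback, and main are placeholders for illustration, not Lotus APIs.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// checkItems runs check() on every item with at most `limit` checks in flight
// at once (limit <= 0 means "no limit"), collecting failure reasons into a
// mutex-guarded map -- the same shape CheckProvable now has.
func checkItems(ctx context.Context, items []int, limit int, check func(context.Context, int) error) (map[int]string, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	if limit <= 0 {
		limit = len(items)
	}
	throttle := make(chan struct{}, limit) // buffered channel used as a semaphore

	bad := make(map[int]string)
	var badLk sync.Mutex

	var wg sync.WaitGroup
	wg.Add(len(items))

	for _, it := range items {
		// acquire a slot before spawning the worker, so at most `limit`
		// checks run concurrently
		select {
		case throttle <- struct{}{}:
		case <-ctx.Done():
			return nil, ctx.Err()
		}

		go func(it int) {
			defer wg.Done()
			defer func() { <-throttle }() // release the slot

			if err := check(ctx, it); err != nil {
				badLk.Lock()
				bad[it] = err.Error()
				badLk.Unlock()
			}
		}(it)
	}

	wg.Wait()
	return bad, nil
}

func main() {
	bad, err := checkItems(context.Background(), []int{1, 2, 3, 4, 5}, 2, func(ctx context.Context, it int) error {
		if it%2 == 0 {
			return fmt.Errorf("item %d failed", it)
		}
		return nil
	})
	fmt.Println(bad, err)
}
```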

11 changes: 9 additions & 2 deletions extern/sector-storage/manager.go
@@ -70,6 +70,8 @@ type Manager struct {
workLk sync.Mutex
work *statestore.StateStore

parallelCheckLimit int

callToWork map[storiface.CallID]WorkID
// used when we get an early return and there's no callToWork mapping
callRes map[storiface.CallID]chan result
@@ -99,7 +101,7 @@ const (
ResourceFilteringDisabled = ResourceFilteringStrategy("disabled")
)

type SealerConfig struct {
type Config struct {
ParallelFetchLimit int

// Local worker config
@@ -116,14 +118,17 @@ type SealerConfig struct {
// to use when evaluating tasks against this worker. An empty value defaults
// to "hardware".
ResourceFiltering ResourceFilteringStrategy

// PoSt config
ParallelCheckLimit int
}

type StorageAuth http.Header

type WorkerStateStore *statestore.StateStore
type ManagerStateStore *statestore.StateStore

func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores.LocalStorage, si stores.SectorIndex, sc SealerConfig, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) {
func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores.LocalStorage, si stores.SectorIndex, sc Config, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) {
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si})
if err != nil {
return nil, xerrors.Errorf("creating prover instance: %w", err)
@@ -142,6 +147,8 @@ func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores.

localProver: prover,

parallelCheckLimit: sc.ParallelCheckLimit,

work: mss,
callToWork: map[storiface.CallID]WorkID{},
callRes: map[storiface.CallID]chan result{},
6 changes: 3 additions & 3 deletions extern/sector-storage/piece_provider_test.go
@@ -30,7 +30,7 @@ import (
// only uses miner and does NOT use any remote worker.
func TestPieceProviderSimpleNoRemoteWorker(t *testing.T) {
// Set up sector storage manager
sealerCfg := SealerConfig{
sealerCfg := Config{
ParallelFetchLimit: 10,
AllowAddPiece: true,
AllowPreCommit1: true,
@@ -89,7 +89,7 @@ func TestReadPieceRemoteWorkers(t *testing.T) {
logging.SetAllLoggers(logging.LevelDebug)

// miner's worker can only add pieces to an unsealed sector.
sealerCfg := SealerConfig{
sealerCfg := Config{
ParallelFetchLimit: 10,
AllowAddPiece: true,
AllowPreCommit1: false,
@@ -198,7 +198,7 @@ func generatePieceData(size uint64) []byte {
return bz
}

func newPieceProviderTestHarness(t *testing.T, mgrConfig SealerConfig, sectorProofType abi.RegisteredSealProof) *pieceProviderTestHarness {
func newPieceProviderTestHarness(t *testing.T, mgrConfig Config, sectorProofType abi.RegisteredSealProof) *pieceProviderTestHarness {
ctx := context.Background()
// listen on tcp socket to create an http server later
address := "0.0.0.0:0"
4 changes: 2 additions & 2 deletions itests/kit/ensemble.go
@@ -585,7 +585,7 @@ func (n *Ensemble) Start() *Ensemble {

// disable resource filtering so that local worker gets assigned tasks
// regardless of system pressure.
node.Override(new(sectorstorage.SealerConfig), func() sectorstorage.SealerConfig {
node.Override(new(sectorstorage.Config), func() sectorstorage.Config {
scfg := config.DefaultStorageMiner()

if noLocal {
@@ -596,7 +596,7 @@
}

scfg.Storage.ResourceFiltering = sectorstorage.ResourceFilteringDisabled
return scfg.Storage
return scfg.StorageManager()
}),

// upgrades
2 changes: 1 addition & 1 deletion node/builder_miner.go
@@ -214,7 +214,7 @@ func ConfigStorageMiner(c interface{}) Option {
Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(&cfg.Fees, &cfg.Dealmaking)),
),

Override(new(sectorstorage.SealerConfig), cfg.Storage),
Override(new(sectorstorage.Config), cfg.StorageManager()),
Override(new(*storage.AddressSelector), modules.AddressSelector(&cfg.Addresses)),
)
}
6 changes: 5 additions & 1 deletion node/config/def.go
@@ -138,7 +138,11 @@ func DefaultStorageMiner() *StorageMiner {
TerminateBatchWait: Duration(5 * time.Minute),
},

Storage: sectorstorage.SealerConfig{
Proving: ProvingConfig{
ParallelCheckLimit: 128,
},

Storage: SealerConfig{
AllowAddPiece: true,
AllowPreCommit1: true,
AllowPreCommit2: true,
(Diffs for the remaining four changed files were not loaded in this view.)
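
Among those unloaded files is the one defining cfg.StorageManager(), which the call sites in itests/kit/ensemble.go and node/builder_miner.go now use instead of passing cfg.Storage directly. A plausible sketch, assuming the method simply copies the SealerConfig fields and pulls the new check limit from the [Proving] section into the renamed sectorstorage.Config, could look like the following (field set inferred from the diffs above, not confirmed by this page):

```go
// Assumed shape only -- the real method lives in a node/config file not shown
// in this diff. It flattens the miner config into the single struct that
// sectorstorage.New expects.
func (c *StorageMiner) StorageManager() sectorstorage.Config {
	return sectorstorage.Config{
		ParallelFetchLimit: c.Storage.ParallelFetchLimit,

		AllowAddPiece:            c.Storage.AllowAddPiece,
		AllowPreCommit1:          c.Storage.AllowPreCommit1,
		AllowPreCommit2:          c.Storage.AllowPreCommit2,
		AllowCommit:              c.Storage.AllowCommit,
		AllowUnseal:              c.Storage.AllowUnseal,
		AllowReplicaUpdate:       c.Storage.AllowReplicaUpdate,
		AllowProveReplicaUpdate2: c.Storage.AllowProveReplicaUpdate2,
		AllowRegenSectorKey:      c.Storage.AllowRegenSectorKey,

		ResourceFiltering: c.Storage.ResourceFiltering,

		// the new PoSt knob comes from [Proving], not [Storage]
		ParallelCheckLimit: c.Proving.ParallelCheckLimit,
	}
}
```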