Skip to content

Commit

Permalink
Merge pull request #8333 from filecoin-project/backport/v1.15.1/feat/…
Browse files Browse the repository at this point in the history
…snap-queue

backport: v1.15.1: feat: sealing: Sector upgrade queue
  • Loading branch information
magik6k authored Mar 16, 2022
2 parents 5ea1502 + 4a1b211 commit 114cb4e
Show file tree
Hide file tree
Showing 21 changed files with 289 additions and 115 deletions.
1 change: 1 addition & 0 deletions cmd/lotus-miner/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ var stateOrder = map[sealing.SectorState]stateMeta{}
var stateList = []stateMeta{
{col: 39, state: "Total"},
{col: color.FgGreen, state: sealing.Proving},
{col: color.FgGreen, state: sealing.Available},
{col: color.FgGreen, state: sealing.UpdateActivating},

{col: color.FgBlue, state: sealing.Empty},
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-miner/sectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ var sectorsListCmd = &cli.Command{

if cctx.Bool("unproven") {
for state := range sealing.ExistSectorStateList {
if state == sealing.Proving {
if state == sealing.Proving || state == sealing.Available {
continue
}
states = append(states, api.SectorState(state))
Expand Down
6 changes: 6 additions & 0 deletions documentation/en/default-lotus-miner-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,12 @@
# env var: LOTUS_SEALING_FINALIZEEARLY
#FinalizeEarly = false

# After sealing CC sectors, make them available for upgrading with deals
#
# type: bool
# env var: LOTUS_SEALING_MAKECCSECTORSAVAILABLE
#MakeCCSectorsAvailable = false

# Whether to use available miner balance for sector collateral instead of sending it with each message
#
# type: bool
Expand Down
8 changes: 8 additions & 0 deletions extern/storage-sealing/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
Committing: planCommitting,
CommitFinalize: planOne(
on(SectorFinalized{}, SubmitCommit),
on(SectorFinalizedAvailable{}, SubmitCommit),
on(SectorFinalizeFailed{}, CommitFinalizeFailed),
),
SubmitCommit: planOne(
Expand All @@ -136,6 +137,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto

FinalizeSector: planOne(
on(SectorFinalized{}, Proving),
on(SectorFinalizedAvailable{}, Available),
on(SectorFinalizeFailed{}, FinalizeFailed),
),

Expand Down Expand Up @@ -283,7 +285,11 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
Proving: planOne(
on(SectorFaultReported{}, FaultReported),
on(SectorFaulty{}, Faulty),
on(SectorMarkForUpdate{}, Available),
),
Available: planOne(
on(SectorStartCCUpdate{}, SnapDealsWaitDeals),
on(SectorAbortUpgrade{}, Proving),
),
Terminating: planOne(
on(SectorTerminating{}, TerminateWait),
Expand Down Expand Up @@ -558,6 +564,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
// Post-seal
case Proving:
return m.handleProvingSector, processed, nil
case Available:
return m.handleAvailableSector, processed, nil
case Terminating:
return m.handleTerminating, processed, nil
case TerminateWait:
Expand Down
8 changes: 8 additions & 0 deletions extern/storage-sealing/fsm_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,10 @@ type SectorFinalized struct{}

func (evt SectorFinalized) apply(*SectorInfo) {}

type SectorFinalizedAvailable struct{}

func (evt SectorFinalizedAvailable) apply(*SectorInfo) {}

type SectorRetryFinalize struct{}

func (evt SectorRetryFinalize) apply(*SectorInfo) {}
Expand All @@ -297,6 +301,10 @@ func (evt SectorFinalizeFailed) apply(*SectorInfo) {}

// Snap deals // CC update path

type SectorMarkForUpdate struct{}

func (evt SectorMarkForUpdate) apply(state *SectorInfo) {}

type SectorStartCCUpdate struct{}

func (evt SectorStartCCUpdate) apply(state *SectorInfo) {
Expand Down
173 changes: 150 additions & 23 deletions extern/storage-sealing/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ import (
"github.com/filecoin-project/go-commp-utils/zerocomm"
"github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-statemachine"
"github.com/filecoin-project/specs-storage/storage"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
Expand All @@ -30,8 +33,8 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e

m.inputLk.Lock()

if m.creating != nil && *m.creating == sector.SectorNumber {
m.creating = nil
if m.nextDealSector != nil && *m.nextDealSector == sector.SectorNumber {
m.nextDealSector = nil
}

sid := m.minerSectorID(sector.SectorNumber)
Expand Down Expand Up @@ -384,8 +387,29 @@ func (m *Sealing) MatchPendingPiecesToOpenSectors(ctx context.Context) error {
return m.updateInput(ctx, sp)
}

type expFn func(sn abi.SectorNumber) (abi.ChainEpoch, abi.TokenAmount, error)

// called with m.inputLk
func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) error {
memo := make(map[abi.SectorNumber]struct {
e abi.ChainEpoch
p abi.TokenAmount
})
expF := func(sn abi.SectorNumber) (abi.ChainEpoch, abi.TokenAmount, error) {
if e, ok := memo[sn]; ok {
return e.e, e.p, nil
}
onChainInfo, err := m.Api.StateSectorGetInfo(ctx, m.maddr, sn, TipSetToken{})
if err != nil {
return 0, big.Zero(), err
}
memo[sn] = struct {
e abi.ChainEpoch
p abi.TokenAmount
}{e: onChainInfo.Expiration, p: onChainInfo.InitialPledge}
return onChainInfo.Expiration, onChainInfo.InitialPledge, nil
}

ssize, err := sp.SectorSize()
if err != nil {
return err
Expand All @@ -411,19 +435,6 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e

toAssign[proposalCid] = struct{}{}

memo := make(map[abi.SectorNumber]abi.ChainEpoch)
expF := func(sn abi.SectorNumber) (abi.ChainEpoch, error) {
if exp, ok := memo[sn]; ok {
return exp, nil
}
onChainInfo, err := m.Api.StateSectorGetInfo(ctx, m.maddr, sn, TipSetToken{})
if err != nil {
return 0, err
}
memo[sn] = onChainInfo.Expiration
return onChainInfo.Expiration, nil
}

for id, sector := range m.openSectors {
avail := abi.PaddedPieceSize(ssize).Unpadded() - sector.used
// check that sector lifetime is long enough to fit deal using latest expiration from on chain
Expand All @@ -434,7 +445,7 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
continue
}
if !ok {
exp, _ := expF(sector.number)
exp, _, _ := expF(sector.number)
log.Infof("CC update sector %d cannot fit deal, expiration %d before deal end epoch %d", id, exp, piece.deal.DealProposal.EndEpoch)
continue
}
Expand Down Expand Up @@ -497,18 +508,121 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e

if len(toAssign) > 0 {
log.Errorf("we are trying to create a new sector with open sectors %v", m.openSectors)
if err := m.tryCreateDealSector(ctx, sp); err != nil {
if err := m.tryGetDealSector(ctx, sp, expF); err != nil {
log.Errorw("Failed to create a new sector for deals", "error", err)
}
}

return nil
}

func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSealProof) error {
func (m *Sealing) calcTargetExpiration(ctx context.Context, ssize abi.SectorSize) (minTarget, target abi.ChainEpoch, err error) {
var candidates []*pendingPiece

for _, piece := range m.pendingPieces {
if piece.assigned {
continue // already assigned to a sector, skip
}
candidates = append(candidates, piece)
}

// earliest expiration first
sort.Slice(candidates, func(i, j int) bool {
return candidates[i].deal.DealProposal.EndEpoch < candidates[j].deal.DealProposal.EndEpoch
})

var totalBytes uint64
for _, candidate := range candidates {
totalBytes += uint64(candidate.size)

if totalBytes >= uint64(abi.PaddedPieceSize(ssize).Unpadded()) {
return candidates[0].deal.DealProposal.EndEpoch, candidate.deal.DealProposal.EndEpoch, nil
}
}

_, curEpoch, err := m.Api.ChainHead(ctx)
if err != nil {
return 0, 0, xerrors.Errorf("getting current epoch: %w", err)
}

minDur, maxDur := policy.DealDurationBounds(0)

return curEpoch + minDur, curEpoch + maxDur, nil
}

func (m *Sealing) tryGetUpgradeSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) (bool, error) {
if len(m.available) == 0 {
return false, nil
}

ssize, err := sp.SectorSize()
if err != nil {
return false, xerrors.Errorf("getting sector size: %w", err)
}
minExpiration, targetExpiration, err := m.calcTargetExpiration(ctx, ssize)
if err != nil {
return false, xerrors.Errorf("calculating min target expiration: %w", err)
}

var candidate abi.SectorID
var bestExpiration abi.ChainEpoch
bestPledge := types.TotalFilecoinInt

for s := range m.available {
expiration, pledge, err := ef(s.Number)
if err != nil {
log.Errorw("checking sector expiration", "error", err)
continue
}

slowChecks := func(sid abi.SectorNumber) bool {
active, err := m.sectorActive(ctx, TipSetToken{}, sid)
if err != nil {
log.Errorw("checking sector active", "error", err)
return false
}
if !active {
log.Debugw("skipping available sector", "reason", "not active")
return false
}
return true
}

// if best is below target, we want larger expirations
// if best is above target, we want lower pledge, but only if still above target

if bestExpiration < targetExpiration {
if expiration > bestExpiration && slowChecks(s.Number) {
bestExpiration = expiration
bestPledge = pledge
candidate = s
}
continue
}

if expiration >= targetExpiration && pledge.LessThan(bestPledge) && slowChecks(s.Number) {
bestExpiration = expiration
bestPledge = pledge
candidate = s
}
}

if bestExpiration < minExpiration {
log.Infow("Not upgrading any sectors", "available", len(m.available), "pieces", len(m.pendingPieces), "bestExp", bestExpiration, "target", targetExpiration, "min", minExpiration, "candidate", candidate)
// didn't find a good sector / no sectors were available
return false, nil
}

log.Infow("Upgrading sector", "number", candidate.Number, "type", "deal", "proofType", sp, "expiration", bestExpiration, "pledge", types.FIL(bestPledge))
delete(m.available, candidate)
m.nextDealSector = &candidate.Number
return true, m.sectors.Send(uint64(candidate.Number), SectorStartCCUpdate{})
}

func (m *Sealing) tryGetDealSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) error {
m.startupWait.Wait()

if m.creating != nil {
if m.nextDealSector != nil {
return nil // new sector is being created right now
}

Expand All @@ -517,15 +631,23 @@ func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSeal
return xerrors.Errorf("getting storage config: %w", err)
}

if !cfg.MakeNewSectorForDeals {
if cfg.MaxSealingSectorsForDeals > 0 && m.stats.curSealing() >= cfg.MaxSealingSectorsForDeals {
return nil
}

if cfg.MaxSealingSectorsForDeals > 0 && m.stats.curSealing() >= cfg.MaxSealingSectorsForDeals {
if cfg.MaxWaitDealsSectors > 0 && m.stats.curStaging() >= cfg.MaxWaitDealsSectors {
return nil
}

if cfg.MaxWaitDealsSectors > 0 && m.stats.curStaging() >= cfg.MaxWaitDealsSectors {
got, err := m.tryGetUpgradeSector(ctx, sp, ef)
if err != nil {
return err
}
if got {
return nil
}

if !cfg.MakeNewSectorForDeals {
return nil
}

Expand All @@ -534,7 +656,7 @@ func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSeal
return err
}

m.creating = &sid
m.nextDealSector = &sid

log.Infow("Creating sector", "number", sid, "type", "deal", "proofType", sp)
return m.sectors.Send(uint64(sid), SectorStart{
Expand Down Expand Up @@ -573,6 +695,11 @@ func (m *Sealing) StartPacking(sid abi.SectorNumber) error {
func (m *Sealing) AbortUpgrade(sid abi.SectorNumber) error {
m.startupWait.Wait()

m.inputLk.Lock()
// always do this early
delete(m.available, m.minerSectorID(sid))
m.inputLk.Unlock()

log.Infow("aborting upgrade of sector", "sector", sid, "trigger", "user")
return m.sectors.Send(uint64(sid), SectorAbortUpgrade{xerrors.New("triggered by user")})
}
Expand Down
5 changes: 3 additions & 2 deletions extern/storage-sealing/mocks/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions extern/storage-sealing/sealiface/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type Config struct {

MakeNewSectorForDeals bool

MakeCCSectorsAvailable bool

WaitDealsDelay time.Duration

CommittedCapacitySectorLifetime time.Duration
Expand Down
Loading

0 comments on commit 114cb4e

Please sign in to comment.