Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backport: v1.15.1: feat: sealing: Sector upgrade queue #8333

Merged
merged 15 commits into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/lotus-miner/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ var stateOrder = map[sealing.SectorState]stateMeta{}
var stateList = []stateMeta{
{col: 39, state: "Total"},
{col: color.FgGreen, state: sealing.Proving},
{col: color.FgGreen, state: sealing.Available},
{col: color.FgGreen, state: sealing.UpdateActivating},

{col: color.FgBlue, state: sealing.Empty},
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-miner/sectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ var sectorsListCmd = &cli.Command{

if cctx.Bool("unproven") {
for state := range sealing.ExistSectorStateList {
if state == sealing.Proving {
if state == sealing.Proving || state == sealing.Available {
continue
}
states = append(states, api.SectorState(state))
Expand Down
6 changes: 6 additions & 0 deletions documentation/en/default-lotus-miner-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,12 @@
# env var: LOTUS_SEALING_FINALIZEEARLY
#FinalizeEarly = false

# After sealing CC sectors, make them available for upgrading with deals
#
# type: bool
# env var: LOTUS_SEALING_MAKECCSECTORSAVAILABLE
#MakeCCSectorsAvailable = false

# Whether to use available miner balance for sector collateral instead of sending it with each message
#
# type: bool
Expand Down
8 changes: 8 additions & 0 deletions extern/storage-sealing/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
Committing: planCommitting,
CommitFinalize: planOne(
on(SectorFinalized{}, SubmitCommit),
on(SectorFinalizedAvailable{}, SubmitCommit),
on(SectorFinalizeFailed{}, CommitFinalizeFailed),
),
SubmitCommit: planOne(
Expand All @@ -136,6 +137,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto

FinalizeSector: planOne(
on(SectorFinalized{}, Proving),
on(SectorFinalizedAvailable{}, Available),
on(SectorFinalizeFailed{}, FinalizeFailed),
),

Expand Down Expand Up @@ -283,7 +285,11 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
Proving: planOne(
on(SectorFaultReported{}, FaultReported),
on(SectorFaulty{}, Faulty),
on(SectorMarkForUpdate{}, Available),
),
Available: planOne(
on(SectorStartCCUpdate{}, SnapDealsWaitDeals),
on(SectorAbortUpgrade{}, Proving),
),
Terminating: planOne(
on(SectorTerminating{}, TerminateWait),
Expand Down Expand Up @@ -558,6 +564,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
// Post-seal
case Proving:
return m.handleProvingSector, processed, nil
case Available:
return m.handleAvailableSector, processed, nil
case Terminating:
return m.handleTerminating, processed, nil
case TerminateWait:
Expand Down
8 changes: 8 additions & 0 deletions extern/storage-sealing/fsm_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,10 @@ type SectorFinalized struct{}

func (evt SectorFinalized) apply(*SectorInfo) {}

type SectorFinalizedAvailable struct{}

func (evt SectorFinalizedAvailable) apply(*SectorInfo) {}

type SectorRetryFinalize struct{}

func (evt SectorRetryFinalize) apply(*SectorInfo) {}
Expand All @@ -297,6 +301,10 @@ func (evt SectorFinalizeFailed) apply(*SectorInfo) {}

// Snap deals // CC update path

type SectorMarkForUpdate struct{}

func (evt SectorMarkForUpdate) apply(state *SectorInfo) {}

type SectorStartCCUpdate struct{}

func (evt SectorStartCCUpdate) apply(state *SectorInfo) {
Expand Down
173 changes: 150 additions & 23 deletions extern/storage-sealing/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ import (
"github.com/filecoin-project/go-commp-utils/zerocomm"
"github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-statemachine"
"github.com/filecoin-project/specs-storage/storage"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
Expand All @@ -30,8 +33,8 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e

m.inputLk.Lock()

if m.creating != nil && *m.creating == sector.SectorNumber {
m.creating = nil
if m.nextDealSector != nil && *m.nextDealSector == sector.SectorNumber {
m.nextDealSector = nil
}

sid := m.minerSectorID(sector.SectorNumber)
Expand Down Expand Up @@ -384,8 +387,29 @@ func (m *Sealing) MatchPendingPiecesToOpenSectors(ctx context.Context) error {
return m.updateInput(ctx, sp)
}

type expFn func(sn abi.SectorNumber) (abi.ChainEpoch, abi.TokenAmount, error)

// called with m.inputLk
func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) error {
memo := make(map[abi.SectorNumber]struct {
e abi.ChainEpoch
p abi.TokenAmount
})
expF := func(sn abi.SectorNumber) (abi.ChainEpoch, abi.TokenAmount, error) {
if e, ok := memo[sn]; ok {
return e.e, e.p, nil
}
onChainInfo, err := m.Api.StateSectorGetInfo(ctx, m.maddr, sn, TipSetToken{})
if err != nil {
return 0, big.Zero(), err
}
memo[sn] = struct {
e abi.ChainEpoch
p abi.TokenAmount
}{e: onChainInfo.Expiration, p: onChainInfo.InitialPledge}
return onChainInfo.Expiration, onChainInfo.InitialPledge, nil
}

ssize, err := sp.SectorSize()
if err != nil {
return err
Expand All @@ -411,19 +435,6 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e

toAssign[proposalCid] = struct{}{}

memo := make(map[abi.SectorNumber]abi.ChainEpoch)
expF := func(sn abi.SectorNumber) (abi.ChainEpoch, error) {
if exp, ok := memo[sn]; ok {
return exp, nil
}
onChainInfo, err := m.Api.StateSectorGetInfo(ctx, m.maddr, sn, TipSetToken{})
if err != nil {
return 0, err
}
memo[sn] = onChainInfo.Expiration
return onChainInfo.Expiration, nil
}

for id, sector := range m.openSectors {
avail := abi.PaddedPieceSize(ssize).Unpadded() - sector.used
// check that sector lifetime is long enough to fit deal using latest expiration from on chain
Expand All @@ -434,7 +445,7 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
continue
}
if !ok {
exp, _ := expF(sector.number)
exp, _, _ := expF(sector.number)
log.Infof("CC update sector %d cannot fit deal, expiration %d before deal end epoch %d", id, exp, piece.deal.DealProposal.EndEpoch)
continue
}
Expand Down Expand Up @@ -497,18 +508,121 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e

if len(toAssign) > 0 {
log.Errorf("we are trying to create a new sector with open sectors %v", m.openSectors)
if err := m.tryCreateDealSector(ctx, sp); err != nil {
if err := m.tryGetDealSector(ctx, sp, expF); err != nil {
log.Errorw("Failed to create a new sector for deals", "error", err)
}
}

return nil
}

func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSealProof) error {
func (m *Sealing) calcTargetExpiration(ctx context.Context, ssize abi.SectorSize) (minTarget, target abi.ChainEpoch, err error) {
var candidates []*pendingPiece

for _, piece := range m.pendingPieces {
if piece.assigned {
continue // already assigned to a sector, skip
}
candidates = append(candidates, piece)
}

// earliest expiration first
sort.Slice(candidates, func(i, j int) bool {
return candidates[i].deal.DealProposal.EndEpoch < candidates[j].deal.DealProposal.EndEpoch
})

var totalBytes uint64
for _, candidate := range candidates {
totalBytes += uint64(candidate.size)

if totalBytes >= uint64(abi.PaddedPieceSize(ssize).Unpadded()) {
return candidates[0].deal.DealProposal.EndEpoch, candidate.deal.DealProposal.EndEpoch, nil
}
}

_, curEpoch, err := m.Api.ChainHead(ctx)
if err != nil {
return 0, 0, xerrors.Errorf("getting current epoch: %w", err)
}

minDur, maxDur := policy.DealDurationBounds(0)

return curEpoch + minDur, curEpoch + maxDur, nil
}

func (m *Sealing) tryGetUpgradeSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) (bool, error) {
if len(m.available) == 0 {
return false, nil
}

ssize, err := sp.SectorSize()
if err != nil {
return false, xerrors.Errorf("getting sector size: %w", err)
}
minExpiration, targetExpiration, err := m.calcTargetExpiration(ctx, ssize)
if err != nil {
return false, xerrors.Errorf("calculating min target expiration: %w", err)
}

var candidate abi.SectorID
var bestExpiration abi.ChainEpoch
bestPledge := types.TotalFilecoinInt

for s := range m.available {
expiration, pledge, err := ef(s.Number)
if err != nil {
log.Errorw("checking sector expiration", "error", err)
continue
}

slowChecks := func(sid abi.SectorNumber) bool {
active, err := m.sectorActive(ctx, TipSetToken{}, sid)
if err != nil {
log.Errorw("checking sector active", "error", err)
return false
}
if !active {
log.Debugw("skipping available sector", "reason", "not active")
return false
}
return true
}

// if best is below target, we want larger expirations
// if best is above target, we want lower pledge, but only if still above target

if bestExpiration < targetExpiration {
if expiration > bestExpiration && slowChecks(s.Number) {
bestExpiration = expiration
bestPledge = pledge
candidate = s
}
continue
}

if expiration >= targetExpiration && pledge.LessThan(bestPledge) && slowChecks(s.Number) {
bestExpiration = expiration
bestPledge = pledge
candidate = s
}
}

if bestExpiration < minExpiration {
log.Infow("Not upgrading any sectors", "available", len(m.available), "pieces", len(m.pendingPieces), "bestExp", bestExpiration, "target", targetExpiration, "min", minExpiration, "candidate", candidate)
// didn't find a good sector / no sectors were available
return false, nil
}

log.Infow("Upgrading sector", "number", candidate.Number, "type", "deal", "proofType", sp, "expiration", bestExpiration, "pledge", types.FIL(bestPledge))
delete(m.available, candidate)
m.nextDealSector = &candidate.Number
return true, m.sectors.Send(uint64(candidate.Number), SectorStartCCUpdate{})
}

func (m *Sealing) tryGetDealSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) error {
m.startupWait.Wait()

if m.creating != nil {
if m.nextDealSector != nil {
return nil // new sector is being created right now
}

Expand All @@ -517,15 +631,23 @@ func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSeal
return xerrors.Errorf("getting storage config: %w", err)
}

if !cfg.MakeNewSectorForDeals {
if cfg.MaxSealingSectorsForDeals > 0 && m.stats.curSealing() >= cfg.MaxSealingSectorsForDeals {
return nil
}

if cfg.MaxSealingSectorsForDeals > 0 && m.stats.curSealing() >= cfg.MaxSealingSectorsForDeals {
if cfg.MaxWaitDealsSectors > 0 && m.stats.curStaging() >= cfg.MaxWaitDealsSectors {
return nil
}

if cfg.MaxWaitDealsSectors > 0 && m.stats.curStaging() >= cfg.MaxWaitDealsSectors {
got, err := m.tryGetUpgradeSector(ctx, sp, ef)
if err != nil {
return err
}
if got {
return nil
}

if !cfg.MakeNewSectorForDeals {
return nil
}

Expand All @@ -534,7 +656,7 @@ func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSeal
return err
}

m.creating = &sid
m.nextDealSector = &sid

log.Infow("Creating sector", "number", sid, "type", "deal", "proofType", sp)
return m.sectors.Send(uint64(sid), SectorStart{
Expand Down Expand Up @@ -573,6 +695,11 @@ func (m *Sealing) StartPacking(sid abi.SectorNumber) error {
func (m *Sealing) AbortUpgrade(sid abi.SectorNumber) error {
m.startupWait.Wait()

m.inputLk.Lock()
// always do this early
delete(m.available, m.minerSectorID(sid))
m.inputLk.Unlock()

log.Infow("aborting upgrade of sector", "sector", sid, "trigger", "user")
return m.sectors.Send(uint64(sid), SectorAbortUpgrade{xerrors.New("triggered by user")})
}
Expand Down
5 changes: 3 additions & 2 deletions extern/storage-sealing/mocks/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions extern/storage-sealing/sealiface/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type Config struct {

MakeNewSectorForDeals bool

MakeCCSectorsAvailable bool

WaitDealsDelay time.Duration

CommittedCapacitySectorLifetime time.Duration
Expand Down
Loading