Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: sealing: Sector upgrade queue #8330

Merged
merged 15 commits into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/lotus-miner/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ var stateOrder = map[sealing.SectorState]stateMeta{}
var stateList = []stateMeta{
{col: 39, state: "Total"},
{col: color.FgGreen, state: sealing.Proving},
{col: color.FgGreen, state: sealing.Available},
{col: color.FgGreen, state: sealing.UpdateActivating},

{col: color.FgBlue, state: sealing.Empty},
Expand Down
5 changes: 5 additions & 0 deletions extern/storage-sealing/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,9 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
Proving: planOne(
on(SectorFaultReported{}, FaultReported),
on(SectorFaulty{}, Faulty),
on(SectorMarkForUpdate{}, Available),
),
Available: planOne(
on(SectorStartCCUpdate{}, SnapDealsWaitDeals),
),
Terminating: planOne(
Expand Down Expand Up @@ -558,6 +561,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
// Post-seal
case Proving:
return m.handleProvingSector, processed, nil
case Available:
return m.handleAvailableSector, processed, nil
case Terminating:
return m.handleTerminating, processed, nil
case TerminateWait:
Expand Down
4 changes: 4 additions & 0 deletions extern/storage-sealing/fsm_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,10 @@ func (evt SectorFinalizeFailed) apply(*SectorInfo) {}

// Snap deals // CC update path

type SectorMarkForUpdate struct{}

func (evt SectorMarkForUpdate) apply(state *SectorInfo) {}

type SectorStartCCUpdate struct{}

func (evt SectorStartCCUpdate) apply(state *SectorInfo) {
Expand Down
141 changes: 122 additions & 19 deletions extern/storage-sealing/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ import (
"github.com/filecoin-project/go-commp-utils/zerocomm"
"github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-statemachine"
"github.com/filecoin-project/specs-storage/storage"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
Expand Down Expand Up @@ -384,8 +387,29 @@ func (m *Sealing) MatchPendingPiecesToOpenSectors(ctx context.Context) error {
return m.updateInput(ctx, sp)
}

type expFn func(sn abi.SectorNumber) (abi.ChainEpoch, abi.TokenAmount, error)

// called with m.inputLk
func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) error {
memo := make(map[abi.SectorNumber]struct {
e abi.ChainEpoch
p abi.TokenAmount
})
expF := func(sn abi.SectorNumber) (abi.ChainEpoch, abi.TokenAmount, error) {
if e, ok := memo[sn]; ok {
return e.e, e.p, nil
}
onChainInfo, err := m.Api.StateSectorGetInfo(ctx, m.maddr, sn, TipSetToken{})
if err != nil {
return 0, big.Zero(), err
}
memo[sn] = struct {
e abi.ChainEpoch
p abi.TokenAmount
}{e: onChainInfo.Expiration, p: onChainInfo.InitialPledge}
return onChainInfo.Expiration, onChainInfo.InitialPledge, nil
}

ssize, err := sp.SectorSize()
if err != nil {
return err
Expand All @@ -411,19 +435,6 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e

toAssign[proposalCid] = struct{}{}

memo := make(map[abi.SectorNumber]abi.ChainEpoch)
expF := func(sn abi.SectorNumber) (abi.ChainEpoch, error) {
if exp, ok := memo[sn]; ok {
return exp, nil
}
onChainInfo, err := m.Api.StateSectorGetInfo(ctx, m.maddr, sn, TipSetToken{})
if err != nil {
return 0, err
}
memo[sn] = onChainInfo.Expiration
return onChainInfo.Expiration, nil
}

for id, sector := range m.openSectors {
avail := abi.PaddedPieceSize(ssize).Unpadded() - sector.used
// check that sector lifetime is long enough to fit deal using latest expiration from on chain
Expand All @@ -434,7 +445,7 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
continue
}
if !ok {
exp, _ := expF(sector.number)
exp, _, _ := expF(sector.number)
log.Infof("CC update sector %d cannot fit deal, expiration %d before deal end epoch %d", id, exp, piece.deal.DealProposal.EndEpoch)
continue
}
Expand Down Expand Up @@ -497,15 +508,99 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e

if len(toAssign) > 0 {
log.Errorf("we are trying to create a new sector with open sectors %v", m.openSectors)
if err := m.tryCreateDealSector(ctx, sp); err != nil {
if err := m.tryGetDealSector(ctx, sp, expF); err != nil {
log.Errorw("Failed to create a new sector for deals", "error", err)
}
}

return nil
}

func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSealProof) error {
func (m *Sealing) calcTargetExpiration(ctx context.Context, ssize abi.SectorSize) (target abi.ChainEpoch) {
var candidates []*pendingPiece

for _, piece := range m.pendingPieces {
if piece.assigned {
continue // already assigned to a sector, skip
}
candidates = append(candidates, piece)
}

// earliest expiration first
sort.Slice(candidates, func(i, j int) bool {
return candidates[i].deal.DealProposal.EndEpoch < candidates[j].deal.DealProposal.EndEpoch
})

var totalBytes uint64
for _, candidate := range candidates {
totalBytes += uint64(candidate.size)

if totalBytes >= uint64(abi.PaddedPieceSize(ssize).Unpadded()) {
return candidate.deal.DealProposal.EndEpoch
}
}

_, curEpoch, err := m.Api.ChainHead(ctx)
if err != nil {
log.Errorf("getting current epoch: %s", err)
return 0
}

_, maxDur := policy.DealDurationBounds(0)

return curEpoch + maxDur
}

func (m *Sealing) tryGetUpgradeSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) (bool, error) {
if len(m.available) == 0 {
return false, nil
}

ssize, _ := sp.SectorSize() // error already checked in the caller
magik6k marked this conversation as resolved.
Show resolved Hide resolved
targetExpiration := m.calcTargetExpiration(ctx, ssize)

var candidate abi.SectorID
var bestExpiration abi.ChainEpoch
bestPledge := types.TotalFilecoinInt

for s := range m.available {
expiration, pledge, err := ef(s.Number)
if err != nil {
log.Errorw("checking sector expiration", "error", err)
continue
}

// if best is below target, we want larger expirations
// if best is above target, we want lower pledge, but only if still above target

if bestExpiration < targetExpiration {
if expiration > bestExpiration {
bestExpiration = expiration
bestPledge = pledge
candidate = s
}
continue
}

if expiration > targetExpiration && pledge.LessThan(bestPledge) {
magik6k marked this conversation as resolved.
Show resolved Hide resolved
bestExpiration = expiration
bestPledge = pledge
candidate = s
}
}

if bestExpiration == 0 {
magik6k marked this conversation as resolved.
Show resolved Hide resolved
// didn't find a good sector
return false, nil
}

log.Infow("Upgrading sector", "number", candidate.Number, "type", "deal", "proofType", sp, "expiration", bestExpiration, "pledge", types.FIL(bestPledge))
delete(m.available, candidate)
m.creating = &candidate.Number
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why set creating I thought this was only needed when making a new sector, not dealing with an existing one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This var is used to synchronize sector event processing - basically we want to make sure that we don't re-enter tryGetDealSector before the sector we've just asked to enter WaitDeals* adds itself to m.openSectors.

Renamed the var to something which makes more sense given that we now also use it for snapdeals

return true, m.sectors.Send(uint64(candidate.Number), SectorStartCCUpdate{})
}

func (m *Sealing) tryGetDealSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) error {
m.startupWait.Wait()

if m.creating != nil {
Expand All @@ -517,15 +612,23 @@ func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSeal
return xerrors.Errorf("getting storage config: %w", err)
}

if !cfg.MakeNewSectorForDeals {
if cfg.MaxSealingSectorsForDeals > 0 && m.stats.curSealing() >= cfg.MaxSealingSectorsForDeals {
return nil
}

if cfg.MaxSealingSectorsForDeals > 0 && m.stats.curSealing() >= cfg.MaxSealingSectorsForDeals {
if cfg.MaxWaitDealsSectors > 0 && m.stats.curStaging() >= cfg.MaxWaitDealsSectors {
return nil
}

if cfg.MaxWaitDealsSectors > 0 && m.stats.curStaging() >= cfg.MaxWaitDealsSectors {
got, err := m.tryGetUpgradeSector(ctx, sp, ef)
if err != nil {
return err
}
if got {
return nil
}

if !cfg.MakeNewSectorForDeals {
return nil
}

Expand Down
8 changes: 6 additions & 2 deletions extern/storage-sealing/sealing.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ type Sealing struct {
assignedPieces map[abi.SectorID][]cid.Cid
creating *abi.SectorNumber // used to prevent a race where we could create a new sector more than once

available map[abi.SectorID]struct{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will become stale if the miner restarts. Are there existing patterns for traversing sector infos to repopulate this for all infos in Available state?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Available state handler gets called on restart, so this will get repopulated

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

woah cool, is this the same for all state handlers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep


notifee SectorStateNotifee
addrSel AddrSel

Expand All @@ -126,11 +128,11 @@ type openSector struct {
maybeAccept func(cid.Cid) error // called with inputLk
}

func (o *openSector) dealFitsInLifetime(dealEnd abi.ChainEpoch, expF func(sn abi.SectorNumber) (abi.ChainEpoch, error)) (bool, error) {
func (o *openSector) dealFitsInLifetime(dealEnd abi.ChainEpoch, expF expFn) (bool, error) {
if !o.ccUpdate {
return true, nil
}
expiration, err := expF(o.number)
expiration, _, err := expF(o.number)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -175,6 +177,8 @@ func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events
pendingPieces: map[cid.Cid]*pendingPiece{},
assignedPieces: map[abi.SectorID][]cid.Cid{},

available: map[abi.SectorID]struct{}{},

notifee: notifee,
addrSel: as,

Expand Down
4 changes: 3 additions & 1 deletion extern/storage-sealing/sector_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var ExistSectorStateList = map[SectorState]struct{}{
CommitAggregateWait: {},
FinalizeSector: {},
Proving: {},
Available: {},
FailedUnrecoverable: {},
SealPreCommit1Failed: {},
SealPreCommit2Failed: {},
Expand Down Expand Up @@ -98,6 +99,7 @@ const (

FinalizeSector SectorState = "FinalizeSector"
Proving SectorState = "Proving"
Available SectorState = "Available" // proving CC available for SnapDeals

// snap deals / cc update
SnapDealsWaitDeals SectorState = "SnapDealsWaitDeals"
Expand Down Expand Up @@ -161,7 +163,7 @@ func toStatState(st SectorState, finEarly bool) statSectorState {
return sstProving
}
return sstSealing
case Proving, UpdateActivating, ReleaseSectorKey, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed:
case Proving, Available, UpdateActivating, ReleaseSectorKey, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed:
return sstProving
}

Expand Down
15 changes: 15 additions & 0 deletions extern/storage-sealing/states_proving.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ func (m *Sealing) handleRemoving(ctx statemachine.Context, sector SectorInfo) er
func (m *Sealing) handleProvingSector(ctx statemachine.Context, sector SectorInfo) error {
// TODO: track sector health / expiration

m.inputLk.Lock()
// in case we revert into Proving without going into Available
delete(m.available, m.minerSectorID(sector.SectorNumber))
magik6k marked this conversation as resolved.
Show resolved Hide resolved
m.inputLk.Unlock()

cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting sealing config: %w", err)
Expand All @@ -144,3 +149,13 @@ func (m *Sealing) handleProvingSector(ctx statemachine.Context, sector SectorInf

return nil
}

func (m *Sealing) handleAvailableSector(ctx statemachine.Context, sector SectorInfo) error {
m.inputLk.Lock()
m.available[m.minerSectorID(sector.SectorNumber)] = struct{}{}
m.inputLk.Unlock()
// TODO: Watch termination
// TODO: Auto-extend if set

return nil
}
13 changes: 1 addition & 12 deletions extern/storage-sealing/upgrade_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,6 @@ import (
)

func (m *Sealing) MarkForSnapUpgrade(ctx context.Context, id abi.SectorNumber) error {
cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting storage config: %w", err)
}

curStaging := m.stats.curStaging()
if cfg.MaxWaitDealsSectors > 0 && curStaging >= cfg.MaxWaitDealsSectors {
return xerrors.Errorf("already waiting for deals in %d >= %d (cfg.MaxWaitDealsSectors) sectors, no free resources to wait for deals in another",
curStaging, cfg.MaxWaitDealsSectors)
}

si, err := m.GetSectorInfo(id)
if err != nil {
return xerrors.Errorf("getting sector info: %w", err)
Expand Down Expand Up @@ -62,7 +51,7 @@ func (m *Sealing) MarkForSnapUpgrade(ctx context.Context, id abi.SectorNumber) e
"Upgrade expiration before marking for upgrade", id, onChainInfo.Expiration)
}

return m.sectors.Send(uint64(id), SectorStartCCUpdate{})
return m.sectors.Send(uint64(id), SectorMarkForUpdate{})
}

func sectorActive(ctx context.Context, api SealingAPI, maddr address.Address, tok TipSetToken, sector abi.SectorNumber) (bool, error) {
Expand Down
Loading