Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sealing: Early finalization option #6452

Merged
merged 4 commits into from
Jun 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/lotus-storage-miner/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ var stateList = []stateMeta{
{col: color.FgYellow, state: sealing.PreCommitBatchWait},
{col: color.FgYellow, state: sealing.WaitSeed},
{col: color.FgYellow, state: sealing.Committing},
{col: color.FgYellow, state: sealing.CommitFinalize},
{col: color.FgYellow, state: sealing.SubmitCommit},
{col: color.FgYellow, state: sealing.CommitWait},
{col: color.FgYellow, state: sealing.SubmitCommitAggregate},
Expand All @@ -315,6 +316,7 @@ var stateList = []stateMeta{
{col: color.FgRed, state: sealing.PreCommitFailed},
{col: color.FgRed, state: sealing.ComputeProofFailed},
{col: color.FgRed, state: sealing.CommitFailed},
{col: color.FgRed, state: sealing.CommitFinalizeFailed},
{col: color.FgRed, state: sealing.PackingFailed},
{col: color.FgRed, state: sealing.FinalizeFailed},
{col: color.FgRed, state: sealing.Faulty},
Expand Down
14 changes: 14 additions & 0 deletions extern/storage-sealing/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorChainPreCommitFailed{}, PreCommitFailed),
),
Committing: planCommitting,
CommitFinalize: planOne(
on(SectorFinalized{}, SubmitCommit),
on(SectorFinalizeFailed{}, CommitFinalizeFailed),
),
SubmitCommit: planOne(
on(SectorCommitSubmitted{}, CommitWait),
on(SectorSubmitCommitAggregate{}, SubmitCommitAggregate),
Expand Down Expand Up @@ -150,6 +154,9 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorRetryComputeProof{}, Committing),
on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed),
),
CommitFinalizeFailed: planOne(
on(SectorRetryFinalize{}, CommitFinalizeFailed),
),
CommitFailed: planOne(
on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed),
on(SectorRetryWaitSeed{}, WaitSeed),
Expand Down Expand Up @@ -372,6 +379,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
fallthrough
case CommitWait:
return m.handleCommitWait, processed, nil
case CommitFinalize:
fallthrough
case FinalizeSector:
return m.handleFinalizeSector, processed, nil

Expand All @@ -386,6 +395,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
return m.handleComputeProofFailed, processed, nil
case CommitFailed:
return m.handleCommitFailed, processed, nil
case CommitFinalizeFailed:
fallthrough
case FinalizeFailed:
return m.handleFinalizeFailed, processed, nil
case PackingFailed: // DEPRECATED: remove this for the next reset
Expand Down Expand Up @@ -474,6 +485,9 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, err
case SectorCommitted: // the normal case
e.apply(state)
state.State = SubmitCommit
case SectorProofReady: // early finalize
e.apply(state)
state.State = CommitFinalize
case SectorSeedReady: // seed changed :/
if e.SeedEpoch == state.SeedEpoch && bytes.Equal(e.SeedValue, state.SeedValue) {
log.Warnf("planCommitting: got SectorSeedReady, but the seed didn't change")
Expand Down
9 changes: 9 additions & 0 deletions extern/storage-sealing/fsm_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,15 @@ func (evt SectorCommitted) apply(state *SectorInfo) {
state.Proof = evt.Proof
}

// like SectorCommitted, but finalizes before sending the proof to the chain
type SectorProofReady struct {
Proof []byte
}

func (evt SectorProofReady) apply(state *SectorInfo) {
state.Proof = evt.Proof
}

type SectorSubmitCommitAggregate struct{}

func (evt SectorSubmitCommitAggregate) apply(*SectorInfo) {}
Expand Down
67 changes: 67 additions & 0 deletions extern/storage-sealing/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,73 @@ func TestHappyPath(t *testing.T) {
}
}

func TestHappyPathFinalizeEarly(t *testing.T) {
var notif []struct{ before, after SectorInfo }
ma, _ := address.NewIDAddress(55151)
m := test{
s: &Sealing{
maddr: ma,
stats: SectorStats{
bySector: map[abi.SectorID]statSectorState{},
},
notifee: func(before, after SectorInfo) {
notif = append(notif, struct{ before, after SectorInfo }{before, after})
},
},
t: t,
state: &SectorInfo{State: Packing},
}

m.planSingle(SectorPacked{})
require.Equal(m.t, m.state.State, GetTicket)

m.planSingle(SectorTicket{})
require.Equal(m.t, m.state.State, PreCommit1)

m.planSingle(SectorPreCommit1{})
require.Equal(m.t, m.state.State, PreCommit2)

m.planSingle(SectorPreCommit2{})
require.Equal(m.t, m.state.State, PreCommitting)

m.planSingle(SectorPreCommitted{})
require.Equal(m.t, m.state.State, PreCommitWait)

m.planSingle(SectorPreCommitLanded{})
require.Equal(m.t, m.state.State, WaitSeed)

m.planSingle(SectorSeedReady{})
require.Equal(m.t, m.state.State, Committing)

m.planSingle(SectorProofReady{})
require.Equal(m.t, m.state.State, CommitFinalize)

m.planSingle(SectorFinalized{})
require.Equal(m.t, m.state.State, SubmitCommit)

m.planSingle(SectorSubmitCommitAggregate{})
require.Equal(m.t, m.state.State, SubmitCommitAggregate)

m.planSingle(SectorCommitAggregateSent{})
require.Equal(m.t, m.state.State, CommitWait)

m.planSingle(SectorProving{})
require.Equal(m.t, m.state.State, FinalizeSector)

m.planSingle(SectorFinalized{})
require.Equal(m.t, m.state.State, Proving)

expected := []SectorState{Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, WaitSeed, Committing, CommitFinalize, SubmitCommit, SubmitCommitAggregate, CommitWait, FinalizeSector, Proving}
for i, n := range notif {
if n.before.State != expected[i] {
t.Fatalf("expected before state: %s, got: %s", expected[i], n.before.State)
}
if n.after.State != expected[i+1] {
t.Fatalf("expected after state: %s, got: %s", expected[i+1], n.after.State)
}
}
}

func TestSeedRevert(t *testing.T) {
ma, _ := address.NewIDAddress(55151)
m := test{
Expand Down
2 changes: 2 additions & 0 deletions extern/storage-sealing/sealiface/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type Config struct {

AlwaysKeepUnsealedCopy bool

FinalizeEarly bool

BatchPreCommits bool
MaxPreCommitBatch int
MinPreCommitBatch int
Expand Down
10 changes: 7 additions & 3 deletions extern/storage-sealing/sector_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ var ExistSectorStateList = map[SectorState]struct{}{
PreCommitBatchWait: {},
WaitSeed: {},
Committing: {},
CommitFinalize: {},
CommitFinalizeFailed: {},
SubmitCommit: {},
CommitWait: {},
SubmitCommitAggregate: {},
Expand Down Expand Up @@ -63,8 +65,10 @@ const (
SubmitPreCommitBatch SectorState = "SubmitPreCommitBatch"
PreCommitBatchWait SectorState = "PreCommitBatchWait"

WaitSeed SectorState = "WaitSeed" // waiting for seed
Committing SectorState = "Committing" // compute PoRep
WaitSeed SectorState = "WaitSeed" // waiting for seed
Committing SectorState = "Committing" // compute PoRep
CommitFinalize SectorState = "CommitFinalize" // cleanup sector metadata before submitting the proof (early finalize)
CommitFinalizeFailed SectorState = "CommitFinalizeFailed"

// single commit
SubmitCommit SectorState = "SubmitCommit" // send commit message to the chain
Expand Down Expand Up @@ -106,7 +110,7 @@ func toStatState(st SectorState) statSectorState {
switch st {
case UndefinedSectorState, Empty, WaitDeals, AddPiece:
return sstStaging
case Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, SubmitCommit, CommitWait, SubmitCommitAggregate, CommitAggregateWait, FinalizeSector:
case Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, CommitFinalize, SubmitCommit, CommitWait, SubmitCommitAggregate, CommitAggregateWait, FinalizeSector:
return sstSealing
case Proving, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed:
return sstProving
Expand Down
25 changes: 24 additions & 1 deletion extern/storage-sealing/states_sealing.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,11 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
}
}

cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting config: %w", err)
}

log.Info("scheduling seal proof computation...")

log.Infof("KOMIT %d %x(%d); %x(%d); %v; r:%x; d:%x", sector.SectorNumber, sector.TicketValue, sector.TicketEpoch, sector.SeedValue, sector.SeedEpoch, sector.pieceInfos(), sector.CommR, sector.CommD)
Expand All @@ -500,6 +505,24 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed(2): %w", err)})
}

{
tok, _, err := m.api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
return nil
}

if err := m.checkCommit(ctx.Context(), sector, proof, tok); err != nil {
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("commit check error: %w", err)})
}
}

if cfg.FinalizeEarly {
return ctx.Send(SectorProofReady{
Proof: proof,
})
}

return ctx.Send(SectorCommitted{
Proof: proof,
})
Expand All @@ -524,7 +547,7 @@ func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo

tok, _, err := m.api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
log.Errorf("handleSubmitCommit: api error, not proceeding: %+v", err)
return nil
}

Expand Down
4 changes: 4 additions & 0 deletions node/config/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ type SealingConfig struct {

AlwaysKeepUnsealedCopy bool

// Run sector finalization before submitting sector proof to the chain
FinalizeEarly bool

// enable / disable precommit batching (takes effect after nv13)
BatchPreCommits bool
// maximum precommit batch size - batches will be sent immediately above this size
Expand Down Expand Up @@ -279,6 +282,7 @@ func DefaultStorageMiner() *StorageMiner {
MaxSealingSectorsForDeals: 0,
WaitDealsDelay: Duration(time.Hour * 6),
AlwaysKeepUnsealedCopy: true,
FinalizeEarly: false,

BatchPreCommits: true,
MinPreCommitBatch: 1, // we must have at least one precommit to batch
Expand Down
2 changes: 2 additions & 0 deletions node/modules/storageminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,7 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error
MaxSealingSectorsForDeals: cfg.MaxSealingSectorsForDeals,
WaitDealsDelay: config.Duration(cfg.WaitDealsDelay),
AlwaysKeepUnsealedCopy: cfg.AlwaysKeepUnsealedCopy,
FinalizeEarly: cfg.FinalizeEarly,

BatchPreCommits: cfg.BatchPreCommits,
MinPreCommitBatch: cfg.MinPreCommitBatch,
Expand Down Expand Up @@ -857,6 +858,7 @@ func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error
MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals,
WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay),
AlwaysKeepUnsealedCopy: cfg.Sealing.AlwaysKeepUnsealedCopy,
FinalizeEarly: cfg.Sealing.FinalizeEarly,

BatchPreCommits: cfg.Sealing.BatchPreCommits,
MinPreCommitBatch: cfg.Sealing.MinPreCommitBatch,
Expand Down