From 62769e3b11a7ba571ba055eb580a854c99bc4aaa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 18 Aug 2021 12:43:44 +0200 Subject: [PATCH] sealing: Fix RecoverDealIDs loop with changed PieceCID --- extern/storage-sealing/checks.go | 6 +- extern/storage-sealing/fsm.go | 9 +- extern/storage-sealing/mocks/api.go | 439 +++++++++++++++++++ extern/storage-sealing/mocks/statemachine.go | 63 +++ extern/storage-sealing/sealing.go | 16 +- extern/storage-sealing/states_failed.go | 37 +- extern/storage-sealing/states_failed_test.go | 99 +++++ extern/storage-sealing/states_proving.go | 12 +- extern/storage-sealing/states_sealing.go | 68 +-- extern/storage-sealing/types.go | 8 + extern/storage-sealing/upgrade_queue.go | 4 +- 11 files changed, 695 insertions(+), 66 deletions(-) create mode 100644 extern/storage-sealing/mocks/api.go create mode 100644 extern/storage-sealing/mocks/statemachine.go create mode 100644 extern/storage-sealing/states_failed_test.go diff --git a/extern/storage-sealing/checks.go b/extern/storage-sealing/checks.go index 115eedea549..b01f746bafd 100644 --- a/extern/storage-sealing/checks.go +++ b/extern/storage-sealing/checks.go @@ -124,7 +124,7 @@ func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte, return &ErrBadSeed{xerrors.Errorf("seed epoch was not set")} } - pci, err := m.api.StateSectorPreCommitInfo(ctx, m.maddr, si.SectorNumber, tok) + pci, err := m.Api.StateSectorPreCommitInfo(ctx, m.maddr, si.SectorNumber, tok) if err == ErrSectorAllocated { // not much more we can check here, basically try to wait for commit, // and hope that this will work @@ -152,7 +152,7 @@ func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte, return err } - seed, err := m.api.ChainGetRandomnessFromBeacon(ctx, tok, crypto.DomainSeparationTag_InteractiveSealChallengeSeed, si.SeedEpoch, buf.Bytes()) + seed, err := m.Api.ChainGetRandomnessFromBeacon(ctx, tok, crypto.DomainSeparationTag_InteractiveSealChallengeSeed, si.SeedEpoch, buf.Bytes()) if err != nil { return &ErrApi{xerrors.Errorf("failed to get randomness for computing seal proof: %w", err)} } @@ -181,7 +181,7 @@ func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte, return &ErrInvalidProof{xerrors.New("invalid proof (compute error?)")} } - if err := checkPieces(ctx, m.maddr, si, m.api); err != nil { + if err := checkPieces(ctx, m.maddr, si, m.Api); err != nil { return err } diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index d04aef7904f..0a4dedbf2ec 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -351,6 +351,13 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta log.Errorw("update sector stats", "error", err) } + // todo: drop this, use Context iface everywhere + wrapCtx := func(f func(Context, SectorInfo) error) func(statemachine.Context, SectorInfo) error { + return func(ctx statemachine.Context, info SectorInfo) error { + return f(&ctx, info) + } + } + switch state.State { // Happy path case Empty: @@ -413,7 +420,7 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta case DealsExpired: return m.handleDealsExpired, processed, nil case RecoverDealIDs: - return m.handleRecoverDealIDs, processed, nil + return wrapCtx(m.HandleRecoverDealIDs), processed, nil // Post-seal case Proving: diff --git a/extern/storage-sealing/mocks/api.go b/extern/storage-sealing/mocks/api.go new file mode 100644 index 00000000000..d2962c56d36 --- /dev/null +++ b/extern/storage-sealing/mocks/api.go @@ -0,0 +1,439 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/filecoin-project/lotus/extern/storage-sealing (interfaces: SealingAPI) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + + address "github.com/filecoin-project/go-address" + abi "github.com/filecoin-project/go-state-types/abi" + big "github.com/filecoin-project/go-state-types/big" + crypto "github.com/filecoin-project/go-state-types/crypto" + dline "github.com/filecoin-project/go-state-types/dline" + network "github.com/filecoin-project/go-state-types/network" + api "github.com/filecoin-project/lotus/api" + market "github.com/filecoin-project/lotus/chain/actors/builtin/market" + miner "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + types "github.com/filecoin-project/lotus/chain/types" + sealing "github.com/filecoin-project/lotus/extern/storage-sealing" + miner0 "github.com/filecoin-project/specs-actors/actors/builtin/miner" + gomock "github.com/golang/mock/gomock" + cid "github.com/ipfs/go-cid" +) + +// MockSealingAPI is a mock of SealingAPI interface. +type MockSealingAPI struct { + ctrl *gomock.Controller + recorder *MockSealingAPIMockRecorder +} + +// MockSealingAPIMockRecorder is the mock recorder for MockSealingAPI. +type MockSealingAPIMockRecorder struct { + mock *MockSealingAPI +} + +// NewMockSealingAPI creates a new mock instance. +func NewMockSealingAPI(ctrl *gomock.Controller) *MockSealingAPI { + mock := &MockSealingAPI{ctrl: ctrl} + mock.recorder = &MockSealingAPIMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockSealingAPI) EXPECT() *MockSealingAPIMockRecorder { + return m.recorder +} + +// ChainBaseFee mocks base method. +func (m *MockSealingAPI) ChainBaseFee(arg0 context.Context, arg1 sealing.TipSetToken) (big.Int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ChainBaseFee", arg0, arg1) + ret0, _ := ret[0].(big.Int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ChainBaseFee indicates an expected call of ChainBaseFee. +func (mr *MockSealingAPIMockRecorder) ChainBaseFee(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainBaseFee", reflect.TypeOf((*MockSealingAPI)(nil).ChainBaseFee), arg0, arg1) +} + +// ChainGetMessage mocks base method. +func (m *MockSealingAPI) ChainGetMessage(arg0 context.Context, arg1 cid.Cid) (*types.Message, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ChainGetMessage", arg0, arg1) + ret0, _ := ret[0].(*types.Message) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ChainGetMessage indicates an expected call of ChainGetMessage. +func (mr *MockSealingAPIMockRecorder) ChainGetMessage(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainGetMessage", reflect.TypeOf((*MockSealingAPI)(nil).ChainGetMessage), arg0, arg1) +} + +// ChainGetRandomnessFromBeacon mocks base method. +func (m *MockSealingAPI) ChainGetRandomnessFromBeacon(arg0 context.Context, arg1 sealing.TipSetToken, arg2 crypto.DomainSeparationTag, arg3 abi.ChainEpoch, arg4 []byte) (abi.Randomness, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ChainGetRandomnessFromBeacon", arg0, arg1, arg2, arg3, arg4) + ret0, _ := ret[0].(abi.Randomness) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ChainGetRandomnessFromBeacon indicates an expected call of ChainGetRandomnessFromBeacon. +func (mr *MockSealingAPIMockRecorder) ChainGetRandomnessFromBeacon(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainGetRandomnessFromBeacon", reflect.TypeOf((*MockSealingAPI)(nil).ChainGetRandomnessFromBeacon), arg0, arg1, arg2, arg3, arg4) +} + +// ChainGetRandomnessFromTickets mocks base method. +func (m *MockSealingAPI) ChainGetRandomnessFromTickets(arg0 context.Context, arg1 sealing.TipSetToken, arg2 crypto.DomainSeparationTag, arg3 abi.ChainEpoch, arg4 []byte) (abi.Randomness, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ChainGetRandomnessFromTickets", arg0, arg1, arg2, arg3, arg4) + ret0, _ := ret[0].(abi.Randomness) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ChainGetRandomnessFromTickets indicates an expected call of ChainGetRandomnessFromTickets. +func (mr *MockSealingAPIMockRecorder) ChainGetRandomnessFromTickets(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainGetRandomnessFromTickets", reflect.TypeOf((*MockSealingAPI)(nil).ChainGetRandomnessFromTickets), arg0, arg1, arg2, arg3, arg4) +} + +// ChainHead mocks base method. +func (m *MockSealingAPI) ChainHead(arg0 context.Context) (sealing.TipSetToken, abi.ChainEpoch, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ChainHead", arg0) + ret0, _ := ret[0].(sealing.TipSetToken) + ret1, _ := ret[1].(abi.ChainEpoch) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// ChainHead indicates an expected call of ChainHead. +func (mr *MockSealingAPIMockRecorder) ChainHead(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainHead", reflect.TypeOf((*MockSealingAPI)(nil).ChainHead), arg0) +} + +// ChainReadObj mocks base method. +func (m *MockSealingAPI) ChainReadObj(arg0 context.Context, arg1 cid.Cid) ([]byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ChainReadObj", arg0, arg1) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ChainReadObj indicates an expected call of ChainReadObj. +func (mr *MockSealingAPIMockRecorder) ChainReadObj(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainReadObj", reflect.TypeOf((*MockSealingAPI)(nil).ChainReadObj), arg0, arg1) +} + +// SendMsg mocks base method. +func (m *MockSealingAPI) SendMsg(arg0 context.Context, arg1, arg2 address.Address, arg3 abi.MethodNum, arg4, arg5 big.Int, arg6 []byte) (cid.Cid, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendMsg", arg0, arg1, arg2, arg3, arg4, arg5, arg6) + ret0, _ := ret[0].(cid.Cid) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SendMsg indicates an expected call of SendMsg. +func (mr *MockSealingAPIMockRecorder) SendMsg(arg0, arg1, arg2, arg3, arg4, arg5, arg6 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockSealingAPI)(nil).SendMsg), arg0, arg1, arg2, arg3, arg4, arg5, arg6) +} + +// StateComputeDataCommitment mocks base method. +func (m *MockSealingAPI) StateComputeDataCommitment(arg0 context.Context, arg1 address.Address, arg2 abi.RegisteredSealProof, arg3 []abi.DealID, arg4 sealing.TipSetToken) (cid.Cid, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateComputeDataCommitment", arg0, arg1, arg2, arg3, arg4) + ret0, _ := ret[0].(cid.Cid) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateComputeDataCommitment indicates an expected call of StateComputeDataCommitment. +func (mr *MockSealingAPIMockRecorder) StateComputeDataCommitment(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateComputeDataCommitment", reflect.TypeOf((*MockSealingAPI)(nil).StateComputeDataCommitment), arg0, arg1, arg2, arg3, arg4) +} + +// StateLookupID mocks base method. +func (m *MockSealingAPI) StateLookupID(arg0 context.Context, arg1 address.Address, arg2 sealing.TipSetToken) (address.Address, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateLookupID", arg0, arg1, arg2) + ret0, _ := ret[0].(address.Address) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateLookupID indicates an expected call of StateLookupID. +func (mr *MockSealingAPIMockRecorder) StateLookupID(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateLookupID", reflect.TypeOf((*MockSealingAPI)(nil).StateLookupID), arg0, arg1, arg2) +} + +// StateMarketStorageDeal mocks base method. +func (m *MockSealingAPI) StateMarketStorageDeal(arg0 context.Context, arg1 abi.DealID, arg2 sealing.TipSetToken) (*api.MarketDeal, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateMarketStorageDeal", arg0, arg1, arg2) + ret0, _ := ret[0].(*api.MarketDeal) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateMarketStorageDeal indicates an expected call of StateMarketStorageDeal. +func (mr *MockSealingAPIMockRecorder) StateMarketStorageDeal(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateMarketStorageDeal", reflect.TypeOf((*MockSealingAPI)(nil).StateMarketStorageDeal), arg0, arg1, arg2) +} + +// StateMarketStorageDealProposal mocks base method. +func (m *MockSealingAPI) StateMarketStorageDealProposal(arg0 context.Context, arg1 abi.DealID, arg2 sealing.TipSetToken) (market.DealProposal, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateMarketStorageDealProposal", arg0, arg1, arg2) + ret0, _ := ret[0].(market.DealProposal) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateMarketStorageDealProposal indicates an expected call of StateMarketStorageDealProposal. +func (mr *MockSealingAPIMockRecorder) StateMarketStorageDealProposal(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateMarketStorageDealProposal", reflect.TypeOf((*MockSealingAPI)(nil).StateMarketStorageDealProposal), arg0, arg1, arg2) +} + +// StateMinerAvailableBalance mocks base method. +func (m *MockSealingAPI) StateMinerAvailableBalance(arg0 context.Context, arg1 address.Address, arg2 sealing.TipSetToken) (big.Int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateMinerAvailableBalance", arg0, arg1, arg2) + ret0, _ := ret[0].(big.Int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateMinerAvailableBalance indicates an expected call of StateMinerAvailableBalance. +func (mr *MockSealingAPIMockRecorder) StateMinerAvailableBalance(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateMinerAvailableBalance", reflect.TypeOf((*MockSealingAPI)(nil).StateMinerAvailableBalance), arg0, arg1, arg2) +} + +// StateMinerInfo mocks base method. +func (m *MockSealingAPI) StateMinerInfo(arg0 context.Context, arg1 address.Address, arg2 sealing.TipSetToken) (miner.MinerInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateMinerInfo", arg0, arg1, arg2) + ret0, _ := ret[0].(miner.MinerInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateMinerInfo indicates an expected call of StateMinerInfo. +func (mr *MockSealingAPIMockRecorder) StateMinerInfo(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateMinerInfo", reflect.TypeOf((*MockSealingAPI)(nil).StateMinerInfo), arg0, arg1, arg2) +} + +// StateMinerInitialPledgeCollateral mocks base method. +func (m *MockSealingAPI) StateMinerInitialPledgeCollateral(arg0 context.Context, arg1 address.Address, arg2 miner0.SectorPreCommitInfo, arg3 sealing.TipSetToken) (big.Int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateMinerInitialPledgeCollateral", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(big.Int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateMinerInitialPledgeCollateral indicates an expected call of StateMinerInitialPledgeCollateral. +func (mr *MockSealingAPIMockRecorder) StateMinerInitialPledgeCollateral(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateMinerInitialPledgeCollateral", reflect.TypeOf((*MockSealingAPI)(nil).StateMinerInitialPledgeCollateral), arg0, arg1, arg2, arg3) +} + +// StateMinerPartitions mocks base method. +func (m *MockSealingAPI) StateMinerPartitions(arg0 context.Context, arg1 address.Address, arg2 uint64, arg3 sealing.TipSetToken) ([]api.Partition, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateMinerPartitions", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].([]api.Partition) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateMinerPartitions indicates an expected call of StateMinerPartitions. +func (mr *MockSealingAPIMockRecorder) StateMinerPartitions(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateMinerPartitions", reflect.TypeOf((*MockSealingAPI)(nil).StateMinerPartitions), arg0, arg1, arg2, arg3) +} + +// StateMinerPreCommitDepositForPower mocks base method. +func (m *MockSealingAPI) StateMinerPreCommitDepositForPower(arg0 context.Context, arg1 address.Address, arg2 miner0.SectorPreCommitInfo, arg3 sealing.TipSetToken) (big.Int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateMinerPreCommitDepositForPower", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(big.Int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateMinerPreCommitDepositForPower indicates an expected call of StateMinerPreCommitDepositForPower. +func (mr *MockSealingAPIMockRecorder) StateMinerPreCommitDepositForPower(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateMinerPreCommitDepositForPower", reflect.TypeOf((*MockSealingAPI)(nil).StateMinerPreCommitDepositForPower), arg0, arg1, arg2, arg3) +} + +// StateMinerProvingDeadline mocks base method. +func (m *MockSealingAPI) StateMinerProvingDeadline(arg0 context.Context, arg1 address.Address, arg2 sealing.TipSetToken) (*dline.Info, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateMinerProvingDeadline", arg0, arg1, arg2) + ret0, _ := ret[0].(*dline.Info) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateMinerProvingDeadline indicates an expected call of StateMinerProvingDeadline. +func (mr *MockSealingAPIMockRecorder) StateMinerProvingDeadline(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateMinerProvingDeadline", reflect.TypeOf((*MockSealingAPI)(nil).StateMinerProvingDeadline), arg0, arg1, arg2) +} + +// StateMinerSectorAllocated mocks base method. +func (m *MockSealingAPI) StateMinerSectorAllocated(arg0 context.Context, arg1 address.Address, arg2 abi.SectorNumber, arg3 sealing.TipSetToken) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateMinerSectorAllocated", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateMinerSectorAllocated indicates an expected call of StateMinerSectorAllocated. +func (mr *MockSealingAPIMockRecorder) StateMinerSectorAllocated(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateMinerSectorAllocated", reflect.TypeOf((*MockSealingAPI)(nil).StateMinerSectorAllocated), arg0, arg1, arg2, arg3) +} + +// StateMinerSectorSize mocks base method. +func (m *MockSealingAPI) StateMinerSectorSize(arg0 context.Context, arg1 address.Address, arg2 sealing.TipSetToken) (abi.SectorSize, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateMinerSectorSize", arg0, arg1, arg2) + ret0, _ := ret[0].(abi.SectorSize) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateMinerSectorSize indicates an expected call of StateMinerSectorSize. +func (mr *MockSealingAPIMockRecorder) StateMinerSectorSize(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateMinerSectorSize", reflect.TypeOf((*MockSealingAPI)(nil).StateMinerSectorSize), arg0, arg1, arg2) +} + +// StateMinerWorkerAddress mocks base method. +func (m *MockSealingAPI) StateMinerWorkerAddress(arg0 context.Context, arg1 address.Address, arg2 sealing.TipSetToken) (address.Address, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateMinerWorkerAddress", arg0, arg1, arg2) + ret0, _ := ret[0].(address.Address) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateMinerWorkerAddress indicates an expected call of StateMinerWorkerAddress. +func (mr *MockSealingAPIMockRecorder) StateMinerWorkerAddress(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateMinerWorkerAddress", reflect.TypeOf((*MockSealingAPI)(nil).StateMinerWorkerAddress), arg0, arg1, arg2) +} + +// StateNetworkVersion mocks base method. +func (m *MockSealingAPI) StateNetworkVersion(arg0 context.Context, arg1 sealing.TipSetToken) (network.Version, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateNetworkVersion", arg0, arg1) + ret0, _ := ret[0].(network.Version) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateNetworkVersion indicates an expected call of StateNetworkVersion. +func (mr *MockSealingAPIMockRecorder) StateNetworkVersion(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateNetworkVersion", reflect.TypeOf((*MockSealingAPI)(nil).StateNetworkVersion), arg0, arg1) +} + +// StateSearchMsg mocks base method. +func (m *MockSealingAPI) StateSearchMsg(arg0 context.Context, arg1 cid.Cid) (*sealing.MsgLookup, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateSearchMsg", arg0, arg1) + ret0, _ := ret[0].(*sealing.MsgLookup) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateSearchMsg indicates an expected call of StateSearchMsg. +func (mr *MockSealingAPIMockRecorder) StateSearchMsg(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateSearchMsg", reflect.TypeOf((*MockSealingAPI)(nil).StateSearchMsg), arg0, arg1) +} + +// StateSectorGetInfo mocks base method. +func (m *MockSealingAPI) StateSectorGetInfo(arg0 context.Context, arg1 address.Address, arg2 abi.SectorNumber, arg3 sealing.TipSetToken) (*miner.SectorOnChainInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateSectorGetInfo", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(*miner.SectorOnChainInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateSectorGetInfo indicates an expected call of StateSectorGetInfo. +func (mr *MockSealingAPIMockRecorder) StateSectorGetInfo(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateSectorGetInfo", reflect.TypeOf((*MockSealingAPI)(nil).StateSectorGetInfo), arg0, arg1, arg2, arg3) +} + +// StateSectorPartition mocks base method. +func (m *MockSealingAPI) StateSectorPartition(arg0 context.Context, arg1 address.Address, arg2 abi.SectorNumber, arg3 sealing.TipSetToken) (*sealing.SectorLocation, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateSectorPartition", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(*sealing.SectorLocation) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateSectorPartition indicates an expected call of StateSectorPartition. +func (mr *MockSealingAPIMockRecorder) StateSectorPartition(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateSectorPartition", reflect.TypeOf((*MockSealingAPI)(nil).StateSectorPartition), arg0, arg1, arg2, arg3) +} + +// StateSectorPreCommitInfo mocks base method. +func (m *MockSealingAPI) StateSectorPreCommitInfo(arg0 context.Context, arg1 address.Address, arg2 abi.SectorNumber, arg3 sealing.TipSetToken) (*miner.SectorPreCommitOnChainInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateSectorPreCommitInfo", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(*miner.SectorPreCommitOnChainInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateSectorPreCommitInfo indicates an expected call of StateSectorPreCommitInfo. +func (mr *MockSealingAPIMockRecorder) StateSectorPreCommitInfo(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateSectorPreCommitInfo", reflect.TypeOf((*MockSealingAPI)(nil).StateSectorPreCommitInfo), arg0, arg1, arg2, arg3) +} + +// StateWaitMsg mocks base method. +func (m *MockSealingAPI) StateWaitMsg(arg0 context.Context, arg1 cid.Cid) (sealing.MsgLookup, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateWaitMsg", arg0, arg1) + ret0, _ := ret[0].(sealing.MsgLookup) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateWaitMsg indicates an expected call of StateWaitMsg. +func (mr *MockSealingAPIMockRecorder) StateWaitMsg(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateWaitMsg", reflect.TypeOf((*MockSealingAPI)(nil).StateWaitMsg), arg0, arg1) +} diff --git a/extern/storage-sealing/mocks/statemachine.go b/extern/storage-sealing/mocks/statemachine.go new file mode 100644 index 00000000000..9fdabdc879b --- /dev/null +++ b/extern/storage-sealing/mocks/statemachine.go @@ -0,0 +1,63 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/filecoin-project/lotus/extern/storage-sealing (interfaces: Context) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockContext is a mock of Context interface. +type MockContext struct { + ctrl *gomock.Controller + recorder *MockContextMockRecorder +} + +// MockContextMockRecorder is the mock recorder for MockContext. +type MockContextMockRecorder struct { + mock *MockContext +} + +// NewMockContext creates a new mock instance. +func NewMockContext(ctrl *gomock.Controller) *MockContext { + mock := &MockContext{ctrl: ctrl} + mock.recorder = &MockContextMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockContext) EXPECT() *MockContextMockRecorder { + return m.recorder +} + +// Context mocks base method. +func (m *MockContext) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context. +func (mr *MockContextMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockContext)(nil).Context)) +} + +// Send mocks base method. +func (m *MockContext) Send(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Send", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Send indicates an expected call of Send. +func (mr *MockContextMockRecorder) Send(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockContext)(nil).Send), arg0) +} diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 3e40d10f396..3defdd83078 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -44,6 +44,8 @@ type SectorLocation struct { var ErrSectorAllocated = errors.New("sectorNumber is allocated, but PreCommit info wasn't found on chain") +//go:generate go run github.com/golang/mock/mockgen -destination=mocks/api.go -package=mocks . SealingAPI + type SealingAPI interface { StateWaitMsg(context.Context, cid.Cid) (MsgLookup, error) StateSearchMsg(context.Context, cid.Cid) (*MsgLookup, error) @@ -80,7 +82,9 @@ type SectorStateNotifee func(before, after SectorInfo) type AddrSel func(ctx context.Context, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) type Sealing struct { - api SealingAPI + Api SealingAPI + DealInfo *CurrentDealInfoManager + feeCfg config.MinerFeeConfig events Events @@ -114,7 +118,6 @@ type Sealing struct { commiter *CommitBatcher getConfig GetSealingConfigFunc - dealInfo *CurrentDealInfoManager } type openSector struct { @@ -135,7 +138,9 @@ type pendingPiece struct { func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, prov ffiwrapper.Prover, pcp PreCommitPolicy, gc GetSealingConfigFunc, notifee SectorStateNotifee, as AddrSel) *Sealing { s := &Sealing{ - api: api, + Api: api, + DealInfo: &CurrentDealInfoManager{api}, + feeCfg: fc, events: events, @@ -159,7 +164,6 @@ func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events commiter: NewCommitBatcher(mctx, maddr, api, as, fc, gc, prov), getConfig: gc, - dealInfo: &CurrentDealInfoManager{api}, stats: SectorStats{ bySector: map[abi.SectorID]statSectorState{}, @@ -229,12 +233,12 @@ func (m *Sealing) CommitPending(ctx context.Context) ([]abi.SectorID, error) { } func (m *Sealing) currentSealProof(ctx context.Context) (abi.RegisteredSealProof, error) { - mi, err := m.api.StateMinerInfo(ctx, m.maddr, nil) + mi, err := m.Api.StateMinerInfo(ctx, m.maddr, nil) if err != nil { return 0, err } - ver, err := m.api.StateNetworkVersion(ctx, nil) + ver, err := m.Api.StateNetworkVersion(ctx, nil) if err != nil { return 0, err } diff --git a/extern/storage-sealing/states_failed.go b/extern/storage-sealing/states_failed.go index bd5f489b40e..f1fd092b644 100644 --- a/extern/storage-sealing/states_failed.go +++ b/extern/storage-sealing/states_failed.go @@ -35,13 +35,13 @@ func failedCooldown(ctx statemachine.Context, sector SectorInfo) error { } func (m *Sealing) checkPreCommitted(ctx statemachine.Context, sector SectorInfo) (*miner.SectorPreCommitOnChainInfo, bool) { - tok, _, err := m.api.ChainHead(ctx.Context()) + tok, _, err := m.Api.ChainHead(ctx.Context()) if err != nil { log.Errorf("handleSealPrecommit1Failed(%d): temp error: %+v", sector.SectorNumber, err) return nil, false } - info, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok) + info, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok) if err != nil { log.Errorf("handleSealPrecommit1Failed(%d): temp error: %+v", sector.SectorNumber, err) return nil, false @@ -71,14 +71,14 @@ func (m *Sealing) handleSealPrecommit2Failed(ctx statemachine.Context, sector Se } func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorInfo) error { - tok, height, err := m.api.ChainHead(ctx.Context()) + tok, height, err := m.Api.ChainHead(ctx.Context()) if err != nil { log.Errorf("handlePreCommitFailed: api error, not proceeding: %+v", err) return nil } if sector.PreCommitMessage != nil { - mw, err := m.api.StateSearchMsg(ctx.Context(), *sector.PreCommitMessage) + mw, err := m.Api.StateSearchMsg(ctx.Context(), *sector.PreCommitMessage) if err != nil { // API error if err := failedCooldown(ctx, sector); err != nil { @@ -105,7 +105,7 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI } } - if err := checkPrecommit(ctx.Context(), m.Address(), sector, tok, height, m.api); err != nil { + if err := checkPrecommit(ctx.Context(), m.Address(), sector, tok, height, m.Api); err != nil { switch err.(type) { case *ErrApi: log.Errorf("handlePreCommitFailed: api error, not proceeding: %+v", err) @@ -182,14 +182,14 @@ func (m *Sealing) handleComputeProofFailed(ctx statemachine.Context, sector Sect } func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo) error { - tok, _, err := m.api.ChainHead(ctx.Context()) + tok, _, err := m.Api.ChainHead(ctx.Context()) if err != nil { log.Errorf("handleCommitting: api error, not proceeding: %+v", err) return nil } if sector.CommitMessage != nil { - mw, err := m.api.StateSearchMsg(ctx.Context(), *sector.CommitMessage) + mw, err := m.Api.StateSearchMsg(ctx.Context(), *sector.CommitMessage) if err != nil { // API error if err := failedCooldown(ctx, sector); err != nil { @@ -286,7 +286,7 @@ func (m *Sealing) handleTerminateFailed(ctx statemachine.Context, sector SectorI // ignoring error as it's most likely an API error - `pci` will be nil, and we'll go back to // the Terminating state after cooldown. If the API is still failing, well get back to here // with the error in SectorInfo log. - pci, _ := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil) + pci, _ := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil) if pci != nil { return nil // pause the fsm, needs manual user action } @@ -300,7 +300,7 @@ func (m *Sealing) handleTerminateFailed(ctx statemachine.Context, sector SectorI func (m *Sealing) handleDealsExpired(ctx statemachine.Context, sector SectorInfo) error { // First make vary sure the sector isn't committed - si, err := m.api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil) + si, err := m.Api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil) if err != nil { return xerrors.Errorf("getting sector info: %w", err) } @@ -319,8 +319,8 @@ func (m *Sealing) handleDealsExpired(ctx statemachine.Context, sector SectorInfo return ctx.Send(SectorRemove{}) } -func (m *Sealing) handleRecoverDealIDs(ctx statemachine.Context, sector SectorInfo) error { - tok, height, err := m.api.ChainHead(ctx.Context()) +func (m *Sealing) HandleRecoverDealIDs(ctx Context, sector SectorInfo) error { + tok, height, err := m.Api.ChainHead(ctx.Context()) if err != nil { return xerrors.Errorf("getting chain head: %w", err) } @@ -340,7 +340,7 @@ func (m *Sealing) handleRecoverDealIDs(ctx statemachine.Context, sector SectorIn continue } - proposal, err := m.api.StateMarketStorageDealProposal(ctx.Context(), p.DealInfo.DealID, tok) + proposal, err := m.Api.StateMarketStorageDealProposal(ctx.Context(), p.DealInfo.DealID, tok) if err != nil { log.Warnf("getting deal %d for piece %d: %+v", p.DealInfo.DealID, i, err) toFix = append(toFix, i) @@ -389,11 +389,16 @@ func (m *Sealing) handleRecoverDealIDs(ctx statemachine.Context, sector SectorIn mdp := market.DealProposal(*p.DealInfo.DealProposal) dp = &mdp } - res, err := m.dealInfo.GetCurrentDealInfo(ctx.Context(), tok, dp, *p.DealInfo.PublishCid) + res, err := m.DealInfo.GetCurrentDealInfo(ctx.Context(), tok, dp, *p.DealInfo.PublishCid) if err != nil { failed[i] = xerrors.Errorf("getting current deal info for piece %d: %w", i, err) } + if res.MarketDeal.Proposal.PieceCID != p.Piece.PieceCID { + failed[i] = xerrors.Errorf("recovered piece (%d) deal in sector %d (dealid %d) has different PieceCID %s != %s", i, sector.SectorNumber, p.DealInfo.DealID, p.Piece.PieceCID, res.MarketDeal.Proposal.PieceCID) + continue + } + updates[i] = res.DealID } @@ -409,7 +414,11 @@ func (m *Sealing) handleRecoverDealIDs(ctx statemachine.Context, sector SectorIn } // todo: try to remove bad pieces (hard; see the todo above) - return xerrors.Errorf("failed to recover some deals: %w", merr) + + // for now removing sectors is probably better than having them stuck in RecoverDealIDs + // and expire anyways + log.Errorf("removing sector %d: deals expired or unrecoverable: %+v", sector.SectorNumber, merr) + return ctx.Send(SectorRemove{}) } // Not much to do here, we can't go back in time to commit this sector diff --git a/extern/storage-sealing/states_failed_test.go b/extern/storage-sealing/states_failed_test.go new file mode 100644 index 00000000000..d73c597dcd9 --- /dev/null +++ b/extern/storage-sealing/states_failed_test.go @@ -0,0 +1,99 @@ +package sealing_test + +import ( + "bytes" + "context" + "testing" + + "github.com/golang/mock/gomock" + "github.com/ipfs/go-cid" + mh "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/cbor" + "github.com/filecoin-project/go-state-types/exitcode" + + api2 "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/actors/builtin/market" + sealing "github.com/filecoin-project/lotus/extern/storage-sealing" + "github.com/filecoin-project/lotus/extern/storage-sealing/mocks" +) + +func TestStateRecoverDealIDs(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + ctx := context.Background() + + api := mocks.NewMockSealingAPI(mockCtrl) + + fakeSealing := &sealing.Sealing{ + Api: api, + DealInfo: &sealing.CurrentDealInfoManager{CDAPI: api}, + } + + sctx := mocks.NewMockContext(mockCtrl) + sctx.EXPECT().Context().AnyTimes().Return(ctx) + + api.EXPECT().ChainHead(ctx).Times(1).Return(nil, abi.ChainEpoch(10), nil) + + var dealId abi.DealID = 12 + dealProposal := market.DealProposal{ + PieceCID: idCid("newPieceCID"), + } + + api.EXPECT().StateMarketStorageDealProposal(ctx, dealId, nil).Return(dealProposal, nil) + + pc := idCid("publishCID") + + // expect GetCurrentDealInfo + { + api.EXPECT().StateSearchMsg(ctx, pc).Return(&sealing.MsgLookup{ + Receipt: sealing.MessageReceipt{ + ExitCode: exitcode.Ok, + Return: cborRet(&market.PublishStorageDealsReturn{ + IDs: []abi.DealID{dealId}, + }), + }, + }, nil) + api.EXPECT().StateMarketStorageDeal(ctx, dealId, nil).Return(&api2.MarketDeal{ + Proposal: dealProposal, + }, nil) + + } + + sctx.EXPECT().Send(sealing.SectorRemove{}).Return(nil) + + err := fakeSealing.HandleRecoverDealIDs(sctx, sealing.SectorInfo{ + Pieces: []sealing.Piece{ + { + DealInfo: &api2.PieceDealInfo{ + DealID: dealId, + PublishCid: &pc, + }, + Piece: abi.PieceInfo{ + PieceCID: idCid("oldPieceCID"), + }, + }, + }, + }) + require.NoError(t, err) +} + +func idCid(str string) cid.Cid { + builder := cid.V1Builder{Codec: cid.Raw, MhType: mh.IDENTITY} + c, err := builder.Sum([]byte(str)) + if err != nil { + panic(err) + } + return c +} + +func cborRet(v cbor.Marshaler) []byte { + var buf bytes.Buffer + if err := v.MarshalCBOR(&buf); err != nil { + panic(err) + } + return buf.Bytes() +} diff --git a/extern/storage-sealing/states_proving.go b/extern/storage-sealing/states_proving.go index 5e613b20b46..2deefa80f1d 100644 --- a/extern/storage-sealing/states_proving.go +++ b/extern/storage-sealing/states_proving.go @@ -23,7 +23,7 @@ func (m *Sealing) handleFaultReported(ctx statemachine.Context, sector SectorInf return xerrors.Errorf("entered fault reported state without a FaultReportMsg cid") } - mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.FaultReportMsg) + mw, err := m.Api.StateWaitMsg(ctx.Context(), *sector.FaultReportMsg) if err != nil { return xerrors.Errorf("failed to wait for fault declaration: %w", err) } @@ -45,7 +45,7 @@ func (m *Sealing) handleTerminating(ctx statemachine.Context, sector SectorInfo) // * Check for correct termination // * wait for expiration (+winning lookback?) - si, err := m.api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil) + si, err := m.Api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil) if err != nil { return ctx.Send(SectorTerminateFailed{xerrors.Errorf("getting sector info: %w", err)}) } @@ -53,7 +53,7 @@ func (m *Sealing) handleTerminating(ctx statemachine.Context, sector SectorInfo) if si == nil { // either already terminated or not committed yet - pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil) + pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil) if err != nil { return ctx.Send(SectorTerminateFailed{xerrors.Errorf("checking precommit presence: %w", err)}) } @@ -81,7 +81,7 @@ func (m *Sealing) handleTerminateWait(ctx statemachine.Context, sector SectorInf return xerrors.New("entered TerminateWait with nil TerminateMessage") } - mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.TerminateMessage) + mw, err := m.Api.StateWaitMsg(ctx.Context(), *sector.TerminateMessage) if err != nil { return ctx.Send(SectorTerminateFailed{xerrors.Errorf("waiting for terminate message to land on chain: %w", err)}) } @@ -95,12 +95,12 @@ func (m *Sealing) handleTerminateWait(ctx statemachine.Context, sector SectorInf func (m *Sealing) handleTerminateFinality(ctx statemachine.Context, sector SectorInfo) error { for { - tok, epoch, err := m.api.ChainHead(ctx.Context()) + tok, epoch, err := m.Api.ChainHead(ctx.Context()) if err != nil { return ctx.Send(SectorTerminateFailed{xerrors.Errorf("getting chain head: %w", err)}) } - nv, err := m.api.StateNetworkVersion(ctx.Context(), tok) + nv, err := m.Api.StateNetworkVersion(ctx.Context(), tok) if err != nil { return ctx.Send(SectorTerminateFailed{xerrors.Errorf("getting network version: %w", err)}) } diff --git a/extern/storage-sealing/states_sealing.go b/extern/storage-sealing/states_sealing.go index 084cd565435..ed704d9dc4e 100644 --- a/extern/storage-sealing/states_sealing.go +++ b/extern/storage-sealing/states_sealing.go @@ -110,7 +110,7 @@ func checkProveCommitExpired(preCommitEpoch, msd abi.ChainEpoch, currEpoch abi.C } func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.SealRandomness, abi.ChainEpoch, bool, error) { - tok, epoch, err := m.api.ChainHead(ctx.Context()) + tok, epoch, err := m.Api.ChainHead(ctx.Context()) if err != nil { log.Errorf("getTicket: api error, not proceeding: %+v", err) return nil, 0, false, nil @@ -118,7 +118,7 @@ func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.Se // the reason why the StateMinerSectorAllocated function is placed here, if it is outside, // if the MarshalCBOR function and StateSectorPreCommitInfo function return err, it will be executed - allocated, aerr := m.api.StateMinerSectorAllocated(ctx.Context(), m.maddr, sector.SectorNumber, nil) + allocated, aerr := m.Api.StateMinerSectorAllocated(ctx.Context(), m.maddr, sector.SectorNumber, nil) if aerr != nil { log.Errorf("getTicket: api error, checking if sector is allocated: %+v", aerr) return nil, 0, false, nil @@ -130,7 +130,7 @@ func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.Se return nil, 0, allocated, err } - pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok) + pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok) if err != nil { return nil, 0, allocated, xerrors.Errorf("getting precommit info: %w", err) } @@ -138,7 +138,7 @@ func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.Se if pci != nil { ticketEpoch = pci.Info.SealRandEpoch - nv, err := m.api.StateNetworkVersion(ctx.Context(), tok) + nv, err := m.Api.StateNetworkVersion(ctx.Context(), tok) if err != nil { return nil, 0, allocated, xerrors.Errorf("getTicket: StateNetworkVersion: api error, not proceeding: %+v", err) } @@ -161,7 +161,7 @@ func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.Se return nil, 0, allocated, xerrors.Errorf("sector %s precommitted but expired", sector.SectorNumber) } - rand, err := m.api.ChainGetRandomnessFromTickets(ctx.Context(), tok, crypto.DomainSeparationTag_SealRandomness, ticketEpoch, buf.Bytes()) + rand, err := m.Api.ChainGetRandomnessFromTickets(ctx.Context(), tok, crypto.DomainSeparationTag_SealRandomness, ticketEpoch, buf.Bytes()) if err != nil { return nil, 0, allocated, err } @@ -192,7 +192,7 @@ func (m *Sealing) handleGetTicket(ctx statemachine.Context, sector SectorInfo) e } func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) error { - if err := checkPieces(ctx.Context(), m.maddr, sector, m.api); err != nil { // Sanity check state + if err := checkPieces(ctx.Context(), m.maddr, sector, m.Api); err != nil { // Sanity check state switch err.(type) { case *ErrApi: log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err) @@ -207,14 +207,14 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) } } - tok, height, err := m.api.ChainHead(ctx.Context()) + tok, height, err := m.Api.ChainHead(ctx.Context()) if err != nil { log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err) return nil } if checkTicketExpired(sector.TicketEpoch, height) { - pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok) + pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok) if err != nil { log.Errorf("handlePreCommit1: StateSectorPreCommitInfo: api error, not proceeding: %+v", err) return nil @@ -224,7 +224,7 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) return ctx.Send(SectorOldTicket{}) // go get new ticket } - nv, err := m.api.StateNetworkVersion(ctx.Context(), tok) + nv, err := m.Api.StateNetworkVersion(ctx.Context(), tok) if err != nil { log.Errorf("handlePreCommit1: StateNetworkVersion: api error, not proceeding: %+v", err) return nil @@ -282,13 +282,13 @@ func (m *Sealing) remarkForUpgrade(sid abi.SectorNumber) { } func (m *Sealing) preCommitParams(ctx statemachine.Context, sector SectorInfo) (*miner.SectorPreCommitInfo, big.Int, TipSetToken, error) { - tok, height, err := m.api.ChainHead(ctx.Context()) + tok, height, err := m.Api.ChainHead(ctx.Context()) if err != nil { log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err) return nil, big.Zero(), nil, nil } - if err := checkPrecommit(ctx.Context(), m.Address(), sector, tok, height, m.api); err != nil { + if err := checkPrecommit(ctx.Context(), m.Address(), sector, tok, height, m.Api); err != nil { switch err := err.(type) { case *ErrApi: log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err) @@ -320,7 +320,7 @@ func (m *Sealing) preCommitParams(ctx statemachine.Context, sector SectorInfo) ( return nil, big.Zero(), nil, ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("handlePreCommitting: failed to compute pre-commit expiry: %w", err)}) } - nv, err := m.api.StateNetworkVersion(ctx.Context(), tok) + nv, err := m.Api.StateNetworkVersion(ctx.Context(), tok) if err != nil { return nil, big.Zero(), nil, ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("failed to get network version: %w", err)}) } @@ -356,7 +356,7 @@ func (m *Sealing) preCommitParams(ctx statemachine.Context, sector SectorInfo) ( depositMinimum := m.tryUpgradeSector(ctx.Context(), params) - collateral, err := m.api.StateMinerPreCommitDepositForPower(ctx.Context(), m.maddr, *params, tok) + collateral, err := m.Api.StateMinerPreCommitDepositForPower(ctx.Context(), m.maddr, *params, tok) if err != nil { return nil, big.Zero(), nil, xerrors.Errorf("getting initial pledge collateral: %w", err) } @@ -373,7 +373,7 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf } if cfg.BatchPreCommits { - nv, err := m.api.StateNetworkVersion(ctx.Context(), nil) + nv, err := m.Api.StateNetworkVersion(ctx.Context(), nil) if err != nil { return xerrors.Errorf("getting network version: %w", err) } @@ -391,7 +391,7 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf return nil // event was sent in preCommitParams } - deposit, err := collateralSendAmount(ctx.Context(), m.api, m.maddr, cfg, pcd) + deposit, err := collateralSendAmount(ctx.Context(), m.Api, m.maddr, cfg, pcd) if err != nil { return err } @@ -401,7 +401,7 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("could not serialize pre-commit sector parameters: %w", err)}) } - mi, err := m.api.StateMinerInfo(ctx.Context(), m.maddr, tok) + mi, err := m.Api.StateMinerInfo(ctx.Context(), m.maddr, tok) if err != nil { log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err) return nil @@ -415,7 +415,7 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf } log.Infof("submitting precommit for sector %d (deposit: %s): ", sector.SectorNumber, deposit) - mcid, err := m.api.SendMsg(ctx.Context(), from, m.maddr, miner.Methods.PreCommitSector, deposit, big.Int(m.feeCfg.MaxPreCommitGasFee), enc.Bytes()) + mcid, err := m.Api.SendMsg(ctx.Context(), from, m.maddr, miner.Methods.PreCommitSector, deposit, big.Int(m.feeCfg.MaxPreCommitGasFee), enc.Bytes()) if err != nil { if params.ReplaceCapacity { m.remarkForUpgrade(params.ReplaceSectorNumber) @@ -462,7 +462,7 @@ func (m *Sealing) handlePreCommitWait(ctx statemachine.Context, sector SectorInf // would be ideal to just use the events.Called handler, but it wouldn't be able to handle individual message timeouts log.Info("Sector precommitted: ", sector.SectorNumber) - mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.PreCommitMessage) + mw, err := m.Api.StateWaitMsg(ctx.Context(), *sector.PreCommitMessage) if err != nil { return ctx.Send(SectorChainPreCommitFailed{err}) } @@ -487,13 +487,13 @@ func (m *Sealing) handlePreCommitWait(ctx statemachine.Context, sector SectorInf } func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) error { - tok, _, err := m.api.ChainHead(ctx.Context()) + tok, _, err := m.Api.ChainHead(ctx.Context()) if err != nil { log.Errorf("handleWaitSeed: api error, not proceeding: %+v", err) return nil } - pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok) + pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok) if err != nil { return xerrors.Errorf("getting precommit info: %w", err) } @@ -506,7 +506,7 @@ func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) er err = m.events.ChainAt(func(ectx context.Context, _ TipSetToken, curH abi.ChainEpoch) error { // in case of null blocks the randomness can land after the tipset we // get from the events API - tok, _, err := m.api.ChainHead(ctx.Context()) + tok, _, err := m.Api.ChainHead(ctx.Context()) if err != nil { log.Errorf("handleCommitting: api error, not proceeding: %+v", err) return nil @@ -516,7 +516,7 @@ func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) er if err := m.maddr.MarshalCBOR(buf); err != nil { return err } - rand, err := m.api.ChainGetRandomnessFromBeacon(ectx, tok, crypto.DomainSeparationTag_InteractiveSealChallengeSeed, randHeight, buf.Bytes()) + rand, err := m.Api.ChainGetRandomnessFromBeacon(ectx, tok, crypto.DomainSeparationTag_InteractiveSealChallengeSeed, randHeight, buf.Bytes()) if err != nil { err = xerrors.Errorf("failed to get randomness for computing seal proof (ch %d; rh %d; tsk %x): %w", curH, randHeight, tok, err) @@ -543,7 +543,7 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) if sector.CommitMessage != nil { log.Warnf("sector %d entered committing state with a commit message cid", sector.SectorNumber) - ml, err := m.api.StateSearchMsg(ctx.Context(), *sector.CommitMessage) + ml, err := m.Api.StateSearchMsg(ctx.Context(), *sector.CommitMessage) if err != nil { log.Warnf("sector %d searching existing commit message %s: %+v", sector.SectorNumber, *sector.CommitMessage, err) } @@ -582,7 +582,7 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) } { - tok, _, err := m.api.ChainHead(ctx.Context()) + tok, _, err := m.Api.ChainHead(ctx.Context()) if err != nil { log.Errorf("handleCommitting: api error, not proceeding: %+v", err) return nil @@ -611,7 +611,7 @@ func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo } if cfg.AggregateCommits { - nv, err := m.api.StateNetworkVersion(ctx.Context(), nil) + nv, err := m.Api.StateNetworkVersion(ctx.Context(), nil) if err != nil { return xerrors.Errorf("getting network version: %w", err) } @@ -621,7 +621,7 @@ func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo } } - tok, _, err := m.api.ChainHead(ctx.Context()) + tok, _, err := m.Api.ChainHead(ctx.Context()) if err != nil { log.Errorf("handleSubmitCommit: api error, not proceeding: %+v", err) return nil @@ -641,13 +641,13 @@ func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo return ctx.Send(SectorCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", err)}) } - mi, err := m.api.StateMinerInfo(ctx.Context(), m.maddr, tok) + mi, err := m.Api.StateMinerInfo(ctx.Context(), m.maddr, tok) if err != nil { log.Errorf("handleCommitting: api error, not proceeding: %+v", err) return nil } - pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok) + pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok) if err != nil { return xerrors.Errorf("getting precommit info: %w", err) } @@ -655,7 +655,7 @@ func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo return ctx.Send(SectorCommitFailed{error: xerrors.Errorf("precommit info not found on chain")}) } - collateral, err := m.api.StateMinerInitialPledgeCollateral(ctx.Context(), m.maddr, pci.Info, tok) + collateral, err := m.Api.StateMinerInitialPledgeCollateral(ctx.Context(), m.maddr, pci.Info, tok) if err != nil { return xerrors.Errorf("getting initial pledge collateral: %w", err) } @@ -665,7 +665,7 @@ func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo collateral = big.Zero() } - collateral, err = collateralSendAmount(ctx.Context(), m.api, m.maddr, cfg, collateral) + collateral, err = collateralSendAmount(ctx.Context(), m.Api, m.maddr, cfg, collateral) if err != nil { return err } @@ -678,7 +678,7 @@ func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo } // TODO: check seed / ticket / deals are up to date - mcid, err := m.api.SendMsg(ctx.Context(), from, m.maddr, miner.Methods.ProveCommitSector, collateral, big.Int(m.feeCfg.MaxCommitGasFee), enc.Bytes()) + mcid, err := m.Api.SendMsg(ctx.Context(), from, m.maddr, miner.Methods.ProveCommitSector, collateral, big.Int(m.feeCfg.MaxCommitGasFee), enc.Bytes()) if err != nil { return ctx.Send(SectorCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)}) } @@ -709,7 +709,7 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S } if res.Error != "" { - tok, _, err := m.api.ChainHead(ctx.Context()) + tok, _, err := m.Api.ChainHead(ctx.Context()) if err != nil { log.Errorf("handleSubmitCommit: api error, not proceeding: %+v", err) return nil @@ -739,7 +739,7 @@ func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) return ctx.Send(SectorCommitFailed{xerrors.Errorf("entered commit wait with no commit cid")}) } - mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.CommitMessage) + mw, err := m.Api.StateWaitMsg(ctx.Context(), *sector.CommitMessage) if err != nil { return ctx.Send(SectorCommitFailed{xerrors.Errorf("failed to wait for porep inclusion: %w", err)}) } @@ -756,7 +756,7 @@ func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) return ctx.Send(SectorCommitFailed{xerrors.Errorf("submitting sector proof failed (exit=%d, msg=%s) (t:%x; s:%x(%d); p:%x)", mw.Receipt.ExitCode, sector.CommitMessage, sector.TicketValue, sector.SeedValue, sector.SeedEpoch, sector.Proof)}) } - si, err := m.api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, mw.TipSetTok) + si, err := m.Api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, mw.TipSetTok) if err != nil { return ctx.Send(SectorCommitFailed{xerrors.Errorf("proof validation failed, calling StateSectorGetInfo: %w", err)}) } diff --git a/extern/storage-sealing/types.go b/extern/storage-sealing/types.go index c5aed505a65..aeb378f29d9 100644 --- a/extern/storage-sealing/types.go +++ b/extern/storage-sealing/types.go @@ -17,6 +17,14 @@ import ( "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" ) +//go:generate go run github.com/golang/mock/mockgen -destination=mocks/statemachine.go -package=mocks . Context + +// Context is a go-statemachine context +type Context interface { + Context() context.Context + Send(evt interface{}) error +} + // Piece is a tuple of piece and deal info type PieceWithDealInfo struct { Piece abi.PieceInfo diff --git a/extern/storage-sealing/upgrade_queue.go b/extern/storage-sealing/upgrade_queue.go index 78a78fc45c5..02db41fdeff 100644 --- a/extern/storage-sealing/upgrade_queue.go +++ b/extern/storage-sealing/upgrade_queue.go @@ -57,7 +57,7 @@ func (m *Sealing) tryUpgradeSector(ctx context.Context, params *miner.SectorPreC } replace := m.maybeUpgradableSector() if replace != nil { - loc, err := m.api.StateSectorPartition(ctx, m.maddr, *replace, nil) + loc, err := m.Api.StateSectorPartition(ctx, m.maddr, *replace, nil) if err != nil { log.Errorf("error calling StateSectorPartition for replaced sector: %+v", err) return big.Zero() @@ -70,7 +70,7 @@ func (m *Sealing) tryUpgradeSector(ctx context.Context, params *miner.SectorPreC log.Infof("replacing sector %d with %d", *replace, params.SectorNumber) - ri, err := m.api.StateSectorGetInfo(ctx, m.maddr, *replace, nil) + ri, err := m.Api.StateSectorGetInfo(ctx, m.maddr, *replace, nil) if err != nil { log.Errorf("error calling StateSectorGetInfo for replaced sector: %+v", err) return big.Zero()