diff --git a/storage/adapter_events.go b/storage/adapter_events.go deleted file mode 100644 index a229ff535b5..00000000000 --- a/storage/adapter_events.go +++ /dev/null @@ -1,29 +0,0 @@ -package storage - -import ( - "context" - - "github.com/filecoin-project/go-state-types/abi" - - "github.com/filecoin-project/lotus/chain/events" - "github.com/filecoin-project/lotus/chain/types" - sealing "github.com/filecoin-project/lotus/storage/pipeline" -) - -var _ sealing.Events = new(EventsAdapter) - -type EventsAdapter struct { - delegate *events.Events -} - -func NewEventsAdapter(api *events.Events) EventsAdapter { - return EventsAdapter{delegate: api} -} - -func (e EventsAdapter) ChainAt(hnd sealing.HeightHandler, rev sealing.RevertHandler, confidence int, h abi.ChainEpoch) error { - return e.delegate.ChainAt(context.TODO(), func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error { - return hnd(ctx, ts.Key(), curH) - }, func(ctx context.Context, ts *types.TipSet) error { - return rev(ctx, ts.Key()) - }, confidence, h) -} diff --git a/storage/adapter_storage_miner.go b/storage/adapter_storage_miner.go deleted file mode 100644 index 3c2613b8b4e..00000000000 --- a/storage/adapter_storage_miner.go +++ /dev/null @@ -1,123 +0,0 @@ -package storage - -import ( - "context" - - "github.com/ipfs/go-cid" - - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/go-state-types/big" - minertypes "github.com/filecoin-project/go-state-types/builtin/v8/miner" - "github.com/filecoin-project/go-state-types/crypto" - "github.com/filecoin-project/go-state-types/dline" - "github.com/filecoin-project/go-state-types/network" - - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/chain/actors/builtin/miner" - "github.com/filecoin-project/lotus/chain/types" - pipeline "github.com/filecoin-project/lotus/storage/pipeline" -) - -var _ pipeline.SealingAPI = new(SealingAPIAdapter) - -type SealingAPIAdapter struct { - delegate fullNodeFilteredAPI -} - -func NewSealingAPIAdapter(api fullNodeFilteredAPI) SealingAPIAdapter { - return SealingAPIAdapter{delegate: api} -} - -func (s SealingAPIAdapter) StateMinerPreCommitDepositForPower(ctx context.Context, a address.Address, pci minertypes.SectorPreCommitInfo, tsk types.TipSetKey) (big.Int, error) { - return s.delegate.StateMinerPreCommitDepositForPower(ctx, a, pci, tsk) -} - -func (s SealingAPIAdapter) StateMinerInitialPledgeCollateral(ctx context.Context, a address.Address, pci minertypes.SectorPreCommitInfo, tsk types.TipSetKey) (big.Int, error) { - return s.delegate.StateMinerInitialPledgeCollateral(ctx, a, pci, tsk) -} - -func (s SealingAPIAdapter) StateMinerInfo(ctx context.Context, maddr address.Address, tsk types.TipSetKey) (api.MinerInfo, error) { - return s.delegate.StateMinerInfo(ctx, maddr, tsk) -} - -func (s SealingAPIAdapter) StateMinerAvailableBalance(ctx context.Context, maddr address.Address, tsk types.TipSetKey) (big.Int, error) { - return s.delegate.StateMinerAvailableBalance(ctx, maddr, tsk) -} - -func (s SealingAPIAdapter) StateMinerDeadlines(ctx context.Context, maddr address.Address, tsk types.TipSetKey) ([]api.Deadline, error) { - return s.delegate.StateMinerDeadlines(ctx, maddr, tsk) -} - -func (s SealingAPIAdapter) StateMinerSectorAllocated(ctx context.Context, maddr address.Address, sid abi.SectorNumber, tsk types.TipSetKey) (bool, error) { - return s.delegate.StateMinerSectorAllocated(ctx, maddr, sid, tsk) -} - -func (s SealingAPIAdapter) StateSectorGetInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error) { - return s.delegate.StateSectorGetInfo(ctx, maddr, sectorNumber, tsk) -} - -func (s SealingAPIAdapter) StateMinerPartitions(ctx context.Context, maddr address.Address, dlIdx uint64, tsk types.TipSetKey) ([]api.Partition, error) { - return s.delegate.StateMinerPartitions(ctx, maddr, dlIdx, tsk) -} - -func (s SealingAPIAdapter) StateLookupID(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) { - return s.delegate.StateLookupID(ctx, addr, tsk) -} - -func (s SealingAPIAdapter) StateMarketStorageDeal(ctx context.Context, dealID abi.DealID, tsk types.TipSetKey) (*api.MarketDeal, error) { - return s.delegate.StateMarketStorageDeal(ctx, dealID, tsk) -} - -func (s SealingAPIAdapter) StateNetworkVersion(ctx context.Context, tsk types.TipSetKey) (network.Version, error) { - return s.delegate.StateNetworkVersion(ctx, tsk) -} - -func (s SealingAPIAdapter) StateMinerProvingDeadline(ctx context.Context, maddr address.Address, tsk types.TipSetKey) (*dline.Info, error) { - return s.delegate.StateMinerProvingDeadline(ctx, maddr, tsk) -} - -func (s SealingAPIAdapter) ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error) { - return s.delegate.ChainGetMessage(ctx, mc) -} - -func (s SealingAPIAdapter) StateGetRandomnessFromBeacon(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error) { - return s.delegate.StateGetRandomnessFromBeacon(ctx, personalization, randEpoch, entropy, tsk) -} - -func (s SealingAPIAdapter) StateGetRandomnessFromTickets(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error) { - return s.delegate.StateGetRandomnessFromTickets(ctx, personalization, randEpoch, entropy, tsk) -} - -func (s SealingAPIAdapter) ChainReadObj(ctx context.Context, ocid cid.Cid) ([]byte, error) { - return s.delegate.ChainReadObj(ctx, ocid) -} - -func (s SealingAPIAdapter) StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error) { - return s.delegate.StateWaitMsg(ctx, cid, confidence, limit, allowReplaced) - -} - -func (s SealingAPIAdapter) StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error) { - return s.delegate.StateSearchMsg(ctx, from, msg, limit, allowReplaced) -} - -func (s SealingAPIAdapter) ChainHead(ctx context.Context) (*types.TipSet, error) { - return s.delegate.ChainHead(ctx) -} - -func (s SealingAPIAdapter) StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tsk types.TipSetKey) (cid.Cid, error) { - return s.delegate.StateComputeDataCID(ctx, maddr, sectorType, deals, tsk) -} - -func (s SealingAPIAdapter) StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorLocation, error) { - return s.delegate.StateSectorPartition(ctx, maddr, sectorNumber, tsk) -} - -func (s SealingAPIAdapter) StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tsk types.TipSetKey) (*minertypes.SectorPreCommitOnChainInfo, error) { - return s.delegate.StateSectorPreCommitInfo(ctx, maddr, sectorNumber, tsk) -} - -func (s SealingAPIAdapter) MpoolPushMessage(ctx context.Context, msg *types.Message, mss *api.MessageSendSpec) (*types.SignedMessage, error) { - return s.delegate.MpoolPushMessage(ctx, msg, mss) -} diff --git a/storage/miner.go b/storage/miner.go index 280e62097d0..8f627a45bd4 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -177,19 +177,12 @@ func (m *Miner) Run(ctx context.Context) error { if err != nil { return xerrors.Errorf("failed to subscribe to events: %w", err) } - evtsAdapter := NewEventsAdapter(evts) - - // Create a shim to glue the API required by the sealing component - // with the API that Lotus is capable of providing. - // The shim translates between "tipset tokens" and tipset keys, and - // provides extra methods. - adaptedAPI := NewSealingAPIAdapter(m.api) // Instantiate a precommit policy. cfg := pipeline.GetSealingConfigFunc(m.getSealConfig) provingBuffer := md.WPoStProvingPeriod * 2 - pcp := pipeline.NewBasicPreCommitPolicy(adaptedAPI, cfg, provingBuffer) + pcp := pipeline.NewBasicPreCommitPolicy(m.api, cfg, provingBuffer) // address selector. as := func(ctx context.Context, mi api.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) { @@ -197,7 +190,7 @@ func (m *Miner) Run(ctx context.Context) error { } // Instantiate the sealing FSM. - m.sealing = pipeline.New(ctx, adaptedAPI, m.feeCfg, evtsAdapter, m.maddr, m.ds, m.sealer, m.sc, m.verif, m.prover, &pcp, cfg, m.handleSealingNotifications, as) + m.sealing = pipeline.New(ctx, m.api, m.feeCfg, evts, m.maddr, m.ds, m.sealer, m.sc, m.verif, m.prover, &pcp, cfg, m.handleSealingNotifications, as) // Run the sealing FSM. go m.sealing.Run(ctx) //nolint:errcheck // logged intside the function diff --git a/storage/pipeline/checks.go b/storage/pipeline/checks.go index 0da31b90f03..c03bab5d1b3 100644 --- a/storage/pipeline/checks.go +++ b/storage/pipeline/checks.go @@ -96,7 +96,7 @@ func checkPrecommit(ctx context.Context, maddr address.Address, si SectorInfo, t return err } - commD, err := api.StateComputeDataCommitment(ctx, maddr, si.SectorType, si.dealIDs(), tok) + commD, err := api.StateComputeDataCID(ctx, maddr, si.SectorType, si.dealIDs(), tok) if err != nil { return &ErrApi{xerrors.Errorf("calling StateComputeDataCommitment: %w", err)} } @@ -220,7 +220,7 @@ func checkReplicaUpdate(ctx context.Context, maddr address.Address, si SectorInf return xerrors.Errorf("replica update on sector not marked for update") } - commD, err := api.StateComputeDataCommitment(ctx, maddr, si.SectorType, si.dealIDs(), tok) + commD, err := api.StateComputeDataCID(ctx, maddr, si.SectorType, si.dealIDs(), tok) if err != nil { return &ErrApi{xerrors.Errorf("calling StateComputeDataCommitment: %w", err)} } diff --git a/storage/pipeline/events.go b/storage/pipeline/events.go deleted file mode 100644 index c5c5cebd170..00000000000 --- a/storage/pipeline/events.go +++ /dev/null @@ -1,17 +0,0 @@ -package sealing - -import ( - "context" - - "github.com/filecoin-project/go-state-types/abi" - - "github.com/filecoin-project/lotus/chain/types" -) - -// `curH`-`ts.Height` = `confidence` -type HeightHandler func(ctx context.Context, tok types.TipSetKey, curH abi.ChainEpoch) error -type RevertHandler func(ctx context.Context, tok types.TipSetKey) error - -type Events interface { - ChainAt(hnd HeightHandler, rev RevertHandler, confidence int, h abi.ChainEpoch) error -} diff --git a/storage/pipeline/mocks/api.go b/storage/pipeline/mocks/api.go index 694bf4f2c33..51f319f2a50 100644 --- a/storage/pipeline/mocks/api.go +++ b/storage/pipeline/mocks/api.go @@ -107,19 +107,19 @@ func (mr *MockSealingAPIMockRecorder) MpoolPushMessage(arg0, arg1, arg2 interfac return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MpoolPushMessage", reflect.TypeOf((*MockSealingAPI)(nil).MpoolPushMessage), arg0, arg1, arg2) } -// StateComputeDataCommitment mocks base method. -func (m *MockSealingAPI) StateComputeDataCommitment(arg0 context.Context, arg1 address.Address, arg2 abi.RegisteredSealProof, arg3 []abi.DealID, arg4 types.TipSetKey) (cid.Cid, error) { +// StateComputeDataCID mocks base method. +func (m *MockSealingAPI) StateComputeDataCID(arg0 context.Context, arg1 address.Address, arg2 abi.RegisteredSealProof, arg3 []abi.DealID, arg4 types.TipSetKey) (cid.Cid, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "StateComputeDataCommitment", arg0, arg1, arg2, arg3, arg4) + ret := m.ctrl.Call(m, "StateComputeDataCID", arg0, arg1, arg2, arg3, arg4) ret0, _ := ret[0].(cid.Cid) ret1, _ := ret[1].(error) return ret0, ret1 } -// StateComputeDataCommitment indicates an expected call of StateComputeDataCommitment. -func (mr *MockSealingAPIMockRecorder) StateComputeDataCommitment(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { +// StateComputeDataCID indicates an expected call of StateComputeDataCID. +func (mr *MockSealingAPIMockRecorder) StateComputeDataCID(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateComputeDataCommitment", reflect.TypeOf((*MockSealingAPI)(nil).StateComputeDataCommitment), arg0, arg1, arg2, arg3, arg4) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateComputeDataCID", reflect.TypeOf((*MockSealingAPI)(nil).StateComputeDataCID), arg0, arg1, arg2, arg3, arg4) } // StateGetRandomnessFromBeacon mocks base method. diff --git a/storage/pipeline/sealing.go b/storage/pipeline/sealing.go index fac59d95662..f4cfce1555d 100644 --- a/storage/pipeline/sealing.go +++ b/storage/pipeline/sealing.go @@ -23,6 +23,7 @@ import ( "github.com/filecoin-project/lotus/api" lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/storage/pipeline/sealiface" @@ -41,10 +42,9 @@ var log = logging.Logger("sectors") type SealingAPI interface { StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error) StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error) - StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tok types.TipSetKey) (cid.Cid, error) - // Can return ErrSectorAllocated in case precommit info wasn't found, but the sector number is marked as allocated StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok types.TipSetKey) (*miner.SectorPreCommitOnChainInfo, error) + StateComputeDataCID(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tok types.TipSetKey) (cid.Cid, error) StateSectorGetInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok types.TipSetKey) (*miner.SectorOnChainInfo, error) StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok types.TipSetKey) (*lminer.SectorLocation, error) StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error) @@ -70,6 +70,10 @@ type SectorStateNotifee func(before, after SectorInfo) type AddrSel func(ctx context.Context, mi api.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) +type Events interface { + ChainAt(ctx context.Context, hnd events.HeightHandler, rev events.RevertHandler, confidence int, h abi.ChainEpoch) error +} + type Sealing struct { Api SealingAPI DealInfo *CurrentDealInfoManager diff --git a/storage/pipeline/states_replica_update.go b/storage/pipeline/states_replica_update.go index 700ccec29a9..499048acbe2 100644 --- a/storage/pipeline/states_replica_update.go +++ b/storage/pipeline/states_replica_update.go @@ -255,9 +255,9 @@ func (m *Sealing) handleUpdateActivating(ctx statemachine.Context, sector Sector targetHeight := mw.Height + lb + InteractivePoRepConfidence - return m.events.ChainAt(func(context.Context, types.TipSetKey, abi.ChainEpoch) error { + return m.events.ChainAt(context.Background(), func(context.Context, *types.TipSet, abi.ChainEpoch) error { return ctx.Send(SectorUpdateActive{}) - }, func(ctx context.Context, ts types.TipSetKey) error { + }, func(ctx context.Context, ts *types.TipSet) error { log.Warn("revert in handleUpdateActivating") return nil }, InteractivePoRepConfidence, targetHeight) diff --git a/storage/pipeline/states_sealing.go b/storage/pipeline/states_sealing.go index 6d272e63a04..e5a35ee3d02 100644 --- a/storage/pipeline/states_sealing.go +++ b/storage/pipeline/states_sealing.go @@ -498,7 +498,7 @@ func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) er randHeight := pci.PreCommitEpoch + policy.GetPreCommitChallengeDelay() - err = m.events.ChainAt(func(ectx context.Context, _ types.TipSetKey, curH abi.ChainEpoch) error { + err = m.events.ChainAt(context.Background(), func(ectx context.Context, _ *types.TipSet, curH abi.ChainEpoch) error { // in case of null blocks the randomness can land after the tipset we // get from the events API ts, err := m.Api.ChainHead(ctx.Context()) @@ -522,7 +522,7 @@ func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) er _ = ctx.Send(SectorSeedReady{SeedValue: abi.InteractiveSealRandomness(rand), SeedEpoch: randHeight}) return nil - }, func(ctx context.Context, ts types.TipSetKey) error { + }, func(ctx context.Context, ts *types.TipSet) error { log.Warn("revert in interactive commit sector step") // TODO: need to cancel running process and restart... return nil