diff --git a/storage/adapter_storage_miner.go b/storage/adapter_storage_miner.go index 5bc899afd72..a28a837677a 100644 --- a/storage/adapter_storage_miner.go +++ b/storage/adapter_storage_miner.go @@ -27,10 +27,10 @@ import ( "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" - sealing2 "github.com/filecoin-project/lotus/storage/pipeline" + pipeline "github.com/filecoin-project/lotus/storage/pipeline" ) -var _ sealing2.SealingAPI = new(SealingAPIAdapter) +var _ pipeline.SealingAPI = new(SealingAPIAdapter) type SealingAPIAdapter struct { delegate fullNodeFilteredAPI @@ -40,7 +40,7 @@ func NewSealingAPIAdapter(api fullNodeFilteredAPI) SealingAPIAdapter { return SealingAPIAdapter{delegate: api} } -func (s SealingAPIAdapter) StateMinerSectorSize(ctx context.Context, maddr address.Address, tok sealing2.TipSetToken) (abi.SectorSize, error) { +func (s SealingAPIAdapter) StateMinerSectorSize(ctx context.Context, maddr address.Address, tok pipeline.TipSetToken) (abi.SectorSize, error) { // TODO: update storage-fsm to just StateMinerInfo mi, err := s.StateMinerInfo(ctx, maddr, tok) if err != nil { @@ -49,7 +49,7 @@ func (s SealingAPIAdapter) StateMinerSectorSize(ctx context.Context, maddr addre return mi.SectorSize, nil } -func (s SealingAPIAdapter) StateMinerPreCommitDepositForPower(ctx context.Context, a address.Address, pci minertypes.SectorPreCommitInfo, tok sealing2.TipSetToken) (big.Int, error) { +func (s SealingAPIAdapter) StateMinerPreCommitDepositForPower(ctx context.Context, a address.Address, pci minertypes.SectorPreCommitInfo, tok pipeline.TipSetToken) (big.Int, error) { tsk, err := types.TipSetKeyFromBytes(tok) if err != nil { return big.Zero(), xerrors.Errorf("failed to unmarshal TipSetToken to TipSetKey: %w", err) @@ -58,7 +58,7 @@ func (s SealingAPIAdapter) StateMinerPreCommitDepositForPower(ctx context.Contex return s.delegate.StateMinerPreCommitDepositForPower(ctx, a, pci, tsk) } -func (s SealingAPIAdapter) StateMinerInitialPledgeCollateral(ctx context.Context, a address.Address, pci minertypes.SectorPreCommitInfo, tok sealing2.TipSetToken) (big.Int, error) { +func (s SealingAPIAdapter) StateMinerInitialPledgeCollateral(ctx context.Context, a address.Address, pci minertypes.SectorPreCommitInfo, tok pipeline.TipSetToken) (big.Int, error) { tsk, err := types.TipSetKeyFromBytes(tok) if err != nil { return big.Zero(), xerrors.Errorf("failed to unmarshal TipSetToken to TipSetKey: %w", err) @@ -67,7 +67,7 @@ func (s SealingAPIAdapter) StateMinerInitialPledgeCollateral(ctx context.Context return s.delegate.StateMinerInitialPledgeCollateral(ctx, a, pci, tsk) } -func (s SealingAPIAdapter) StateMinerInfo(ctx context.Context, maddr address.Address, tok sealing2.TipSetToken) (api.MinerInfo, error) { +func (s SealingAPIAdapter) StateMinerInfo(ctx context.Context, maddr address.Address, tok pipeline.TipSetToken) (api.MinerInfo, error) { tsk, err := types.TipSetKeyFromBytes(tok) if err != nil { return api.MinerInfo{}, xerrors.Errorf("failed to unmarshal TipSetToken to TipSetKey: %w", err) @@ -77,7 +77,7 @@ func (s SealingAPIAdapter) StateMinerInfo(ctx context.Context, maddr address.Add return s.delegate.StateMinerInfo(ctx, maddr, tsk) } -func (s SealingAPIAdapter) StateMinerAvailableBalance(ctx context.Context, maddr address.Address, tok sealing2.TipSetToken) (big.Int, error) { +func (s SealingAPIAdapter) StateMinerAvailableBalance(ctx context.Context, maddr address.Address, tok pipeline.TipSetToken) (big.Int, error) { tsk, err := types.TipSetKeyFromBytes(tok) if err != nil { return big.Zero(), xerrors.Errorf("failed to unmarshal TipSetToken to TipSetKey: %w", err) @@ -86,7 +86,7 @@ func (s SealingAPIAdapter) StateMinerAvailableBalance(ctx context.Context, maddr return s.delegate.StateMinerAvailableBalance(ctx, maddr, tsk) } -func (s SealingAPIAdapter) StateMinerWorkerAddress(ctx context.Context, maddr address.Address, tok sealing2.TipSetToken) (address.Address, error) { +func (s SealingAPIAdapter) StateMinerWorkerAddress(ctx context.Context, maddr address.Address, tok pipeline.TipSetToken) (address.Address, error) { // TODO: update storage-fsm to just StateMinerInfo mi, err := s.StateMinerInfo(ctx, maddr, tok) if err != nil { @@ -95,7 +95,7 @@ func (s SealingAPIAdapter) StateMinerWorkerAddress(ctx context.Context, maddr ad return mi.Worker, nil } -func (s SealingAPIAdapter) StateMinerDeadlines(ctx context.Context, maddr address.Address, tok sealing2.TipSetToken) ([]api.Deadline, error) { +func (s SealingAPIAdapter) StateMinerDeadlines(ctx context.Context, maddr address.Address, tok pipeline.TipSetToken) ([]api.Deadline, error) { tsk, err := types.TipSetKeyFromBytes(tok) if err != nil { return nil, xerrors.Errorf("failed to unmarshal TipSetToken to TipSetKey: %w", err) @@ -104,7 +104,7 @@ func (s SealingAPIAdapter) StateMinerDeadlines(ctx context.Context, maddr addres return s.delegate.StateMinerDeadlines(ctx, maddr, tsk) } -func (s SealingAPIAdapter) StateMinerSectorAllocated(ctx context.Context, maddr address.Address, sid abi.SectorNumber, tok sealing2.TipSetToken) (bool, error) { +func (s SealingAPIAdapter) StateMinerSectorAllocated(ctx context.Context, maddr address.Address, sid abi.SectorNumber, tok pipeline.TipSetToken) (bool, error) { tsk, err := types.TipSetKeyFromBytes(tok) if err != nil { return false, xerrors.Errorf("failed to unmarshal TipSetToken to TipSetKey: %w", err) @@ -113,7 +113,7 @@ func (s SealingAPIAdapter) StateMinerSectorAllocated(ctx context.Context, maddr return s.delegate.StateMinerSectorAllocated(ctx, maddr, sid, tsk) } -func (s SealingAPIAdapter) StateMinerActiveSectors(ctx context.Context, maddr address.Address, tok sealing2.TipSetToken) (bitfield.BitField, error) { +func (s SealingAPIAdapter) StateMinerActiveSectors(ctx context.Context, maddr address.Address, tok pipeline.TipSetToken) (bitfield.BitField, error) { tsk, err := types.TipSetKeyFromBytes(tok) if err != nil { return bitfield.BitField{}, xerrors.Errorf("failed to unmarshal TipSetToken to TipSetKey: %w", err) @@ -134,14 +134,14 @@ func (s SealingAPIAdapter) StateMinerActiveSectors(ctx context.Context, maddr ad return miner.AllPartSectors(state, miner.Partition.ActiveSectors) } -func (s SealingAPIAdapter) StateWaitMsg(ctx context.Context, mcid cid.Cid) (sealing2.MsgLookup, error) { +func (s SealingAPIAdapter) StateWaitMsg(ctx context.Context, mcid cid.Cid) (pipeline.MsgLookup, error) { wmsg, err := s.delegate.StateWaitMsg(ctx, mcid, build.MessageConfidence, api.LookbackNoLimit, true) if err != nil { - return sealing2.MsgLookup{}, err + return pipeline.MsgLookup{}, err } - return sealing2.MsgLookup{ - Receipt: sealing2.MessageReceipt{ + return pipeline.MsgLookup{ + Receipt: pipeline.MessageReceipt{ ExitCode: wmsg.Receipt.ExitCode, Return: wmsg.Receipt.Return, GasUsed: wmsg.Receipt.GasUsed, @@ -151,7 +151,7 @@ func (s SealingAPIAdapter) StateWaitMsg(ctx context.Context, mcid cid.Cid) (seal }, nil } -func (s SealingAPIAdapter) StateSearchMsg(ctx context.Context, c cid.Cid) (*sealing2.MsgLookup, error) { +func (s SealingAPIAdapter) StateSearchMsg(ctx context.Context, c cid.Cid) (*pipeline.MsgLookup, error) { wmsg, err := s.delegate.StateSearchMsg(ctx, types.EmptyTSK, c, api.LookbackNoLimit, true) if err != nil { return nil, err @@ -161,8 +161,8 @@ func (s SealingAPIAdapter) StateSearchMsg(ctx context.Context, c cid.Cid) (*seal return nil, nil } - return &sealing2.MsgLookup{ - Receipt: sealing2.MessageReceipt{ + return &pipeline.MsgLookup{ + Receipt: pipeline.MessageReceipt{ ExitCode: wmsg.Receipt.ExitCode, Return: wmsg.Receipt.Return, GasUsed: wmsg.Receipt.GasUsed, @@ -172,7 +172,7 @@ func (s SealingAPIAdapter) StateSearchMsg(ctx context.Context, c cid.Cid) (*seal }, nil } -func (s SealingAPIAdapter) StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tok sealing2.TipSetToken) (cid.Cid, error) { +func (s SealingAPIAdapter) StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tok pipeline.TipSetToken) (cid.Cid, error) { tsk, err := types.TipSetKeyFromBytes(tok) if err != nil { return cid.Undef, xerrors.Errorf("failed to unmarshal TipSetToken to TipSetKey: %w", err) @@ -240,7 +240,7 @@ func (s SealingAPIAdapter) StateComputeDataCommitment(ctx context.Context, maddr return cid.Cid(cr.CommDs[0]), nil } -func (s SealingAPIAdapter) StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok sealing2.TipSetToken) (*minertypes.SectorPreCommitOnChainInfo, error) { +func (s SealingAPIAdapter) StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok pipeline.TipSetToken) (*minertypes.SectorPreCommitOnChainInfo, error) { tsk, err := types.TipSetKeyFromBytes(tok) if err != nil { return nil, xerrors.Errorf("failed to unmarshal TipSetToken to TipSetKey: %w", err) @@ -268,7 +268,7 @@ func (s SealingAPIAdapter) StateSectorPreCommitInfo(ctx context.Context, maddr a return nil, xerrors.Errorf("checking if sector is allocated: %w", err) } if set { - return nil, sealing2.ErrSectorAllocated + return nil, pipeline.ErrSectorAllocated } return nil, nil @@ -277,7 +277,7 @@ func (s SealingAPIAdapter) StateSectorPreCommitInfo(ctx context.Context, maddr a return pci, nil } -func (s SealingAPIAdapter) StateSectorGetInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok sealing2.TipSetToken) (*miner.SectorOnChainInfo, error) { +func (s SealingAPIAdapter) StateSectorGetInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok pipeline.TipSetToken) (*miner.SectorOnChainInfo, error) { tsk, err := types.TipSetKeyFromBytes(tok) if err != nil { return nil, xerrors.Errorf("failed to unmarshal TipSetToken to TipSetKey: %w", err) @@ -286,7 +286,7 @@ func (s SealingAPIAdapter) StateSectorGetInfo(ctx context.Context, maddr address return s.delegate.StateSectorGetInfo(ctx, maddr, sectorNumber, tsk) } -func (s SealingAPIAdapter) StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok sealing2.TipSetToken) (*sealing2.SectorLocation, error) { +func (s SealingAPIAdapter) StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok pipeline.TipSetToken) (*pipeline.SectorLocation, error) { tsk, err := types.TipSetKeyFromBytes(tok) if err != nil { return nil, xerrors.Errorf("failed to unmarshal TipSetToken to TipSetKey: %w", err) @@ -297,7 +297,7 @@ func (s SealingAPIAdapter) StateSectorPartition(ctx context.Context, maddr addre return nil, err } if l != nil { - return &sealing2.SectorLocation{ + return &pipeline.SectorLocation{ Deadline: l.Deadline, Partition: l.Partition, }, nil @@ -306,7 +306,7 @@ func (s SealingAPIAdapter) StateSectorPartition(ctx context.Context, maddr addre return nil, nil // not found } -func (s SealingAPIAdapter) StateMinerPartitions(ctx context.Context, maddr address.Address, dlIdx uint64, tok sealing2.TipSetToken) ([]api.Partition, error) { +func (s SealingAPIAdapter) StateMinerPartitions(ctx context.Context, maddr address.Address, dlIdx uint64, tok pipeline.TipSetToken) ([]api.Partition, error) { tsk, err := types.TipSetKeyFromBytes(tok) if err != nil { return nil, xerrors.Errorf("failed to unmarshal TipSetToken to TipSetKey: %w", err) @@ -315,7 +315,7 @@ func (s SealingAPIAdapter) StateMinerPartitions(ctx context.Context, maddr addre return s.delegate.StateMinerPartitions(ctx, maddr, dlIdx, tsk) } -func (s SealingAPIAdapter) StateLookupID(ctx context.Context, addr address.Address, tok sealing2.TipSetToken) (address.Address, error) { +func (s SealingAPIAdapter) StateLookupID(ctx context.Context, addr address.Address, tok pipeline.TipSetToken) (address.Address, error) { tsk, err := types.TipSetKeyFromBytes(tok) if err != nil { return address.Undef, err @@ -324,7 +324,7 @@ func (s SealingAPIAdapter) StateLookupID(ctx context.Context, addr address.Addre return s.delegate.StateLookupID(ctx, addr, tsk) } -func (s SealingAPIAdapter) StateMarketStorageDeal(ctx context.Context, dealID abi.DealID, tok sealing2.TipSetToken) (*api.MarketDeal, error) { +func (s SealingAPIAdapter) StateMarketStorageDeal(ctx context.Context, dealID abi.DealID, tok pipeline.TipSetToken) (*api.MarketDeal, error) { tsk, err := types.TipSetKeyFromBytes(tok) if err != nil { return nil, err @@ -333,7 +333,7 @@ func (s SealingAPIAdapter) StateMarketStorageDeal(ctx context.Context, dealID ab return s.delegate.StateMarketStorageDeal(ctx, dealID, tsk) } -func (s SealingAPIAdapter) StateMarketStorageDealProposal(ctx context.Context, dealID abi.DealID, tok sealing2.TipSetToken) (market.DealProposal, error) { +func (s SealingAPIAdapter) StateMarketStorageDealProposal(ctx context.Context, dealID abi.DealID, tok pipeline.TipSetToken) (market.DealProposal, error) { tsk, err := types.TipSetKeyFromBytes(tok) if err != nil { return market.DealProposal{}, err @@ -347,7 +347,7 @@ func (s SealingAPIAdapter) StateMarketStorageDealProposal(ctx context.Context, d return deal.Proposal, nil } -func (s SealingAPIAdapter) StateNetworkVersion(ctx context.Context, tok sealing2.TipSetToken) (network.Version, error) { +func (s SealingAPIAdapter) StateNetworkVersion(ctx context.Context, tok pipeline.TipSetToken) (network.Version, error) { tsk, err := types.TipSetKeyFromBytes(tok) if err != nil { return network.VersionMax, err @@ -356,7 +356,7 @@ func (s SealingAPIAdapter) StateNetworkVersion(ctx context.Context, tok sealing2 return s.delegate.StateNetworkVersion(ctx, tsk) } -func (s SealingAPIAdapter) StateMinerProvingDeadline(ctx context.Context, maddr address.Address, tok sealing2.TipSetToken) (*dline.Info, error) { +func (s SealingAPIAdapter) StateMinerProvingDeadline(ctx context.Context, maddr address.Address, tok pipeline.TipSetToken) (*dline.Info, error) { tsk, err := types.TipSetKeyFromBytes(tok) if err != nil { return nil, err @@ -382,7 +382,7 @@ func (s SealingAPIAdapter) SendMsg(ctx context.Context, from, to address.Address return smsg.Cid(), nil } -func (s SealingAPIAdapter) ChainHead(ctx context.Context) (sealing2.TipSetToken, abi.ChainEpoch, error) { +func (s SealingAPIAdapter) ChainHead(ctx context.Context) (pipeline.TipSetToken, abi.ChainEpoch, error) { head, err := s.delegate.ChainHead(ctx) if err != nil { return nil, 0, err @@ -391,7 +391,7 @@ func (s SealingAPIAdapter) ChainHead(ctx context.Context) (sealing2.TipSetToken, return head.Key().Bytes(), head.Height(), nil } -func (s SealingAPIAdapter) ChainBaseFee(ctx context.Context, tok sealing2.TipSetToken) (abi.TokenAmount, error) { +func (s SealingAPIAdapter) ChainBaseFee(ctx context.Context, tok pipeline.TipSetToken) (abi.TokenAmount, error) { tsk, err := types.TipSetKeyFromBytes(tok) if err != nil { return big.Zero(), err @@ -409,7 +409,7 @@ func (s SealingAPIAdapter) ChainGetMessage(ctx context.Context, mc cid.Cid) (*ty return s.delegate.ChainGetMessage(ctx, mc) } -func (s SealingAPIAdapter) StateGetRandomnessFromBeacon(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tok sealing2.TipSetToken) (abi.Randomness, error) { +func (s SealingAPIAdapter) StateGetRandomnessFromBeacon(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tok pipeline.TipSetToken) (abi.Randomness, error) { tsk, err := types.TipSetKeyFromBytes(tok) if err != nil { return nil, err @@ -418,7 +418,7 @@ func (s SealingAPIAdapter) StateGetRandomnessFromBeacon(ctx context.Context, per return s.delegate.StateGetRandomnessFromBeacon(ctx, personalization, randEpoch, entropy, tsk) } -func (s SealingAPIAdapter) StateGetRandomnessFromTickets(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tok sealing2.TipSetToken) (abi.Randomness, error) { +func (s SealingAPIAdapter) StateGetRandomnessFromTickets(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tok pipeline.TipSetToken) (abi.Randomness, error) { tsk, err := types.TipSetKeyFromBytes(tok) if err != nil { return nil, err diff --git a/storage/miner.go b/storage/miner.go index bb02443d574..6fd076d18a7 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -32,7 +32,7 @@ import ( "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/storage/ctladdr" - sealing2 "github.com/filecoin-project/lotus/storage/pipeline" + pipeline "github.com/filecoin-project/lotus/storage/pipeline" "github.com/filecoin-project/lotus/storage/sealer" "github.com/filecoin-project/lotus/storage/sealer/ffiwrapper" ) @@ -52,7 +52,7 @@ type Miner struct { feeCfg config.MinerFeeConfig sealer sealer.SectorManager ds datastore.Batching - sc sealing2.SectorIDCounter + sc pipeline.SectorIDCounter verif ffiwrapper.Verifier prover ffiwrapper.Prover addrSel *ctladdr.AddressSelector @@ -60,7 +60,7 @@ type Miner struct { maddr address.Address getSealConfig dtypes.GetSealingConfigFunc - sealing *sealing2.Sealing + sealing *pipeline.Sealing sealingEvtType journal.EventType @@ -71,8 +71,8 @@ type Miner struct { type SealingStateEvt struct { SectorNumber abi.SectorNumber SectorType abi.RegisteredSealProof - From sealing2.SectorState - After sealing2.SectorState + From pipeline.SectorState + After pipeline.SectorState Error string } @@ -134,7 +134,7 @@ func NewMiner(api fullNodeFilteredAPI, maddr address.Address, ds datastore.Batching, sealer sealer.SectorManager, - sc sealing2.SectorIDCounter, + sc pipeline.SectorIDCounter, verif ffiwrapper.Verifier, prover ffiwrapper.Prover, gsd dtypes.GetSealingConfigFunc, @@ -185,10 +185,10 @@ func (m *Miner) Run(ctx context.Context) error { adaptedAPI := NewSealingAPIAdapter(m.api) // Instantiate a precommit policy. - cfg := sealing2.GetSealingConfigFunc(m.getSealConfig) + cfg := pipeline.GetSealingConfigFunc(m.getSealConfig) provingBuffer := md.WPoStProvingPeriod * 2 - pcp := sealing2.NewBasicPreCommitPolicy(adaptedAPI, cfg, provingBuffer) + pcp := pipeline.NewBasicPreCommitPolicy(adaptedAPI, cfg, provingBuffer) // address selector. as := func(ctx context.Context, mi api.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) { @@ -196,7 +196,7 @@ func (m *Miner) Run(ctx context.Context) error { } // Instantiate the sealing FSM. - m.sealing = sealing2.New(ctx, adaptedAPI, m.feeCfg, evtsAdapter, m.maddr, m.ds, m.sealer, m.sc, m.verif, m.prover, &pcp, cfg, m.handleSealingNotifications, as) + m.sealing = pipeline.New(ctx, adaptedAPI, m.feeCfg, evtsAdapter, m.maddr, m.ds, m.sealer, m.sc, m.verif, m.prover, &pcp, cfg, m.handleSealingNotifications, as) // Run the sealing FSM. go m.sealing.Run(ctx) //nolint:errcheck // logged intside the function @@ -204,7 +204,7 @@ func (m *Miner) Run(ctx context.Context) error { return nil } -func (m *Miner) handleSealingNotifications(before, after sealing2.SectorInfo) { +func (m *Miner) handleSealingNotifications(before, after pipeline.SectorInfo) { m.journal.RecordEvent(m.sealingEvtType, func() interface{} { return SealingStateEvt{ SectorNumber: before.SectorNumber, diff --git a/storage/miner_sealing.go b/storage/miner_sealing.go index 4f92b91ec39..04f11282310 100644 --- a/storage/miner_sealing.go +++ b/storage/miner_sealing.go @@ -12,7 +12,7 @@ import ( "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/lotus/api" - sealing2 "github.com/filecoin-project/lotus/storage/pipeline" + pipeline "github.com/filecoin-project/lotus/storage/pipeline" "github.com/filecoin-project/lotus/storage/pipeline/sealiface" "github.com/filecoin-project/lotus/storage/sectorblocks" ) @@ -27,7 +27,7 @@ func (m *Miner) StartPackingSector(sectorNum abi.SectorNumber) error { return m.sealing.StartPacking(sectorNum) } -func (m *Miner) ListSectors() ([]sealing2.SectorInfo, error) { +func (m *Miner) ListSectors() ([]pipeline.SectorInfo, error) { return m.sealing.ListSectors() } @@ -35,7 +35,7 @@ func (m *Miner) PledgeSector(ctx context.Context) (storage.SectorRef, error) { return m.sealing.PledgeSector(ctx) } -func (m *Miner) ForceSectorState(ctx context.Context, id abi.SectorNumber, state sealing2.SectorState) error { +func (m *Miner) ForceSectorState(ctx context.Context, id abi.SectorNumber, state pipeline.SectorState) error { return m.sealing.ForceSectorState(ctx, id, state) } diff --git a/storage/paths/http_handler_test.go b/storage/paths/http_handler_test.go index 262ac14c024..0d81dc1ef3a 100644 --- a/storage/paths/http_handler_test.go +++ b/storage/paths/http_handler_test.go @@ -17,7 +17,7 @@ import ( "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/lotus/storage/paths" - mocks2 "github.com/filecoin-project/lotus/storage/paths/mocks" + mocks "github.com/filecoin-project/lotus/storage/paths/mocks" "github.com/filecoin-project/lotus/storage/sealer/partialfile" storiface "github.com/filecoin-project/lotus/storage/sealer/storiface" ) @@ -64,8 +64,8 @@ func TestRemoteGetAllocated(t *testing.T) { tcs := map[string]struct { piFnc func(pi *pieceInfo) - storeFnc func(s *mocks2.MockStore) - pfFunc func(s *mocks2.MockPartialFileHandler) + storeFnc func(s *mocks.MockStore) + pfFunc func(s *mocks.MockPartialFileHandler) // expectation expectedStatusCode int @@ -102,7 +102,7 @@ func TestRemoteGetAllocated(t *testing.T) { }, "fails when errors out during acquiring unsealed sector file": { expectedStatusCode: http.StatusInternalServerError, - storeFnc: func(l *mocks2.MockStore) { + storeFnc: func(l *mocks.MockStore) { l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{ @@ -113,7 +113,7 @@ func TestRemoteGetAllocated(t *testing.T) { }, "fails when unsealed sector file is not found locally": { expectedStatusCode: http.StatusInternalServerError, - storeFnc: func(l *mocks2.MockStore) { + storeFnc: func(l *mocks.MockStore) { l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{}, @@ -122,7 +122,7 @@ func TestRemoteGetAllocated(t *testing.T) { }, "fails when error while opening partial file": { expectedStatusCode: http.StatusInternalServerError, - storeFnc: func(l *mocks2.MockStore) { + storeFnc: func(l *mocks.MockStore) { l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{ @@ -131,7 +131,7 @@ func TestRemoteGetAllocated(t *testing.T) { storiface.SectorPaths{}, nil).Times(1) }, - pfFunc: func(pf *mocks2.MockPartialFileHandler) { + pfFunc: func(pf *mocks.MockPartialFileHandler) { pf.EXPECT().OpenPartialFile(abi.PaddedPieceSize(sectorSize), pfPath).Return(&partialfile.PartialFile{}, xerrors.New("some error")).Times(1) }, @@ -139,7 +139,7 @@ func TestRemoteGetAllocated(t *testing.T) { "fails when determining partial file allocation returns an error": { expectedStatusCode: http.StatusInternalServerError, - storeFnc: func(l *mocks2.MockStore) { + storeFnc: func(l *mocks.MockStore) { l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{ @@ -148,7 +148,7 @@ func TestRemoteGetAllocated(t *testing.T) { storiface.SectorPaths{}, nil).Times(1) }, - pfFunc: func(pf *mocks2.MockPartialFileHandler) { + pfFunc: func(pf *mocks.MockPartialFileHandler) { pf.EXPECT().OpenPartialFile(abi.PaddedPieceSize(sectorSize), pfPath).Return(emptyPartialFile, nil).Times(1) @@ -158,7 +158,7 @@ func TestRemoteGetAllocated(t *testing.T) { }, "StatusRequestedRangeNotSatisfiable when piece is NOT allocated in partial file": { expectedStatusCode: http.StatusRequestedRangeNotSatisfiable, - storeFnc: func(l *mocks2.MockStore) { + storeFnc: func(l *mocks.MockStore) { l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{ @@ -167,7 +167,7 @@ func TestRemoteGetAllocated(t *testing.T) { storiface.SectorPaths{}, nil).Times(1) }, - pfFunc: func(pf *mocks2.MockPartialFileHandler) { + pfFunc: func(pf *mocks.MockPartialFileHandler) { pf.EXPECT().OpenPartialFile(abi.PaddedPieceSize(sectorSize), pfPath).Return(emptyPartialFile, nil).Times(1) @@ -177,7 +177,7 @@ func TestRemoteGetAllocated(t *testing.T) { }, "OK when piece is allocated in partial file": { expectedStatusCode: http.StatusOK, - storeFnc: func(l *mocks2.MockStore) { + storeFnc: func(l *mocks.MockStore) { l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{ @@ -186,7 +186,7 @@ func TestRemoteGetAllocated(t *testing.T) { storiface.SectorPaths{}, nil).Times(1) }, - pfFunc: func(pf *mocks2.MockPartialFileHandler) { + pfFunc: func(pf *mocks.MockPartialFileHandler) { pf.EXPECT().OpenPartialFile(abi.PaddedPieceSize(sectorSize), pfPath).Return(emptyPartialFile, nil).Times(1) @@ -204,8 +204,8 @@ func TestRemoteGetAllocated(t *testing.T) { // when test is done, assert expectations on all mock objects. defer mockCtrl.Finish() - lstore := mocks2.NewMockStore(mockCtrl) - pfhandler := mocks2.NewMockPartialFileHandler(mockCtrl) + lstore := mocks.NewMockStore(mockCtrl) + pfhandler := mocks.NewMockPartialFileHandler(mockCtrl) handler := &paths.FetchHandler{ lstore, @@ -274,7 +274,7 @@ func TestRemoteGetSector(t *testing.T) { tcs := map[string]struct { siFnc func(pi *sectorInfo) - storeFnc func(s *mocks2.MockStore, path string) + storeFnc func(s *mocks.MockStore, path string) // reading a file or a dir isDir bool @@ -300,7 +300,7 @@ func TestRemoteGetSector(t *testing.T) { noResponseBytes: true, }, "fails when error while acquiring sector file": { - storeFnc: func(l *mocks2.MockStore, _ string) { + storeFnc: func(l *mocks.MockStore, _ string) { l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{ @@ -313,7 +313,7 @@ func TestRemoteGetSector(t *testing.T) { }, "fails when acquired sector file path is empty": { expectedStatusCode: http.StatusInternalServerError, - storeFnc: func(l *mocks2.MockStore, _ string) { + storeFnc: func(l *mocks.MockStore, _ string) { l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{}, @@ -323,7 +323,7 @@ func TestRemoteGetSector(t *testing.T) { }, "fails when acquired file does not exist": { expectedStatusCode: http.StatusInternalServerError, - storeFnc: func(l *mocks2.MockStore, _ string) { + storeFnc: func(l *mocks.MockStore, _ string) { l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{ @@ -334,7 +334,7 @@ func TestRemoteGetSector(t *testing.T) { noResponseBytes: true, }, "successfully read a sector file": { - storeFnc: func(l *mocks2.MockStore, path string) { + storeFnc: func(l *mocks.MockStore, path string) { l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{ @@ -349,7 +349,7 @@ func TestRemoteGetSector(t *testing.T) { expectedResponseBytes: fileBytes, }, "successfully read a sector dir": { - storeFnc: func(l *mocks2.MockStore, path string) { + storeFnc: func(l *mocks.MockStore, path string) { l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{ @@ -372,8 +372,8 @@ func TestRemoteGetSector(t *testing.T) { mockCtrl := gomock.NewController(t) // when test is done, assert expectations on all mock objects. defer mockCtrl.Finish() - lstore := mocks2.NewMockStore(mockCtrl) - pfhandler := mocks2.NewMockPartialFileHandler(mockCtrl) + lstore := mocks.NewMockStore(mockCtrl) + pfhandler := mocks.NewMockPartialFileHandler(mockCtrl) var path string diff --git a/storage/paths/mocks/index.go b/storage/paths/mocks/index.go index 696d7e9742c..030692b8f5f 100644 --- a/storage/paths/mocks/index.go +++ b/storage/paths/mocks/index.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/filecoin-project/lotus/storage/sealer/stores (interfaces: SectorIndex) +// Source: github.com/filecoin-project/lotus/storage/paths (interfaces: SectorIndex) // Package mocks is a generated GoMock package. package mocks diff --git a/storage/paths/mocks/pf.go b/storage/paths/mocks/pf.go index 360443670b5..50b020aaa8f 100644 --- a/storage/paths/mocks/pf.go +++ b/storage/paths/mocks/pf.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/filecoin-project/lotus/storage/sealer/stores (interfaces: PartialFileHandler) +// Source: github.com/filecoin-project/lotus/storage/paths (interfaces: PartialFileHandler) // Package mocks is a generated GoMock package. package mocks diff --git a/storage/paths/mocks/store.go b/storage/paths/mocks/store.go index 1906319b56c..b1238c479b1 100644 --- a/storage/paths/mocks/store.go +++ b/storage/paths/mocks/store.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/filecoin-project/lotus/storage/sealer/stores (interfaces: Store) +// Source: github.com/filecoin-project/lotus/storage/paths (interfaces: Store) // Package mocks is a generated GoMock package. package mocks diff --git a/storage/paths/remote_test.go b/storage/paths/remote_test.go index 0199ca22a9f..2cf06fcb535 100644 --- a/storage/paths/remote_test.go +++ b/storage/paths/remote_test.go @@ -25,7 +25,7 @@ import ( "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/storage/paths" - mocks2 "github.com/filecoin-project/lotus/storage/paths/mocks" + mocks "github.com/filecoin-project/lotus/storage/paths/mocks" "github.com/filecoin-project/lotus/storage/sealer/partialfile" storiface "github.com/filecoin-project/lotus/storage/sealer/storiface" ) @@ -175,9 +175,9 @@ func TestReader(t *testing.T) { ctx := context.Background() tcs := map[string]struct { - storeFnc func(s *mocks2.MockStore) - pfFunc func(s *mocks2.MockPartialFileHandler) - indexFnc func(s *mocks2.MockSectorIndex, serverURL string) + storeFnc func(s *mocks.MockStore) + pfFunc func(s *mocks.MockPartialFileHandler) + indexFnc func(s *mocks.MockSectorIndex, serverURL string) needHttpServer bool @@ -194,7 +194,7 @@ func TestReader(t *testing.T) { // -------- have the unsealed file locally "fails when error while acquiring unsealed file": { - storeFnc: func(l *mocks2.MockStore) { + storeFnc: func(l *mocks.MockStore) { mockSectorAcquire(l, sectorRef, pfPath, xerrors.New("acquire error")) }, @@ -202,22 +202,22 @@ func TestReader(t *testing.T) { }, "fails when error while opening local partial (unsealed) file": { - storeFnc: func(l *mocks2.MockStore) { + storeFnc: func(l *mocks.MockStore) { mockSectorAcquire(l, sectorRef, pfPath, nil) }, - pfFunc: func(pf *mocks2.MockPartialFileHandler) { + pfFunc: func(pf *mocks.MockPartialFileHandler) { mockPartialFileOpen(pf, sectorSize, pfPath, xerrors.New("pf open error")) }, errStr: "pf open error", }, "fails when error while checking if local unsealed file has piece": { - storeFnc: func(l *mocks2.MockStore) { + storeFnc: func(l *mocks.MockStore) { mockSectorAcquire(l, sectorRef, pfPath, nil) }, - pfFunc: func(pf *mocks2.MockPartialFileHandler) { + pfFunc: func(pf *mocks.MockPartialFileHandler) { mockPartialFileOpen(pf, sectorSize, pfPath, nil) mockCheckAllocation(pf, offset, size, emptyPartialFile, true, xerrors.New("piece check error")) @@ -227,11 +227,11 @@ func TestReader(t *testing.T) { }, "fails when error while closing local unsealed file that does not have the piece": { - storeFnc: func(l *mocks2.MockStore) { + storeFnc: func(l *mocks.MockStore) { mockSectorAcquire(l, sectorRef, pfPath, nil) }, - pfFunc: func(pf *mocks2.MockPartialFileHandler) { + pfFunc: func(pf *mocks.MockPartialFileHandler) { mockPartialFileOpen(pf, sectorSize, pfPath, nil) mockCheckAllocation(pf, offset, size, emptyPartialFile, false, nil) @@ -241,11 +241,11 @@ func TestReader(t *testing.T) { }, "fails when error while fetching reader for the local unsealed file that has the unsealed piece": { - storeFnc: func(l *mocks2.MockStore) { + storeFnc: func(l *mocks.MockStore) { mockSectorAcquire(l, sectorRef, pfPath, nil) }, - pfFunc: func(pf *mocks2.MockPartialFileHandler) { + pfFunc: func(pf *mocks.MockPartialFileHandler) { mockPartialFileOpen(pf, sectorSize, pfPath, nil) mockCheckAllocation(pf, offset, size, emptyPartialFile, true, nil) @@ -258,11 +258,11 @@ func TestReader(t *testing.T) { // ------------------- don't have the unsealed file locally "fails when error while finding sector": { - storeFnc: func(l *mocks2.MockStore) { + storeFnc: func(l *mocks.MockStore) { mockSectorAcquire(l, sectorRef, "", nil) }, - indexFnc: func(in *mocks2.MockSectorIndex, _ string) { + indexFnc: func(in *mocks.MockSectorIndex, _ string) { in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(), false).Return(nil, xerrors.New("find sector error")) }, @@ -270,11 +270,11 @@ func TestReader(t *testing.T) { }, "fails when no worker has unsealed file": { - storeFnc: func(l *mocks2.MockStore) { + storeFnc: func(l *mocks.MockStore) { mockSectorAcquire(l, sectorRef, "", nil) }, - indexFnc: func(in *mocks2.MockSectorIndex, _ string) { + indexFnc: func(in *mocks.MockSectorIndex, _ string) { in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(), false).Return(nil, nil) }, @@ -283,11 +283,11 @@ func TestReader(t *testing.T) { // --- nil reader when local unsealed file does NOT have unsealed piece "nil reader when local unsealed file does not have the unsealed piece and remote sector also dosen't have the unsealed piece": { - storeFnc: func(l *mocks2.MockStore) { + storeFnc: func(l *mocks.MockStore) { mockSectorAcquire(l, sectorRef, pfPath, nil) }, - pfFunc: func(pf *mocks2.MockPartialFileHandler) { + pfFunc: func(pf *mocks.MockPartialFileHandler) { mockPartialFileOpen(pf, sectorSize, pfPath, nil) mockCheckAllocation(pf, offset, size, emptyPartialFile, false, nil) @@ -296,7 +296,7 @@ func TestReader(t *testing.T) { }, - indexFnc: func(in *mocks2.MockSectorIndex, url string) { + indexFnc: func(in *mocks.MockSectorIndex, url string) { si := storiface.SectorStorageInfo{ URLs: []string{url}, } @@ -311,11 +311,11 @@ func TestReader(t *testing.T) { // ---- nil reader when none of the remote unsealed file has unsealed piece "nil reader when none of the worker has the unsealed piece": { - storeFnc: func(l *mocks2.MockStore) { + storeFnc: func(l *mocks.MockStore) { mockSectorAcquire(l, sectorRef, "", nil) }, - indexFnc: func(in *mocks2.MockSectorIndex, url string) { + indexFnc: func(in *mocks.MockSectorIndex, url string) { si := storiface.SectorStorageInfo{ URLs: []string{url}, } @@ -329,11 +329,11 @@ func TestReader(t *testing.T) { }, "nil reader when none of the worker is able to serve the unsealed piece even though they have it": { - storeFnc: func(l *mocks2.MockStore) { + storeFnc: func(l *mocks.MockStore) { mockSectorAcquire(l, sectorRef, "", nil) }, - indexFnc: func(in *mocks2.MockSectorIndex, url string) { + indexFnc: func(in *mocks.MockSectorIndex, url string) { si := storiface.SectorStorageInfo{ URLs: []string{url}, } @@ -349,11 +349,11 @@ func TestReader(t *testing.T) { // ---- Success for local unsealed file "successfully fetches reader for piece from local unsealed file": { - storeFnc: func(l *mocks2.MockStore) { + storeFnc: func(l *mocks.MockStore) { mockSectorAcquire(l, sectorRef, pfPath, nil) }, - pfFunc: func(pf *mocks2.MockPartialFileHandler) { + pfFunc: func(pf *mocks.MockPartialFileHandler) { mockPartialFileOpen(pf, sectorSize, pfPath, nil) mockCheckAllocation(pf, offset, size, emptyPartialFile, true, nil) @@ -377,11 +377,11 @@ func TestReader(t *testing.T) { // --- Success for remote unsealed file // --- Success for remote unsealed file "successfully fetches reader from remote unsealed piece when local unsealed file does NOT have the unsealed Piece": { - storeFnc: func(l *mocks2.MockStore) { + storeFnc: func(l *mocks.MockStore) { mockSectorAcquire(l, sectorRef, pfPath, nil) }, - pfFunc: func(pf *mocks2.MockPartialFileHandler) { + pfFunc: func(pf *mocks.MockPartialFileHandler) { mockPartialFileOpen(pf, sectorSize, pfPath, nil) mockCheckAllocation(pf, offset, size, emptyPartialFile, false, nil) @@ -390,7 +390,7 @@ func TestReader(t *testing.T) { }, - indexFnc: func(in *mocks2.MockSectorIndex, url string) { + indexFnc: func(in *mocks.MockSectorIndex, url string) { si := storiface.SectorStorageInfo{ URLs: []string{url}, } @@ -407,11 +407,11 @@ func TestReader(t *testing.T) { }, "successfully fetches reader for piece from remote unsealed piece": { - storeFnc: func(l *mocks2.MockStore) { + storeFnc: func(l *mocks.MockStore) { mockSectorAcquire(l, sectorRef, "", nil) }, - indexFnc: func(in *mocks2.MockSectorIndex, url string) { + indexFnc: func(in *mocks.MockSectorIndex, url string) { si := storiface.SectorStorageInfo{ URLs: []string{url}, } @@ -437,9 +437,9 @@ func TestReader(t *testing.T) { defer mockCtrl.Finish() // create them mocks - lstore := mocks2.NewMockStore(mockCtrl) - pfhandler := mocks2.NewMockPartialFileHandler(mockCtrl) - index := mocks2.NewMockSectorIndex(mockCtrl) + lstore := mocks.NewMockStore(mockCtrl) + pfhandler := mocks.NewMockPartialFileHandler(mockCtrl) + index := mocks.NewMockSectorIndex(mockCtrl) if tc.storeFnc != nil { tc.storeFnc(lstore) @@ -533,9 +533,9 @@ func TestCheckIsUnsealed(t *testing.T) { ctx := context.Background() tcs := map[string]struct { - storeFnc func(s *mocks2.MockStore) - pfFunc func(s *mocks2.MockPartialFileHandler) - indexFnc func(s *mocks2.MockSectorIndex, serverURL string) + storeFnc func(s *mocks.MockStore) + pfFunc func(s *mocks.MockPartialFileHandler) + indexFnc func(s *mocks.MockSectorIndex, serverURL string) needHttpServer bool @@ -550,7 +550,7 @@ func TestCheckIsUnsealed(t *testing.T) { // -------- have the unsealed file locally "fails when error while acquiring unsealed file": { - storeFnc: func(l *mocks2.MockStore) { + storeFnc: func(l *mocks.MockStore) { mockSectorAcquire(l, sectorRef, pfPath, xerrors.New("acquire error")) }, @@ -558,22 +558,22 @@ func TestCheckIsUnsealed(t *testing.T) { }, "fails when error while opening local partial (unsealed) file": { - storeFnc: func(l *mocks2.MockStore) { + storeFnc: func(l *mocks.MockStore) { mockSectorAcquire(l, sectorRef, pfPath, nil) }, - pfFunc: func(pf *mocks2.MockPartialFileHandler) { + pfFunc: func(pf *mocks.MockPartialFileHandler) { mockPartialFileOpen(pf, sectorSize, pfPath, xerrors.New("pf open error")) }, errStr: "pf open error", }, "fails when error while checking if local unsealed file has piece": { - storeFnc: func(l *mocks2.MockStore) { + storeFnc: func(l *mocks.MockStore) { mockSectorAcquire(l, sectorRef, pfPath, nil) }, - pfFunc: func(pf *mocks2.MockPartialFileHandler) { + pfFunc: func(pf *mocks.MockPartialFileHandler) { mockPartialFileOpen(pf, sectorSize, pfPath, nil) mockCheckAllocation(pf, offset, size, emptyPartialFile, true, xerrors.New("piece check error")) @@ -583,11 +583,11 @@ func TestCheckIsUnsealed(t *testing.T) { }, "fails when error while closing local unsealed file": { - storeFnc: func(l *mocks2.MockStore) { + storeFnc: func(l *mocks.MockStore) { mockSectorAcquire(l, sectorRef, pfPath, nil) }, - pfFunc: func(pf *mocks2.MockPartialFileHandler) { + pfFunc: func(pf *mocks.MockPartialFileHandler) { mockPartialFileOpen(pf, sectorSize, pfPath, nil) mockCheckAllocation(pf, offset, size, emptyPartialFile, @@ -601,11 +601,11 @@ func TestCheckIsUnsealed(t *testing.T) { // ------------------- don't have the unsealed file locally "fails when error while finding sector": { - storeFnc: func(l *mocks2.MockStore) { + storeFnc: func(l *mocks.MockStore) { mockSectorAcquire(l, sectorRef, "", nil) }, - indexFnc: func(in *mocks2.MockSectorIndex, _ string) { + indexFnc: func(in *mocks.MockSectorIndex, _ string) { in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(), false).Return(nil, xerrors.New("find sector error")) }, @@ -613,11 +613,11 @@ func TestCheckIsUnsealed(t *testing.T) { }, "false when no worker has unsealed file": { - storeFnc: func(l *mocks2.MockStore) { + storeFnc: func(l *mocks.MockStore) { mockSectorAcquire(l, sectorRef, "", nil) }, - indexFnc: func(in *mocks2.MockSectorIndex, _ string) { + indexFnc: func(in *mocks.MockSectorIndex, _ string) { in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(), false).Return(nil, nil) }, @@ -625,11 +625,11 @@ func TestCheckIsUnsealed(t *testing.T) { // false when local unsealed file does NOT have unsealed piece "false when local unsealed file does not have the piece and remote sector too dosen't have the piece": { - storeFnc: func(l *mocks2.MockStore) { + storeFnc: func(l *mocks.MockStore) { mockSectorAcquire(l, sectorRef, pfPath, nil) }, - pfFunc: func(pf *mocks2.MockPartialFileHandler) { + pfFunc: func(pf *mocks.MockPartialFileHandler) { mockPartialFileOpen(pf, sectorSize, pfPath, nil) mockCheckAllocation(pf, offset, size, emptyPartialFile, false, nil) @@ -637,7 +637,7 @@ func TestCheckIsUnsealed(t *testing.T) { pf.EXPECT().Close(emptyPartialFile).Return(nil).Times(1) }, - indexFnc: func(in *mocks2.MockSectorIndex, url string) { + indexFnc: func(in *mocks.MockSectorIndex, url string) { si := storiface.SectorStorageInfo{ URLs: []string{url}, } @@ -651,11 +651,11 @@ func TestCheckIsUnsealed(t *testing.T) { }, "false when none of the worker has the unsealed piece": { - storeFnc: func(l *mocks2.MockStore) { + storeFnc: func(l *mocks.MockStore) { mockSectorAcquire(l, sectorRef, "", nil) }, - indexFnc: func(in *mocks2.MockSectorIndex, url string) { + indexFnc: func(in *mocks.MockSectorIndex, url string) { si := storiface.SectorStorageInfo{ URLs: []string{url}, } @@ -670,11 +670,11 @@ func TestCheckIsUnsealed(t *testing.T) { // ---- Success for local unsealed file "true when local unsealed file has the piece": { - storeFnc: func(l *mocks2.MockStore) { + storeFnc: func(l *mocks.MockStore) { mockSectorAcquire(l, sectorRef, pfPath, nil) }, - pfFunc: func(pf *mocks2.MockPartialFileHandler) { + pfFunc: func(pf *mocks.MockPartialFileHandler) { mockPartialFileOpen(pf, sectorSize, pfPath, nil) mockCheckAllocation(pf, offset, size, emptyPartialFile, true, nil) @@ -687,11 +687,11 @@ func TestCheckIsUnsealed(t *testing.T) { // --- Success for remote unsealed file "true if we have a remote unsealed piece": { - storeFnc: func(l *mocks2.MockStore) { + storeFnc: func(l *mocks.MockStore) { mockSectorAcquire(l, sectorRef, "", nil) }, - indexFnc: func(in *mocks2.MockSectorIndex, url string) { + indexFnc: func(in *mocks.MockSectorIndex, url string) { si := storiface.SectorStorageInfo{ URLs: []string{url}, } @@ -706,11 +706,11 @@ func TestCheckIsUnsealed(t *testing.T) { }, "true when local unsealed file does NOT have the unsealed Piece but remote sector has the unsealed piece": { - storeFnc: func(l *mocks2.MockStore) { + storeFnc: func(l *mocks.MockStore) { mockSectorAcquire(l, sectorRef, pfPath, nil) }, - pfFunc: func(pf *mocks2.MockPartialFileHandler) { + pfFunc: func(pf *mocks.MockPartialFileHandler) { mockPartialFileOpen(pf, sectorSize, pfPath, nil) mockCheckAllocation(pf, offset, size, emptyPartialFile, false, nil) @@ -718,7 +718,7 @@ func TestCheckIsUnsealed(t *testing.T) { pf.EXPECT().Close(emptyPartialFile).Return(nil).Times(1) }, - indexFnc: func(in *mocks2.MockSectorIndex, url string) { + indexFnc: func(in *mocks.MockSectorIndex, url string) { si := storiface.SectorStorageInfo{ URLs: []string{url}, } @@ -742,9 +742,9 @@ func TestCheckIsUnsealed(t *testing.T) { defer mockCtrl.Finish() // create them mocks - lstore := mocks2.NewMockStore(mockCtrl) - pfhandler := mocks2.NewMockPartialFileHandler(mockCtrl) - index := mocks2.NewMockSectorIndex(mockCtrl) + lstore := mocks.NewMockStore(mockCtrl) + pfhandler := mocks.NewMockPartialFileHandler(mockCtrl) + index := mocks.NewMockSectorIndex(mockCtrl) if tc.storeFnc != nil { tc.storeFnc(lstore) @@ -789,7 +789,7 @@ func TestCheckIsUnsealed(t *testing.T) { } } -func mockSectorAcquire(l *mocks2.MockStore, sectorRef storage.SectorRef, pfPath string, err error) { +func mockSectorAcquire(l *mocks.MockStore, sectorRef storage.SectorRef, pfPath string, err error) { l.EXPECT().AcquireSector(gomock.Any(), sectorRef, storiface.FTUnsealed, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{ Unsealed: pfPath, @@ -797,18 +797,18 @@ func mockSectorAcquire(l *mocks2.MockStore, sectorRef storage.SectorRef, pfPath storiface.SectorPaths{}, err).Times(1) } -func mockPartialFileOpen(pf *mocks2.MockPartialFileHandler, sectorSize abi.SectorSize, pfPath string, err error) { +func mockPartialFileOpen(pf *mocks.MockPartialFileHandler, sectorSize abi.SectorSize, pfPath string, err error) { pf.EXPECT().OpenPartialFile(abi.PaddedPieceSize(sectorSize), pfPath).Return(&partialfile.PartialFile{}, err).Times(1) } -func mockCheckAllocation(pf *mocks2.MockPartialFileHandler, offset, size abi.PaddedPieceSize, file *partialfile.PartialFile, +func mockCheckAllocation(pf *mocks.MockPartialFileHandler, offset, size abi.PaddedPieceSize, file *partialfile.PartialFile, out bool, err error) { pf.EXPECT().HasAllocated(file, storiface.UnpaddedByteIndex(offset.Unpadded()), size.Unpadded()).Return(out, err).Times(1) } -func mockPfReader(pf *mocks2.MockPartialFileHandler, file *partialfile.PartialFile, offset, size abi.PaddedPieceSize, +func mockPfReader(pf *mocks.MockPartialFileHandler, file *partialfile.PartialFile, offset, size abi.PaddedPieceSize, outFile *os.File, err error) { pf.EXPECT().Reader(file, storiface.PaddedByteIndex(offset), size).Return(outFile, err) } diff --git a/storage/pipeline/commit_batch.go b/storage/pipeline/commit_batch.go index 640fdf27687..b49ed284324 100644 --- a/storage/pipeline/commit_batch.go +++ b/storage/pipeline/commit_batch.go @@ -25,7 +25,7 @@ import ( "github.com/filecoin-project/lotus/chain/actors/policy" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/node/config" - sealiface2 "github.com/filecoin-project/lotus/storage/pipeline/sealiface" + sealiface "github.com/filecoin-project/lotus/storage/pipeline/sealiface" "github.com/filecoin-project/lotus/storage/sealer/ffiwrapper" ) @@ -63,10 +63,10 @@ type CommitBatcher struct { cutoffs map[abi.SectorNumber]time.Time todo map[abi.SectorNumber]AggregateInput - waiting map[abi.SectorNumber][]chan sealiface2.CommitBatchRes + waiting map[abi.SectorNumber][]chan sealiface.CommitBatchRes notify, stop, stopped chan struct{} - force chan chan []sealiface2.CommitBatchRes + force chan chan []sealiface.CommitBatchRes lk sync.Mutex } @@ -82,10 +82,10 @@ func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBat cutoffs: map[abi.SectorNumber]time.Time{}, todo: map[abi.SectorNumber]AggregateInput{}, - waiting: map[abi.SectorNumber][]chan sealiface2.CommitBatchRes{}, + waiting: map[abi.SectorNumber][]chan sealiface.CommitBatchRes{}, notify: make(chan struct{}, 1), - force: make(chan chan []sealiface2.CommitBatchRes), + force: make(chan chan []sealiface.CommitBatchRes), stop: make(chan struct{}), stopped: make(chan struct{}), } @@ -96,8 +96,8 @@ func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBat } func (b *CommitBatcher) run() { - var forceRes chan []sealiface2.CommitBatchRes - var lastMsg []sealiface2.CommitBatchRes + var forceRes chan []sealiface.CommitBatchRes + var lastMsg []sealiface.CommitBatchRes cfg, err := b.getConfig() if err != nil { @@ -184,7 +184,7 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration { return wait } -func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface2.CommitBatchRes, error) { +func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, error) { b.lk.Lock() defer b.lk.Unlock() @@ -202,7 +202,7 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface2.CommitBatchRes return nil, nil } - var res []sealiface2.CommitBatchRes + var res []sealiface.CommitBatchRes tok, h, err := b.api.ChainHead(b.mctx) if err != nil { @@ -264,7 +264,7 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface2.CommitBatchRes return res, nil } -func (b *CommitBatcher) processBatch(cfg sealiface2.Config) ([]sealiface2.CommitBatchRes, error) { +func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBatchRes, error) { tok, _, err := b.api.ChainHead(b.mctx) if err != nil { return nil, err @@ -272,7 +272,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface2.Config) ([]sealiface2.Commit total := len(b.todo) - res := sealiface2.CommitBatchRes{ + res := sealiface.CommitBatchRes{ FailedSectors: map[abi.SectorNumber]string{}, } @@ -318,18 +318,18 @@ func (b *CommitBatcher) processBatch(cfg sealiface2.Config) ([]sealiface2.Commit mid, err := address.IDFromAddress(b.maddr) if err != nil { - return []sealiface2.CommitBatchRes{res}, xerrors.Errorf("getting miner id: %w", err) + return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting miner id: %w", err) } nv, err := b.api.StateNetworkVersion(b.mctx, tok) if err != nil { log.Errorf("getting network version: %s", err) - return []sealiface2.CommitBatchRes{res}, xerrors.Errorf("getting network version: %s", err) + return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting network version: %s", err) } arp, err := b.aggregateProofType(nv) if err != nil { - return []sealiface2.CommitBatchRes{res}, xerrors.Errorf("getting aggregate proof type: %w", err) + return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting aggregate proof type: %w", err) } params.AggregateProof, err = b.prover.AggregateSealProofs(proof.AggregateSealVerifyProofAndInfos{ @@ -339,30 +339,30 @@ func (b *CommitBatcher) processBatch(cfg sealiface2.Config) ([]sealiface2.Commit Infos: infos, }, proofs) if err != nil { - return []sealiface2.CommitBatchRes{res}, xerrors.Errorf("aggregating proofs: %w", err) + return []sealiface.CommitBatchRes{res}, xerrors.Errorf("aggregating proofs: %w", err) } enc := new(bytes.Buffer) if err := params.MarshalCBOR(enc); err != nil { - return []sealiface2.CommitBatchRes{res}, xerrors.Errorf("couldn't serialize ProveCommitAggregateParams: %w", err) + return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't serialize ProveCommitAggregateParams: %w", err) } mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil) if err != nil { - return []sealiface2.CommitBatchRes{res}, xerrors.Errorf("couldn't get miner info: %w", err) + return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't get miner info: %w", err) } maxFee := b.feeCfg.MaxCommitBatchGasFee.FeeForSectors(len(infos)) bf, err := b.api.ChainBaseFee(b.mctx, tok) if err != nil { - return []sealiface2.CommitBatchRes{res}, xerrors.Errorf("couldn't get base fee: %w", err) + return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't get base fee: %w", err) } aggFeeRaw, err := policy.AggregateProveCommitNetworkFee(nv, len(infos), bf) if err != nil { log.Errorf("getting aggregate commit network fee: %s", err) - return []sealiface2.CommitBatchRes{res}, xerrors.Errorf("getting aggregate commit network fee: %s", err) + return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting aggregate commit network fee: %s", err) } aggFee := big.Div(big.Mul(aggFeeRaw, aggFeeNum), aggFeeDen) @@ -370,29 +370,29 @@ func (b *CommitBatcher) processBatch(cfg sealiface2.Config) ([]sealiface2.Commit needFunds := big.Add(collateral, aggFee) needFunds, err = collateralSendAmount(b.mctx, b.api, b.maddr, cfg, needFunds) if err != nil { - return []sealiface2.CommitBatchRes{res}, err + return []sealiface.CommitBatchRes{res}, err } goodFunds := big.Add(maxFee, needFunds) from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, goodFunds, needFunds) if err != nil { - return []sealiface2.CommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err) + return []sealiface.CommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err) } mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, builtin.MethodsMiner.ProveCommitAggregate, needFunds, maxFee, enc.Bytes()) if err != nil { - return []sealiface2.CommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err) + return []sealiface.CommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err) } res.Msg = &mcid log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "todo", total, "sectors", len(infos)) - return []sealiface2.CommitBatchRes{res}, nil + return []sealiface.CommitBatchRes{res}, nil } -func (b *CommitBatcher) processIndividually(cfg sealiface2.Config) ([]sealiface2.CommitBatchRes, error) { +func (b *CommitBatcher) processIndividually(cfg sealiface.Config) ([]sealiface.CommitBatchRes, error) { mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil) if err != nil { return nil, xerrors.Errorf("couldn't get miner info: %w", err) @@ -417,10 +417,10 @@ func (b *CommitBatcher) processIndividually(cfg sealiface2.Config) ([]sealiface2 return nil, err } - var res []sealiface2.CommitBatchRes + var res []sealiface.CommitBatchRes for sn, info := range b.todo { - r := sealiface2.CommitBatchRes{ + r := sealiface.CommitBatchRes{ Sectors: []abi.SectorNumber{sn}, FailedSectors: map[abi.SectorNumber]string{}, } @@ -439,7 +439,7 @@ func (b *CommitBatcher) processIndividually(cfg sealiface2.Config) ([]sealiface2 return res, nil } -func (b *CommitBatcher) processSingle(cfg sealiface2.Config, mi api.MinerInfo, avail *abi.TokenAmount, sn abi.SectorNumber, info AggregateInput, tok TipSetToken) (cid.Cid, error) { +func (b *CommitBatcher) processSingle(cfg sealiface.Config, mi api.MinerInfo, avail *abi.TokenAmount, sn abi.SectorNumber, info AggregateInput, tok TipSetToken) (cid.Cid, error) { enc := new(bytes.Buffer) params := &miner.ProveCommitSectorParams{ SectorNumber: sn, @@ -484,19 +484,19 @@ func (b *CommitBatcher) processSingle(cfg sealiface2.Config, mi api.MinerInfo, a } // register commit, wait for batch message, return message CID -func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in AggregateInput) (res sealiface2.CommitBatchRes, err error) { +func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in AggregateInput) (res sealiface.CommitBatchRes, err error) { sn := s.SectorNumber cu, err := b.getCommitCutoff(s) if err != nil { - return sealiface2.CommitBatchRes{}, err + return sealiface.CommitBatchRes{}, err } b.lk.Lock() b.cutoffs[sn] = cu b.todo[sn] = in - sent := make(chan sealiface2.CommitBatchRes, 1) + sent := make(chan sealiface.CommitBatchRes, 1) b.waiting[sn] = append(b.waiting[sn], sent) select { @@ -509,12 +509,12 @@ func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in Aggregat case r := <-sent: return r, nil case <-ctx.Done(): - return sealiface2.CommitBatchRes{}, ctx.Err() + return sealiface.CommitBatchRes{}, ctx.Err() } } -func (b *CommitBatcher) Flush(ctx context.Context) ([]sealiface2.CommitBatchRes, error) { - resCh := make(chan []sealiface2.CommitBatchRes, 1) +func (b *CommitBatcher) Flush(ctx context.Context) ([]sealiface.CommitBatchRes, error) { + resCh := make(chan []sealiface.CommitBatchRes, 1) select { case b.force <- resCh: select { diff --git a/storage/pipeline/commit_batch_test.go b/storage/pipeline/commit_batch_test.go index 65576d6711e..ce8309d6f11 100644 --- a/storage/pipeline/commit_batch_test.go +++ b/storage/pipeline/commit_batch_test.go @@ -22,9 +22,9 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/types" - sealing2 "github.com/filecoin-project/lotus/storage/pipeline" + pipeline "github.com/filecoin-project/lotus/storage/pipeline" "github.com/filecoin-project/lotus/storage/pipeline/mocks" - sealiface2 "github.com/filecoin-project/lotus/storage/pipeline/sealiface" + sealiface "github.com/filecoin-project/lotus/storage/pipeline/sealiface" "github.com/filecoin-project/lotus/storage/sealer/ffiwrapper" ) @@ -42,8 +42,8 @@ func TestCommitBatcher(t *testing.T) { maxBatch := miner5.MaxAggregatedSectors minBatch := miner5.MinAggregatedSectors - cfg := func() (sealiface2.Config, error) { - return sealiface2.Config{ + cfg := func() (sealiface.Config, error) { + return sealiface.Config{ MaxWaitDealsSectors: 2, MaxSealingSectors: 0, MaxSealingSectorsForDeals: 0, @@ -71,10 +71,10 @@ func TestCommitBatcher(t *testing.T) { } type promise func(t *testing.T) - type action func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing2.CommitBatcher) promise + type action func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *pipeline.CommitBatcher) promise actions := func(as ...action) action { - return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing2.CommitBatcher) promise { + return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *pipeline.CommitBatcher) promise { var ps []promise for _, a := range as { p := a(t, s, pcb) @@ -95,13 +95,13 @@ func TestCommitBatcher(t *testing.T) { } addSector := func(sn abi.SectorNumber) action { - return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing2.CommitBatcher) promise { - var pcres sealiface2.CommitBatchRes + return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *pipeline.CommitBatcher) promise { + var pcres sealiface.CommitBatchRes var pcerr error done := sync.Mutex{} done.Lock() - si := sealing2.SectorInfo{ + si := pipeline.SectorInfo{ SectorNumber: sn, } @@ -113,7 +113,7 @@ func TestCommitBatcher(t *testing.T) { go func() { defer done.Unlock() - pcres, pcerr = pcb.AddCommit(ctx, si, sealing2.AggregateInput{ + pcres, pcerr = pcb.AddCommit(ctx, si, pipeline.AggregateInput{ Info: prooftypes.AggregateSealVerifyInfo{ Number: sn, }, @@ -138,7 +138,7 @@ func TestCommitBatcher(t *testing.T) { } waitPending := func(n int) action { - return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing2.CommitBatcher) promise { + return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *pipeline.CommitBatcher) promise { require.Eventually(t, func() bool { p, err := pcb.Pending(ctx) require.NoError(t, err) @@ -151,7 +151,7 @@ func TestCommitBatcher(t *testing.T) { //stm: @CHAIN_STATE_MINER_INFO_001, @CHAIN_STATE_NETWORK_VERSION_001, @CHAIN_STATE_MINER_GET_COLLATERAL_001 expectSend := func(expect []abi.SectorNumber, aboveBalancer, failOnePCI bool) action { - return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing2.CommitBatcher) promise { + return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *pipeline.CommitBatcher) promise { s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(api.MinerInfo{Owner: t0123, Worker: t0123}, nil) ti := len(expect) @@ -217,7 +217,7 @@ func TestCommitBatcher(t *testing.T) { } flush := func(expect []abi.SectorNumber, aboveBalancer, failOnePCI bool) action { - return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing2.CommitBatcher) promise { + return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *pipeline.CommitBatcher) promise { _ = expectSend(expect, aboveBalancer, failOnePCI)(t, s, pcb) batch := len(expect) >= minBatch && aboveBalancer @@ -357,7 +357,7 @@ func TestCommitBatcher(t *testing.T) { // create them mocks pcapi := mocks.NewMockCommitBatcherApi(mockCtrl) - pcb := sealing2.NewCommitBatcher(ctx, t0123, pcapi, as, fc, cfg, &fakeProver{}) + pcb := pipeline.NewCommitBatcher(ctx, t0123, pcapi, as, fc, cfg, &fakeProver{}) var promises []promise diff --git a/storage/pipeline/precommit_batch.go b/storage/pipeline/precommit_batch.go index 432f8d75d13..77bc90dd641 100644 --- a/storage/pipeline/precommit_batch.go +++ b/storage/pipeline/precommit_batch.go @@ -22,7 +22,7 @@ import ( "github.com/filecoin-project/lotus/chain/actors/policy" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/node/config" - sealiface2 "github.com/filecoin-project/lotus/storage/pipeline/sealiface" + sealiface "github.com/filecoin-project/lotus/storage/pipeline/sealiface" ) //go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_precommit_batcher.go -package=mocks . PreCommitBatcherApi @@ -51,10 +51,10 @@ type PreCommitBatcher struct { cutoffs map[abi.SectorNumber]time.Time todo map[abi.SectorNumber]*preCommitEntry - waiting map[abi.SectorNumber][]chan sealiface2.PreCommitBatchRes + waiting map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes notify, stop, stopped chan struct{} - force chan chan []sealiface2.PreCommitBatchRes + force chan chan []sealiface.PreCommitBatchRes lk sync.Mutex } @@ -69,10 +69,10 @@ func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCom cutoffs: map[abi.SectorNumber]time.Time{}, todo: map[abi.SectorNumber]*preCommitEntry{}, - waiting: map[abi.SectorNumber][]chan sealiface2.PreCommitBatchRes{}, + waiting: map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes{}, notify: make(chan struct{}, 1), - force: make(chan chan []sealiface2.PreCommitBatchRes), + force: make(chan chan []sealiface.PreCommitBatchRes), stop: make(chan struct{}), stopped: make(chan struct{}), } @@ -83,8 +83,8 @@ func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCom } func (b *PreCommitBatcher) run() { - var forceRes chan []sealiface2.PreCommitBatchRes - var lastRes []sealiface2.PreCommitBatchRes + var forceRes chan []sealiface.PreCommitBatchRes + var lastRes []sealiface.PreCommitBatchRes cfg, err := b.getConfig() if err != nil { @@ -170,7 +170,7 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration return wait } -func (b *PreCommitBatcher) maybeStartBatch(notif bool) ([]sealiface2.PreCommitBatchRes, error) { +func (b *PreCommitBatcher) maybeStartBatch(notif bool) ([]sealiface.PreCommitBatchRes, error) { b.lk.Lock() defer b.lk.Unlock() @@ -210,7 +210,7 @@ func (b *PreCommitBatcher) maybeStartBatch(notif bool) ([]sealiface2.PreCommitBa } // todo support multiple batches - var res []sealiface2.PreCommitBatchRes + var res []sealiface.PreCommitBatchRes if !individual { res, err = b.processBatch(cfg, tok, bf, nv) } else { @@ -239,7 +239,7 @@ func (b *PreCommitBatcher) maybeStartBatch(notif bool) ([]sealiface2.PreCommitBa return res, nil } -func (b *PreCommitBatcher) processIndividually(cfg sealiface2.Config) ([]sealiface2.PreCommitBatchRes, error) { +func (b *PreCommitBatcher) processIndividually(cfg sealiface.Config) ([]sealiface.PreCommitBatchRes, error) { mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil) if err != nil { return nil, xerrors.Errorf("couldn't get miner info: %w", err) @@ -259,10 +259,10 @@ func (b *PreCommitBatcher) processIndividually(cfg sealiface2.Config) ([]sealifa } } - var res []sealiface2.PreCommitBatchRes + var res []sealiface.PreCommitBatchRes for sn, info := range b.todo { - r := sealiface2.PreCommitBatchRes{ + r := sealiface.PreCommitBatchRes{ Sectors: []abi.SectorNumber{sn}, } @@ -279,7 +279,7 @@ func (b *PreCommitBatcher) processIndividually(cfg sealiface2.Config) ([]sealifa return res, nil } -func (b *PreCommitBatcher) processSingle(cfg sealiface2.Config, mi api.MinerInfo, avail *abi.TokenAmount, params *preCommitEntry) (cid.Cid, error) { +func (b *PreCommitBatcher) processSingle(cfg sealiface.Config, mi api.MinerInfo, avail *abi.TokenAmount, params *preCommitEntry) (cid.Cid, error) { enc := new(bytes.Buffer) if err := params.pci.MarshalCBOR(enc); err != nil { @@ -315,10 +315,10 @@ func (b *PreCommitBatcher) processSingle(cfg sealiface2.Config, mi api.MinerInfo return mcid, nil } -func (b *PreCommitBatcher) processBatch(cfg sealiface2.Config, tok TipSetToken, bf abi.TokenAmount, nv network.Version) ([]sealiface2.PreCommitBatchRes, error) { +func (b *PreCommitBatcher) processBatch(cfg sealiface.Config, tok TipSetToken, bf abi.TokenAmount, nv network.Version) ([]sealiface.PreCommitBatchRes, error) { params := miner.PreCommitSectorBatchParams{} deposit := big.Zero() - var res sealiface2.PreCommitBatchRes + var res sealiface.PreCommitBatchRes for _, p := range b.todo { if len(params.Sectors) >= cfg.MaxPreCommitBatch { @@ -333,12 +333,12 @@ func (b *PreCommitBatcher) processBatch(cfg sealiface2.Config, tok TipSetToken, enc := new(bytes.Buffer) if err := params.MarshalCBOR(enc); err != nil { - return []sealiface2.PreCommitBatchRes{res}, xerrors.Errorf("couldn't serialize PreCommitSectorBatchParams: %w", err) + return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("couldn't serialize PreCommitSectorBatchParams: %w", err) } mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil) if err != nil { - return []sealiface2.PreCommitBatchRes{res}, xerrors.Errorf("couldn't get miner info: %w", err) + return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("couldn't get miner info: %w", err) } maxFee := b.feeCfg.MaxPreCommitBatchGasFee.FeeForSectors(len(params.Sectors)) @@ -346,7 +346,7 @@ func (b *PreCommitBatcher) processBatch(cfg sealiface2.Config, tok TipSetToken, aggFeeRaw, err := policy.AggregatePreCommitNetworkFee(nv, len(params.Sectors), bf) if err != nil { log.Errorf("getting aggregate precommit network fee: %s", err) - return []sealiface2.PreCommitBatchRes{res}, xerrors.Errorf("getting aggregate precommit network fee: %s", err) + return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("getting aggregate precommit network fee: %s", err) } aggFee := big.Div(big.Mul(aggFeeRaw, aggFeeNum), aggFeeDen) @@ -354,39 +354,39 @@ func (b *PreCommitBatcher) processBatch(cfg sealiface2.Config, tok TipSetToken, needFunds := big.Add(deposit, aggFee) needFunds, err = collateralSendAmount(b.mctx, b.api, b.maddr, cfg, needFunds) if err != nil { - return []sealiface2.PreCommitBatchRes{res}, err + return []sealiface.PreCommitBatchRes{res}, err } goodFunds := big.Add(maxFee, needFunds) from, _, err := b.addrSel(b.mctx, mi, api.PreCommitAddr, goodFunds, deposit) if err != nil { - return []sealiface2.PreCommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err) + return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err) } mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, builtin.MethodsMiner.PreCommitSectorBatch, needFunds, maxFee, enc.Bytes()) if err != nil { - return []sealiface2.PreCommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err) + return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err) } res.Msg = &mcid log.Infow("Sent PreCommitSectorBatch message", "cid", mcid, "from", from, "sectors", len(b.todo)) - return []sealiface2.PreCommitBatchRes{res}, nil + return []sealiface.PreCommitBatchRes{res}, nil } // register PreCommit, wait for batch message, return message CID -func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, deposit abi.TokenAmount, in *miner.SectorPreCommitInfo) (res sealiface2.PreCommitBatchRes, err error) { +func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, deposit abi.TokenAmount, in *miner.SectorPreCommitInfo) (res sealiface.PreCommitBatchRes, err error) { _, curEpoch, err := b.api.ChainHead(b.mctx) if err != nil { log.Errorf("getting chain head: %s", err) - return sealiface2.PreCommitBatchRes{}, err + return sealiface.PreCommitBatchRes{}, err } cutoff, err := getPreCommitCutoff(curEpoch, s) if err != nil { - return sealiface2.PreCommitBatchRes{}, xerrors.Errorf("failed to calculate cutoff: %w", err) + return sealiface.PreCommitBatchRes{}, xerrors.Errorf("failed to calculate cutoff: %w", err) } sn := s.SectorNumber @@ -398,7 +398,7 @@ func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, depos pci: in, } - sent := make(chan sealiface2.PreCommitBatchRes, 1) + sent := make(chan sealiface.PreCommitBatchRes, 1) b.waiting[sn] = append(b.waiting[sn], sent) select { @@ -411,12 +411,12 @@ func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, depos case c := <-sent: return c, nil case <-ctx.Done(): - return sealiface2.PreCommitBatchRes{}, ctx.Err() + return sealiface.PreCommitBatchRes{}, ctx.Err() } } -func (b *PreCommitBatcher) Flush(ctx context.Context) ([]sealiface2.PreCommitBatchRes, error) { - resCh := make(chan []sealiface2.PreCommitBatchRes, 1) +func (b *PreCommitBatcher) Flush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) { + resCh := make(chan []sealiface.PreCommitBatchRes, 1) select { case b.force <- resCh: select { diff --git a/storage/pipeline/precommit_batch_test.go b/storage/pipeline/precommit_batch_test.go index 38ce59eda96..54cbdedd38f 100644 --- a/storage/pipeline/precommit_batch_test.go +++ b/storage/pipeline/precommit_batch_test.go @@ -22,9 +22,9 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/node/config" - sealing2 "github.com/filecoin-project/lotus/storage/pipeline" + pipeline "github.com/filecoin-project/lotus/storage/pipeline" "github.com/filecoin-project/lotus/storage/pipeline/mocks" - sealiface2 "github.com/filecoin-project/lotus/storage/pipeline/sealiface" + sealiface "github.com/filecoin-project/lotus/storage/pipeline/sealiface" ) var fc = config.MinerFeeConfig{ @@ -48,8 +48,8 @@ func TestPrecommitBatcher(t *testing.T) { maxBatch := miner6.PreCommitSectorBatchMaxSize - cfg := func() (sealiface2.Config, error) { - return sealiface2.Config{ + cfg := func() (sealiface.Config, error) { + return sealiface.Config{ MaxWaitDealsSectors: 2, MaxSealingSectors: 0, MaxSealingSectorsForDeals: 0, @@ -75,10 +75,10 @@ func TestPrecommitBatcher(t *testing.T) { } type promise func(t *testing.T) - type action func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing2.PreCommitBatcher) promise + type action func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *pipeline.PreCommitBatcher) promise actions := func(as ...action) action { - return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing2.PreCommitBatcher) promise { + return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *pipeline.PreCommitBatcher) promise { var ps []promise for _, a := range as { p := a(t, s, pcb) @@ -99,13 +99,13 @@ func TestPrecommitBatcher(t *testing.T) { } addSector := func(sn abi.SectorNumber) action { - return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing2.PreCommitBatcher) promise { - var pcres sealiface2.PreCommitBatchRes + return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *pipeline.PreCommitBatcher) promise { + var pcres sealiface.PreCommitBatchRes var pcerr error done := sync.Mutex{} done.Lock() - si := sealing2.SectorInfo{ + si := pipeline.SectorInfo{ SectorNumber: sn, } @@ -139,7 +139,7 @@ func TestPrecommitBatcher(t *testing.T) { } waitPending := func(n int) action { - return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing2.PreCommitBatcher) promise { + return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *pipeline.PreCommitBatcher) promise { require.Eventually(t, func() bool { p, err := pcb.Pending(ctx) require.NoError(t, err) @@ -152,7 +152,7 @@ func TestPrecommitBatcher(t *testing.T) { //stm: @CHAIN_STATE_MINER_INFO_001, @CHAIN_STATE_NETWORK_VERSION_001 expectSend := func(expect []abi.SectorNumber) action { - return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing2.PreCommitBatcher) promise { + return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *pipeline.PreCommitBatcher) promise { s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil) s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(big.NewInt(10001), nil) s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version14, nil) @@ -173,7 +173,7 @@ func TestPrecommitBatcher(t *testing.T) { //stm: @CHAIN_STATE_MINER_INFO_001, @CHAIN_STATE_NETWORK_VERSION_001 expectSendsSingle := func(expect []abi.SectorNumber) action { - return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing2.PreCommitBatcher) promise { + return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *pipeline.PreCommitBatcher) promise { s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil) s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(big.NewInt(9999), nil) s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version14, nil) @@ -194,7 +194,7 @@ func TestPrecommitBatcher(t *testing.T) { } flush := func(expect []abi.SectorNumber) action { - return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing2.PreCommitBatcher) promise { + return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *pipeline.PreCommitBatcher) promise { _ = expectSend(expect)(t, s, pcb) r, err := pcb.Flush(ctx) @@ -261,7 +261,7 @@ func TestPrecommitBatcher(t *testing.T) { // create them mocks pcapi := mocks.NewMockPreCommitBatcherApi(mockCtrl) - pcb := sealing2.NewPreCommitBatcher(ctx, t0123, pcapi, as, fc, cfg) + pcb := pipeline.NewPreCommitBatcher(ctx, t0123, pcapi, as, fc, cfg) var promises []promise diff --git a/storage/pipeline/precommit_policy_test.go b/storage/pipeline/precommit_policy_test.go index 0886f0d98af..a823ef81ad0 100644 --- a/storage/pipeline/precommit_policy_test.go +++ b/storage/pipeline/precommit_policy_test.go @@ -17,7 +17,7 @@ import ( "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors/builtin" "github.com/filecoin-project/lotus/chain/actors/policy" - sealing2 "github.com/filecoin-project/lotus/storage/pipeline" + pipeline "github.com/filecoin-project/lotus/storage/pipeline" "github.com/filecoin-project/lotus/storage/pipeline/sealiface" ) @@ -29,7 +29,7 @@ type fakeConfigStub struct { CCSectorLifetime time.Duration } -func fakeConfigGetter(stub *fakeConfigStub) sealing2.GetSealingConfigFunc { +func fakeConfigGetter(stub *fakeConfigStub) pipeline.GetSealingConfigFunc { return func() (sealiface.Config, error) { if stub == nil { return sealiface.Config{}, nil @@ -41,11 +41,11 @@ func fakeConfigGetter(stub *fakeConfigStub) sealing2.GetSealingConfigFunc { } } -func (f *fakeChain) StateNetworkVersion(ctx context.Context, tok sealing2.TipSetToken) (network.Version, error) { +func (f *fakeChain) StateNetworkVersion(ctx context.Context, tok pipeline.TipSetToken) (network.Version, error) { return build.NewestNetworkVersion, nil } -func (f *fakeChain) ChainHead(ctx context.Context) (sealing2.TipSetToken, abi.ChainEpoch, error) { +func (f *fakeChain) ChainHead(ctx context.Context) (pipeline.TipSetToken, abi.ChainEpoch, error) { return []byte{1, 2, 3}, f.h, nil } @@ -60,7 +60,7 @@ func TestBasicPolicyEmptySector(t *testing.T) { cfg := fakeConfigGetter(nil) h := abi.ChainEpoch(55) pBuffer := abi.ChainEpoch(2) - pcp := sealing2.NewBasicPreCommitPolicy(&fakeChain{h: h}, cfg, pBuffer) + pcp := pipeline.NewBasicPreCommitPolicy(&fakeChain{h: h}, cfg, pBuffer) exp, err := pcp.Expiration(context.Background()) require.NoError(t, err) @@ -77,7 +77,7 @@ func TestCustomCCSectorConfig(t *testing.T) { cfg := fakeConfigGetter(&cfgStub) h := abi.ChainEpoch(55) pBuffer := abi.ChainEpoch(2) - pcp := sealing2.NewBasicPreCommitPolicy(&fakeChain{h: h}, cfg, pBuffer) + pcp := pipeline.NewBasicPreCommitPolicy(&fakeChain{h: h}, cfg, pBuffer) exp, err := pcp.Expiration(context.Background()) require.NoError(t, err) @@ -89,11 +89,11 @@ func TestCustomCCSectorConfig(t *testing.T) { func TestBasicPolicyMostConstrictiveSchedule(t *testing.T) { cfg := fakeConfigGetter(nil) - policy := sealing2.NewBasicPreCommitPolicy(&fakeChain{ + policy := pipeline.NewBasicPreCommitPolicy(&fakeChain{ h: abi.ChainEpoch(55), }, cfg, 2) longestDealEpochEnd := abi.ChainEpoch(547300) - pieces := []sealing2.Piece{ + pieces := []pipeline.Piece{ { Piece: abi.PieceInfo{ Size: abi.PaddedPieceSize(1024), @@ -130,11 +130,11 @@ func TestBasicPolicyMostConstrictiveSchedule(t *testing.T) { func TestBasicPolicyIgnoresExistingScheduleIfExpired(t *testing.T) { cfg := fakeConfigGetter(nil) - policy := sealing2.NewBasicPreCommitPolicy(&fakeChain{ + policy := pipeline.NewBasicPreCommitPolicy(&fakeChain{ h: abi.ChainEpoch(55), }, cfg, 0) - pieces := []sealing2.Piece{ + pieces := []pipeline.Piece{ { Piece: abi.PieceInfo{ Size: abi.PaddedPieceSize(1024), @@ -159,11 +159,11 @@ func TestBasicPolicyIgnoresExistingScheduleIfExpired(t *testing.T) { func TestMissingDealIsIgnored(t *testing.T) { cfg := fakeConfigGetter(nil) - policy := sealing2.NewBasicPreCommitPolicy(&fakeChain{ + policy := pipeline.NewBasicPreCommitPolicy(&fakeChain{ h: abi.ChainEpoch(55), }, cfg, 0) - pieces := []sealing2.Piece{ + pieces := []pipeline.Piece{ { Piece: abi.PieceInfo{ Size: abi.PaddedPieceSize(1024), diff --git a/storage/pipeline/states_failed_test.go b/storage/pipeline/states_failed_test.go index 7c1b3553f5d..f10a08cb3bd 100644 --- a/storage/pipeline/states_failed_test.go +++ b/storage/pipeline/states_failed_test.go @@ -20,7 +20,7 @@ import ( api2 "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors/builtin/market" - sealing2 "github.com/filecoin-project/lotus/storage/pipeline" + pipeline "github.com/filecoin-project/lotus/storage/pipeline" mocks "github.com/filecoin-project/lotus/storage/pipeline/mocks" ) @@ -33,9 +33,9 @@ func TestStateRecoverDealIDs(t *testing.T) { api := mocks.NewMockSealingAPI(mockCtrl) - fakeSealing := &sealing2.Sealing{ + fakeSealing := &pipeline.Sealing{ Api: api, - DealInfo: &sealing2.CurrentDealInfoManager{CDAPI: api}, + DealInfo: &pipeline.CurrentDealInfoManager{CDAPI: api}, } sctx := mocks.NewMockContext(mockCtrl) @@ -55,8 +55,8 @@ func TestStateRecoverDealIDs(t *testing.T) { // expect GetCurrentDealInfo { - api.EXPECT().StateSearchMsg(ctx, pc).Return(&sealing2.MsgLookup{ - Receipt: sealing2.MessageReceipt{ + api.EXPECT().StateSearchMsg(ctx, pc).Return(&pipeline.MsgLookup{ + Receipt: pipeline.MessageReceipt{ ExitCode: exitcode.Ok, Return: cborRet(&market0.PublishStorageDealsReturn{ IDs: []abi.DealID{dealId}, @@ -70,12 +70,12 @@ func TestStateRecoverDealIDs(t *testing.T) { } - sctx.EXPECT().Send(sealing2.SectorRemove{}).Return(nil) + sctx.EXPECT().Send(pipeline.SectorRemove{}).Return(nil) // TODO sctx should satisfy an interface so it can be useable for mocking. This will fail because we are passing in an empty context now to get this to build. // https://github.com/filecoin-project/lotus/issues/7867 - err := fakeSealing.HandleRecoverDealIDs(statemachine.Context{}, sealing2.SectorInfo{ - Pieces: []sealing2.Piece{ + err := fakeSealing.HandleRecoverDealIDs(statemachine.Context{}, pipeline.SectorInfo{ + Pieces: []pipeline.Piece{ { DealInfo: &api2.PieceDealInfo{ DealID: dealId,