From 048bfe6d5b8fb101129aa047407a8ca749d1c046 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 16 Jun 2022 12:47:19 +0200 Subject: [PATCH] sealing pipeline: Use non-special msg wait APIs --- storage/adapter_storage_miner.go | 38 ++------------- storage/pipeline/currentdealinfo.go | 58 ++--------------------- storage/pipeline/currentdealinfo_test.go | 46 +++++++++--------- storage/pipeline/mocks/api.go | 20 ++++---- storage/pipeline/sealing.go | 4 +- storage/pipeline/states_failed.go | 7 +-- storage/pipeline/states_failed_test.go | 5 +- storage/pipeline/states_proving.go | 5 +- storage/pipeline/states_replica_update.go | 7 +-- storage/pipeline/states_sealing.go | 11 +++-- storage/pipeline/types.go | 20 -------- 11 files changed, 64 insertions(+), 157 deletions(-) diff --git a/storage/adapter_storage_miner.go b/storage/adapter_storage_miner.go index 11c5065a3c3..abf35f78eb9 100644 --- a/storage/adapter_storage_miner.go +++ b/storage/adapter_storage_miner.go @@ -20,7 +20,6 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/blockstore" - "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/actors/builtin/market" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" @@ -103,42 +102,13 @@ func (s SealingAPIAdapter) ChainReadObj(ctx context.Context, ocid cid.Cid) ([]by return s.delegate.ChainReadObj(ctx, ocid) } -func (s SealingAPIAdapter) StateWaitMsg(ctx context.Context, mcid cid.Cid) (pipeline.MsgLookup, error) { - wmsg, err := s.delegate.StateWaitMsg(ctx, mcid, build.MessageConfidence, api.LookbackNoLimit, true) - if err != nil { - return pipeline.MsgLookup{}, err - } +func (s SealingAPIAdapter) StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error) { + return s.delegate.StateWaitMsg(ctx, cid, confidence, limit, allowReplaced) - return pipeline.MsgLookup{ - Receipt: pipeline.MessageReceipt{ - ExitCode: wmsg.Receipt.ExitCode, - Return: wmsg.Receipt.Return, - GasUsed: wmsg.Receipt.GasUsed, - }, - TipSetTok: wmsg.TipSet, - Height: wmsg.Height, - }, nil } -func (s SealingAPIAdapter) StateSearchMsg(ctx context.Context, c cid.Cid) (*pipeline.MsgLookup, error) { - wmsg, err := s.delegate.StateSearchMsg(ctx, types.EmptyTSK, c, api.LookbackNoLimit, true) - if err != nil { - return nil, err - } - - if wmsg == nil { - return nil, nil - } - - return &pipeline.MsgLookup{ - Receipt: pipeline.MessageReceipt{ - ExitCode: wmsg.Receipt.ExitCode, - Return: wmsg.Receipt.Return, - GasUsed: wmsg.Receipt.GasUsed, - }, - TipSetTok: wmsg.TipSet, - Height: wmsg.Height, - }, nil +func (s SealingAPIAdapter) StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error) { + return s.delegate.StateSearchMsg(ctx, from, msg, limit, allowReplaced) } func (s SealingAPIAdapter) StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tsk types.TipSetKey) (cid.Cid, error) { diff --git a/storage/pipeline/currentdealinfo.go b/storage/pipeline/currentdealinfo.go index 1e7f0c62b8a..99086c7533b 100644 --- a/storage/pipeline/currentdealinfo.go +++ b/storage/pipeline/currentdealinfo.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "fmt" - "github.com/ipfs/go-cid" "golang.org/x/xerrors" @@ -23,7 +22,7 @@ type CurrentDealInfoAPI interface { ChainGetMessage(context.Context, cid.Cid) (*types.Message, error) StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error) StateMarketStorageDeal(context.Context, abi.DealID, types.TipSetKey) (*api.MarketDeal, error) - StateSearchMsg(context.Context, cid.Cid) (*MsgLookup, error) + StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error) StateNetworkVersion(ctx context.Context, tok types.TipSetKey) (network.Version, error) } @@ -69,7 +68,7 @@ func (mgr *CurrentDealInfoManager) dealIDFromPublishDealsMsg(ctx context.Context dealID := abi.DealID(0) // Get the return value of the publish deals message - lookup, err := mgr.CDAPI.StateSearchMsg(ctx, publishCid) + lookup, err := mgr.CDAPI.StateSearchMsg(ctx, tok, publishCid, api.LookbackNoLimit, true) if err != nil { return dealID, types.EmptyTSK, xerrors.Errorf("looking for publish deal message %s: search msg failed: %w", publishCid, err) } @@ -82,7 +81,7 @@ func (mgr *CurrentDealInfoManager) dealIDFromPublishDealsMsg(ctx context.Context return dealID, types.EmptyTSK, xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", publishCid, lookup.Receipt.ExitCode) } - nv, err := mgr.CDAPI.StateNetworkVersion(ctx, lookup.TipSetTok) + nv, err := mgr.CDAPI.StateNetworkVersion(ctx, lookup.TipSet) if err != nil { return dealID, types.EmptyTSK, xerrors.Errorf("getting network version: %w", err) } @@ -113,7 +112,7 @@ func (mgr *CurrentDealInfoManager) dealIDFromPublishDealsMsg(ctx context.Context // There is a single deal in this publish message and no deal proposal // was supplied, so we have nothing to compare against. Just assume // the deal ID is correct and that it was valid - return dealIDs[0], lookup.TipSetTok, nil + return dealIDs[0], lookup.TipSet, nil } // Get the parameters to the publish deals message @@ -166,7 +165,7 @@ func (mgr *CurrentDealInfoManager) dealIDFromPublishDealsMsg(ctx context.Context if outIdx >= len(dealIDs) { return dealID, types.EmptyTSK, xerrors.Errorf("invalid publish storage deals ret marking %d as valid while only returning %d valid deals in publish deal message %s", outIdx, len(dealIDs), publishCid) } - return dealIDs[outIdx], lookup.TipSetTok, nil + return dealIDs[outIdx], lookup.TipSet, nil } func (mgr *CurrentDealInfoManager) CheckDealEquality(ctx context.Context, tok types.TipSetKey, p1, p2 market.DealProposal) (bool, error) { @@ -190,50 +189,3 @@ func (mgr *CurrentDealInfoManager) CheckDealEquality(ctx context.Context, tok ty p1.Provider == p2.Provider && p1ClientID == p2ClientID, nil } - -type CurrentDealInfoTskAPI interface { - ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error) - StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error) - StateMarketStorageDeal(context.Context, abi.DealID, types.TipSetKey) (*api.MarketDeal, error) - StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error) - StateNetworkVersion(context.Context, types.TipSetKey) (network.Version, error) -} - -type CurrentDealInfoAPIAdapter struct { - CurrentDealInfoTskAPI -} - -func (c *CurrentDealInfoAPIAdapter) StateLookupID(ctx context.Context, a address.Address, tsk types.TipSetKey) (address.Address, error) { - return c.CurrentDealInfoTskAPI.StateLookupID(ctx, a, tsk) -} - -func (c *CurrentDealInfoAPIAdapter) StateMarketStorageDeal(ctx context.Context, dealID abi.DealID, tsk types.TipSetKey) (*api.MarketDeal, error) { - return c.CurrentDealInfoTskAPI.StateMarketStorageDeal(ctx, dealID, tsk) -} - -func (c *CurrentDealInfoAPIAdapter) StateSearchMsg(ctx context.Context, k cid.Cid) (*MsgLookup, error) { - wmsg, err := c.CurrentDealInfoTskAPI.StateSearchMsg(ctx, types.EmptyTSK, k, api.LookbackNoLimit, true) - if err != nil { - return nil, err - } - - if wmsg == nil { - return nil, nil - } - - return &MsgLookup{ - Receipt: MessageReceipt{ - ExitCode: wmsg.Receipt.ExitCode, - Return: wmsg.Receipt.Return, - GasUsed: wmsg.Receipt.GasUsed, - }, - TipSetTok: wmsg.TipSet, - Height: wmsg.Height, - }, nil -} - -func (c *CurrentDealInfoAPIAdapter) StateNetworkVersion(ctx context.Context, tsk types.TipSetKey) (network.Version, error) { - return c.CurrentDealInfoTskAPI.StateNetworkVersion(ctx, tsk) -} - -var _ CurrentDealInfoAPI = (*CurrentDealInfoAPIAdapter)(nil) diff --git a/storage/pipeline/currentdealinfo_test.go b/storage/pipeline/currentdealinfo_test.go index 990fce00e1a..c5653959d54 100644 --- a/storage/pipeline/currentdealinfo_test.go +++ b/storage/pipeline/currentdealinfo_test.go @@ -101,7 +101,7 @@ func TestGetCurrentDealInfo(t *testing.T) { } type testCaseData struct { - searchMessageLookup *MsgLookup + searchMessageLookup *api.MsgLookup searchMessageErr error marketDeals map[abi.DealID]*api.MarketDeal publishCid cid.Cid @@ -114,8 +114,8 @@ func TestGetCurrentDealInfo(t *testing.T) { testCases := map[string]testCaseData{ "deal lookup succeeds": { publishCid: dummyCid, - searchMessageLookup: &MsgLookup{ - Receipt: MessageReceipt{ + searchMessageLookup: &api.MsgLookup{ + Receipt: types.MessageReceipt{ ExitCode: exitcode.Ok, Return: makePublishDealsReturnBytesOldVersion(t, []abi.DealID{successDealID}), }, @@ -129,8 +129,8 @@ func TestGetCurrentDealInfo(t *testing.T) { }, "deal lookup succeeds two return values": { publishCid: dummyCid, - searchMessageLookup: &MsgLookup{ - Receipt: MessageReceipt{ + searchMessageLookup: &api.MsgLookup{ + Receipt: types.MessageReceipt{ ExitCode: exitcode.Ok, Return: makePublishDealsReturnBytesOldVersion(t, []abi.DealID{earlierDealID, successDealID}), }, @@ -145,8 +145,8 @@ func TestGetCurrentDealInfo(t *testing.T) { }, "deal lookup fails proposal mis-match": { publishCid: dummyCid, - searchMessageLookup: &MsgLookup{ - Receipt: MessageReceipt{ + searchMessageLookup: &api.MsgLookup{ + Receipt: types.MessageReceipt{ ExitCode: exitcode.Ok, Return: makePublishDealsReturnBytesOldVersion(t, []abi.DealID{earlierDealID}), }, @@ -160,8 +160,8 @@ func TestGetCurrentDealInfo(t *testing.T) { }, "deal lookup handles invalid actor output with mismatched count of deals and return values": { publishCid: dummyCid, - searchMessageLookup: &MsgLookup{ - Receipt: MessageReceipt{ + searchMessageLookup: &api.MsgLookup{ + Receipt: types.MessageReceipt{ ExitCode: exitcode.Ok, Return: makePublishDealsReturnBytesOldVersion(t, []abi.DealID{earlierDealID}), }, @@ -177,8 +177,8 @@ func TestGetCurrentDealInfo(t *testing.T) { "deal lookup fails when deal was not valid and index exceeds output array": { publishCid: dummyCid, - searchMessageLookup: &MsgLookup{ - Receipt: MessageReceipt{ + searchMessageLookup: &api.MsgLookup{ + Receipt: types.MessageReceipt{ ExitCode: exitcode.Ok, Return: makePublishDealsReturn(t, []abi.DealID{earlierDealID}, []uint64{0}), }, @@ -195,8 +195,8 @@ func TestGetCurrentDealInfo(t *testing.T) { "deal lookup succeeds when theres a separate deal failure": { publishCid: dummyCid, - searchMessageLookup: &MsgLookup{ - Receipt: MessageReceipt{ + searchMessageLookup: &api.MsgLookup{ + Receipt: types.MessageReceipt{ ExitCode: exitcode.Ok, Return: makePublishDealsReturn(t, []abi.DealID{anotherDealID, successDealID}, []uint64{0, 2}), }, @@ -214,8 +214,8 @@ func TestGetCurrentDealInfo(t *testing.T) { "deal lookup succeeds, target proposal nil, single deal in message": { publishCid: dummyCid, - searchMessageLookup: &MsgLookup{ - Receipt: MessageReceipt{ + searchMessageLookup: &api.MsgLookup{ + Receipt: types.MessageReceipt{ ExitCode: exitcode.Ok, Return: makePublishDealsReturnBytesOldVersion(t, []abi.DealID{successDealID}), }, @@ -229,8 +229,8 @@ func TestGetCurrentDealInfo(t *testing.T) { }, "deal lookup fails, multiple deals in return value but target proposal nil": { publishCid: dummyCid, - searchMessageLookup: &MsgLookup{ - Receipt: MessageReceipt{ + searchMessageLookup: &api.MsgLookup{ + Receipt: types.MessageReceipt{ ExitCode: exitcode.Ok, Return: makePublishDealsReturnBytesOldVersion(t, []abi.DealID{earlierDealID, successDealID}), }, @@ -258,8 +258,8 @@ func TestGetCurrentDealInfo(t *testing.T) { }, "return code not ok": { publishCid: dummyCid, - searchMessageLookup: &MsgLookup{ - Receipt: MessageReceipt{ + searchMessageLookup: &api.MsgLookup{ + Receipt: types.MessageReceipt{ ExitCode: exitcode.ErrIllegalState, }, }, @@ -269,8 +269,8 @@ func TestGetCurrentDealInfo(t *testing.T) { }, "unable to unmarshal params": { publishCid: dummyCid, - searchMessageLookup: &MsgLookup{ - Receipt: MessageReceipt{ + searchMessageLookup: &api.MsgLookup{ + Receipt: types.MessageReceipt{ ExitCode: exitcode.Ok, Return: []byte("applesauce"), }, @@ -319,7 +319,7 @@ type marketDealKey struct { } type CurrentDealInfoMockAPI struct { - SearchMessageLookup *MsgLookup + SearchMessageLookup *api.MsgLookup SearchMessageErr error MarketDeals map[marketDealKey]*api.MarketDeal @@ -371,7 +371,7 @@ func (mapi *CurrentDealInfoMockAPI) StateMarketStorageDeal(ctx context.Context, return deal, nil } -func (mapi *CurrentDealInfoMockAPI) StateSearchMsg(ctx context.Context, c cid.Cid) (*MsgLookup, error) { +func (mapi *CurrentDealInfoMockAPI) StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error) { if mapi.SearchMessageLookup == nil { return mapi.SearchMessageLookup, mapi.SearchMessageErr } diff --git a/storage/pipeline/mocks/api.go b/storage/pipeline/mocks/api.go index a4b88a9a9d9..c7be5c0ae53 100644 --- a/storage/pipeline/mocks/api.go +++ b/storage/pipeline/mocks/api.go @@ -348,18 +348,18 @@ func (mr *MockSealingAPIMockRecorder) StateNetworkVersion(arg0, arg1 interface{} } // StateSearchMsg mocks base method. -func (m *MockSealingAPI) StateSearchMsg(arg0 context.Context, arg1 cid.Cid) (*sealing.MsgLookup, error) { +func (m *MockSealingAPI) StateSearchMsg(arg0 context.Context, arg1 types.TipSetKey, arg2 cid.Cid, arg3 abi.ChainEpoch, arg4 bool) (*api.MsgLookup, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "StateSearchMsg", arg0, arg1) - ret0, _ := ret[0].(*sealing.MsgLookup) + ret := m.ctrl.Call(m, "StateSearchMsg", arg0, arg1, arg2, arg3, arg4) + ret0, _ := ret[0].(*api.MsgLookup) ret1, _ := ret[1].(error) return ret0, ret1 } // StateSearchMsg indicates an expected call of StateSearchMsg. -func (mr *MockSealingAPIMockRecorder) StateSearchMsg(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockSealingAPIMockRecorder) StateSearchMsg(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateSearchMsg", reflect.TypeOf((*MockSealingAPI)(nil).StateSearchMsg), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateSearchMsg", reflect.TypeOf((*MockSealingAPI)(nil).StateSearchMsg), arg0, arg1, arg2, arg3, arg4) } // StateSectorGetInfo mocks base method. @@ -408,16 +408,16 @@ func (mr *MockSealingAPIMockRecorder) StateSectorPreCommitInfo(arg0, arg1, arg2, } // StateWaitMsg mocks base method. -func (m *MockSealingAPI) StateWaitMsg(arg0 context.Context, arg1 cid.Cid) (sealing.MsgLookup, error) { +func (m *MockSealingAPI) StateWaitMsg(arg0 context.Context, arg1 cid.Cid, arg2 uint64, arg3 abi.ChainEpoch, arg4 bool) (*api.MsgLookup, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "StateWaitMsg", arg0, arg1) - ret0, _ := ret[0].(sealing.MsgLookup) + ret := m.ctrl.Call(m, "StateWaitMsg", arg0, arg1, arg2, arg3, arg4) + ret0, _ := ret[0].(*api.MsgLookup) ret1, _ := ret[1].(error) return ret0, ret1 } // StateWaitMsg indicates an expected call of StateWaitMsg. -func (mr *MockSealingAPIMockRecorder) StateWaitMsg(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockSealingAPIMockRecorder) StateWaitMsg(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateWaitMsg", reflect.TypeOf((*MockSealingAPI)(nil).StateWaitMsg), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateWaitMsg", reflect.TypeOf((*MockSealingAPI)(nil).StateWaitMsg), arg0, arg1, arg2, arg3, arg4) } diff --git a/storage/pipeline/sealing.go b/storage/pipeline/sealing.go index a3453439318..1f7b0d53da8 100644 --- a/storage/pipeline/sealing.go +++ b/storage/pipeline/sealing.go @@ -48,8 +48,8 @@ var ErrSectorAllocated = errors.New("sectorNumber is allocated, but PreCommit in //go:generate go run github.com/golang/mock/mockgen -destination=mocks/api.go -package=mocks . SealingAPI type SealingAPI interface { - StateWaitMsg(context.Context, cid.Cid) (MsgLookup, error) - StateSearchMsg(context.Context, cid.Cid) (*MsgLookup, error) + StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error) + StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error) StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tok types.TipSetKey) (cid.Cid, error) // Can return ErrSectorAllocated in case precommit info wasn't found, but the sector number is marked as allocated diff --git a/storage/pipeline/states_failed.go b/storage/pipeline/states_failed.go index 15877c59e1b..552fcee6850 100644 --- a/storage/pipeline/states_failed.go +++ b/storage/pipeline/states_failed.go @@ -2,6 +2,7 @@ package sealing import ( "context" + "github.com/filecoin-project/lotus/api" "time" "github.com/hashicorp/go-multierror" @@ -80,7 +81,7 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI } if sector.PreCommitMessage != nil { - mw, err := m.Api.StateSearchMsg(ctx.Context(), *sector.PreCommitMessage) + mw, err := m.Api.StateSearchMsg(ctx.Context(), tok, *sector.PreCommitMessage, api.LookbackNoLimit, true) if err != nil { // API error if err := failedCooldown(ctx, sector); err != nil { @@ -189,7 +190,7 @@ func (m *Sealing) handleSubmitReplicaUpdateFailed(ctx statemachine.Context, sect } if sector.ReplicaUpdateMessage != nil { - mw, err := m.Api.StateSearchMsg(ctx.Context(), *sector.ReplicaUpdateMessage) + mw, err := m.Api.StateSearchMsg(ctx.Context(), types.EmptyTSK, *sector.ReplicaUpdateMessage, api.LookbackNoLimit, true) if err != nil { // API error return ctx.Send(SectorRetrySubmitReplicaUpdateWait{}) @@ -269,7 +270,7 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo } if sector.CommitMessage != nil { - mw, err := m.Api.StateSearchMsg(ctx.Context(), *sector.CommitMessage) + mw, err := m.Api.StateSearchMsg(ctx.Context(), tok, *sector.CommitMessage, api.LookbackNoLimit, true) if err != nil { // API error if err := failedCooldown(ctx, sector); err != nil { diff --git a/storage/pipeline/states_failed_test.go b/storage/pipeline/states_failed_test.go index 78f55f1e77f..02846f93f97 100644 --- a/storage/pipeline/states_failed_test.go +++ b/storage/pipeline/states_failed_test.go @@ -4,6 +4,7 @@ package sealing_test import ( "bytes" "context" + "github.com/filecoin-project/lotus/chain/types" "testing" "github.com/golang/mock/gomock" @@ -55,8 +56,8 @@ func TestStateRecoverDealIDs(t *testing.T) { // expect GetCurrentDealInfo { - api.EXPECT().StateSearchMsg(ctx, pc).Return(&pipeline.MsgLookup{ - Receipt: pipeline.MessageReceipt{ + api.EXPECT().StateSearchMsg(ctx, gomock.Any(), pc, gomock.Any(), gomock.Any()).Return(&api2.MsgLookup{ + Receipt: types.MessageReceipt{ ExitCode: exitcode.Ok, Return: cborRet(&market0.PublishStorageDealsReturn{ IDs: []abi.DealID{dealId}, diff --git a/storage/pipeline/states_proving.go b/storage/pipeline/states_proving.go index 9483d1ed02a..7110193faa5 100644 --- a/storage/pipeline/states_proving.go +++ b/storage/pipeline/states_proving.go @@ -1,6 +1,7 @@ package sealing import ( + "github.com/filecoin-project/lotus/api" "time" "golang.org/x/xerrors" @@ -25,7 +26,7 @@ func (m *Sealing) handleFaultReported(ctx statemachine.Context, sector SectorInf return xerrors.Errorf("entered fault reported state without a FaultReportMsg cid") } - mw, err := m.Api.StateWaitMsg(ctx.Context(), *sector.FaultReportMsg) + mw, err := m.Api.StateWaitMsg(ctx.Context(), *sector.FaultReportMsg, build.MessageConfidence, api.LookbackNoLimit, true) if err != nil { return xerrors.Errorf("failed to wait for fault declaration: %w", err) } @@ -83,7 +84,7 @@ func (m *Sealing) handleTerminateWait(ctx statemachine.Context, sector SectorInf return xerrors.New("entered TerminateWait with nil TerminateMessage") } - mw, err := m.Api.StateWaitMsg(ctx.Context(), *sector.TerminateMessage) + mw, err := m.Api.StateWaitMsg(ctx.Context(), *sector.TerminateMessage, build.MessageConfidence, api.LookbackNoLimit, true) if err != nil { return ctx.Send(SectorTerminateFailed{xerrors.Errorf("waiting for terminate message to land on chain: %w", err)}) } diff --git a/storage/pipeline/states_replica_update.go b/storage/pipeline/states_replica_update.go index 79b3f0185f5..89e96ffe2cf 100644 --- a/storage/pipeline/states_replica_update.go +++ b/storage/pipeline/states_replica_update.go @@ -3,6 +3,7 @@ package sealing import ( "bytes" "context" + "github.com/filecoin-project/lotus/build" "time" "golang.org/x/xerrors" @@ -187,7 +188,7 @@ func (m *Sealing) handleReplicaUpdateWait(ctx statemachine.Context, sector Secto return ctx.Send(SectorSubmitReplicaUpdateFailed{}) } - mw, err := m.Api.StateWaitMsg(ctx.Context(), *sector.ReplicaUpdateMessage) + mw, err := m.Api.StateWaitMsg(ctx.Context(), *sector.ReplicaUpdateMessage, build.MessageConfidence, api.LookbackNoLimit, true) if err != nil { log.Errorf("handleReplicaUpdateWait: failed to wait for message: %+v", err) return ctx.Send(SectorSubmitReplicaUpdateFailed{}) @@ -204,7 +205,7 @@ func (m *Sealing) handleReplicaUpdateWait(ctx statemachine.Context, sector Secto default: return ctx.Send(SectorSubmitReplicaUpdateFailed{}) } - si, err := m.Api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, mw.TipSetTok) + si, err := m.Api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, mw.TipSet) if err != nil { log.Errorf("api err failed to get sector info: %+v", err) return ctx.Send(SectorSubmitReplicaUpdateFailed{}) @@ -235,7 +236,7 @@ func (m *Sealing) handleFinalizeReplicaUpdate(ctx statemachine.Context, sector S func (m *Sealing) handleUpdateActivating(ctx statemachine.Context, sector SectorInfo) error { try := func() error { - mw, err := m.Api.StateWaitMsg(ctx.Context(), *sector.ReplicaUpdateMessage) + mw, err := m.Api.StateWaitMsg(ctx.Context(), *sector.ReplicaUpdateMessage, build.MessageConfidence, api.LookbackNoLimit, true) if err != nil { return err } diff --git a/storage/pipeline/states_sealing.go b/storage/pipeline/states_sealing.go index a93fa455073..e65b50322d6 100644 --- a/storage/pipeline/states_sealing.go +++ b/storage/pipeline/states_sealing.go @@ -3,6 +3,7 @@ package sealing import ( "bytes" "context" + "github.com/filecoin-project/lotus/build" "github.com/ipfs/go-cid" "golang.org/x/xerrors" @@ -456,7 +457,7 @@ func (m *Sealing) handlePreCommitWait(ctx statemachine.Context, sector SectorInf // would be ideal to just use the events.Called handler, but it wouldn't be able to handle individual message timeouts log.Info("Sector precommitted: ", sector.SectorNumber) - mw, err := m.Api.StateWaitMsg(ctx.Context(), *sector.PreCommitMessage) + mw, err := m.Api.StateWaitMsg(ctx.Context(), *sector.PreCommitMessage, build.MessageConfidence, api.LookbackNoLimit, true) if err != nil { return ctx.Send(SectorChainPreCommitFailed{err}) } @@ -477,7 +478,7 @@ func (m *Sealing) handlePreCommitWait(ctx statemachine.Context, sector SectorInf log.Info("precommit message landed on chain: ", sector.SectorNumber) - return ctx.Send(SectorPreCommitLanded{TipSet: mw.TipSetTok}) + return ctx.Send(SectorPreCommitLanded{TipSet: mw.TipSet}) } func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) error { @@ -537,7 +538,7 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) if sector.CommitMessage != nil { log.Warnf("sector %d entered committing state with a commit message cid", sector.SectorNumber) - ml, err := m.Api.StateSearchMsg(ctx.Context(), *sector.CommitMessage) + ml, err := m.Api.StateSearchMsg(ctx.Context(), types.EmptyTSK, *sector.CommitMessage, api.LookbackNoLimit, true) if err != nil { log.Warnf("sector %d searching existing commit message %s: %+v", sector.SectorNumber, *sector.CommitMessage, err) } @@ -730,7 +731,7 @@ func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) return ctx.Send(SectorCommitFailed{xerrors.Errorf("entered commit wait with no commit cid")}) } - mw, err := m.Api.StateWaitMsg(ctx.Context(), *sector.CommitMessage) + mw, err := m.Api.StateWaitMsg(ctx.Context(), *sector.CommitMessage, build.MessageConfidence, api.LookbackNoLimit, true) if err != nil { return ctx.Send(SectorCommitFailed{xerrors.Errorf("failed to wait for porep inclusion: %w", err)}) } @@ -747,7 +748,7 @@ func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) return ctx.Send(SectorCommitFailed{xerrors.Errorf("submitting sector proof failed (exit=%d, msg=%s) (t:%x; s:%x(%d); p:%x)", mw.Receipt.ExitCode, sector.CommitMessage, sector.TicketValue, sector.SeedValue, sector.SeedEpoch, sector.Proof)}) } - si, err := m.Api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, mw.TipSetTok) + si, err := m.Api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, mw.TipSet) if err != nil { return ctx.Send(SectorCommitFailed{xerrors.Errorf("proof validation failed, calling StateSectorGetInfo: %w", err)}) } diff --git a/storage/pipeline/types.go b/storage/pipeline/types.go index bdca50c393a..aa0ee8c3c3c 100644 --- a/storage/pipeline/types.go +++ b/storage/pipeline/types.go @@ -1,7 +1,6 @@ package sealing import ( - "bytes" "context" "github.com/ipfs/go-cid" @@ -9,7 +8,6 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/builtin/v8/miner" - "github.com/filecoin-project/go-state-types/exitcode" "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/lotus/api" @@ -197,22 +195,4 @@ type SectorIDCounter interface { Next() (abi.SectorNumber, error) } -type CborTipSetToken []byte - -type MsgLookup struct { - Receipt MessageReceipt - TipSetTok types.TipSetKey - Height abi.ChainEpoch -} - -type MessageReceipt struct { - ExitCode exitcode.ExitCode - Return []byte - GasUsed int64 -} - type GetSealingConfigFunc func() (sealiface.Config, error) - -func (mr *MessageReceipt) Equals(o *MessageReceipt) bool { - return mr.ExitCode == o.ExitCode && bytes.Equal(mr.Return, o.Return) && mr.GasUsed == o.GasUsed -}