Skip to content

Commit

Permalink
sealing pipeline: Use non-special msg wait APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Jun 16, 2022
1 parent cd4514b commit 048bfe6
Show file tree
Hide file tree
Showing 11 changed files with 64 additions and 157 deletions.
38 changes: 4 additions & 34 deletions storage/adapter_storage_miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
Expand Down Expand Up @@ -103,42 +102,13 @@ func (s SealingAPIAdapter) ChainReadObj(ctx context.Context, ocid cid.Cid) ([]by
return s.delegate.ChainReadObj(ctx, ocid)
}

func (s SealingAPIAdapter) StateWaitMsg(ctx context.Context, mcid cid.Cid) (pipeline.MsgLookup, error) {
wmsg, err := s.delegate.StateWaitMsg(ctx, mcid, build.MessageConfidence, api.LookbackNoLimit, true)
if err != nil {
return pipeline.MsgLookup{}, err
}
func (s SealingAPIAdapter) StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error) {
return s.delegate.StateWaitMsg(ctx, cid, confidence, limit, allowReplaced)

return pipeline.MsgLookup{
Receipt: pipeline.MessageReceipt{
ExitCode: wmsg.Receipt.ExitCode,
Return: wmsg.Receipt.Return,
GasUsed: wmsg.Receipt.GasUsed,
},
TipSetTok: wmsg.TipSet,
Height: wmsg.Height,
}, nil
}

func (s SealingAPIAdapter) StateSearchMsg(ctx context.Context, c cid.Cid) (*pipeline.MsgLookup, error) {
wmsg, err := s.delegate.StateSearchMsg(ctx, types.EmptyTSK, c, api.LookbackNoLimit, true)
if err != nil {
return nil, err
}

if wmsg == nil {
return nil, nil
}

return &pipeline.MsgLookup{
Receipt: pipeline.MessageReceipt{
ExitCode: wmsg.Receipt.ExitCode,
Return: wmsg.Receipt.Return,
GasUsed: wmsg.Receipt.GasUsed,
},
TipSetTok: wmsg.TipSet,
Height: wmsg.Height,
}, nil
func (s SealingAPIAdapter) StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error) {
return s.delegate.StateSearchMsg(ctx, from, msg, limit, allowReplaced)
}

func (s SealingAPIAdapter) StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tsk types.TipSetKey) (cid.Cid, error) {
Expand Down
58 changes: 5 additions & 53 deletions storage/pipeline/currentdealinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"fmt"

"github.com/ipfs/go-cid"
"golang.org/x/xerrors"

Expand All @@ -23,7 +22,7 @@ type CurrentDealInfoAPI interface {
ChainGetMessage(context.Context, cid.Cid) (*types.Message, error)
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
StateMarketStorageDeal(context.Context, abi.DealID, types.TipSetKey) (*api.MarketDeal, error)
StateSearchMsg(context.Context, cid.Cid) (*MsgLookup, error)
StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error)
StateNetworkVersion(ctx context.Context, tok types.TipSetKey) (network.Version, error)
}

Expand Down Expand Up @@ -69,7 +68,7 @@ func (mgr *CurrentDealInfoManager) dealIDFromPublishDealsMsg(ctx context.Context
dealID := abi.DealID(0)

// Get the return value of the publish deals message
lookup, err := mgr.CDAPI.StateSearchMsg(ctx, publishCid)
lookup, err := mgr.CDAPI.StateSearchMsg(ctx, tok, publishCid, api.LookbackNoLimit, true)
if err != nil {
return dealID, types.EmptyTSK, xerrors.Errorf("looking for publish deal message %s: search msg failed: %w", publishCid, err)
}
Expand All @@ -82,7 +81,7 @@ func (mgr *CurrentDealInfoManager) dealIDFromPublishDealsMsg(ctx context.Context
return dealID, types.EmptyTSK, xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", publishCid, lookup.Receipt.ExitCode)
}

nv, err := mgr.CDAPI.StateNetworkVersion(ctx, lookup.TipSetTok)
nv, err := mgr.CDAPI.StateNetworkVersion(ctx, lookup.TipSet)
if err != nil {
return dealID, types.EmptyTSK, xerrors.Errorf("getting network version: %w", err)
}
Expand Down Expand Up @@ -113,7 +112,7 @@ func (mgr *CurrentDealInfoManager) dealIDFromPublishDealsMsg(ctx context.Context
// There is a single deal in this publish message and no deal proposal
// was supplied, so we have nothing to compare against. Just assume
// the deal ID is correct and that it was valid
return dealIDs[0], lookup.TipSetTok, nil
return dealIDs[0], lookup.TipSet, nil
}

// Get the parameters to the publish deals message
Expand Down Expand Up @@ -166,7 +165,7 @@ func (mgr *CurrentDealInfoManager) dealIDFromPublishDealsMsg(ctx context.Context
if outIdx >= len(dealIDs) {
return dealID, types.EmptyTSK, xerrors.Errorf("invalid publish storage deals ret marking %d as valid while only returning %d valid deals in publish deal message %s", outIdx, len(dealIDs), publishCid)
}
return dealIDs[outIdx], lookup.TipSetTok, nil
return dealIDs[outIdx], lookup.TipSet, nil
}

func (mgr *CurrentDealInfoManager) CheckDealEquality(ctx context.Context, tok types.TipSetKey, p1, p2 market.DealProposal) (bool, error) {
Expand All @@ -190,50 +189,3 @@ func (mgr *CurrentDealInfoManager) CheckDealEquality(ctx context.Context, tok ty
p1.Provider == p2.Provider &&
p1ClientID == p2ClientID, nil
}

type CurrentDealInfoTskAPI interface {
ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error)
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
StateMarketStorageDeal(context.Context, abi.DealID, types.TipSetKey) (*api.MarketDeal, error)
StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error)
StateNetworkVersion(context.Context, types.TipSetKey) (network.Version, error)
}

type CurrentDealInfoAPIAdapter struct {
CurrentDealInfoTskAPI
}

func (c *CurrentDealInfoAPIAdapter) StateLookupID(ctx context.Context, a address.Address, tsk types.TipSetKey) (address.Address, error) {
return c.CurrentDealInfoTskAPI.StateLookupID(ctx, a, tsk)
}

func (c *CurrentDealInfoAPIAdapter) StateMarketStorageDeal(ctx context.Context, dealID abi.DealID, tsk types.TipSetKey) (*api.MarketDeal, error) {
return c.CurrentDealInfoTskAPI.StateMarketStorageDeal(ctx, dealID, tsk)
}

func (c *CurrentDealInfoAPIAdapter) StateSearchMsg(ctx context.Context, k cid.Cid) (*MsgLookup, error) {
wmsg, err := c.CurrentDealInfoTskAPI.StateSearchMsg(ctx, types.EmptyTSK, k, api.LookbackNoLimit, true)
if err != nil {
return nil, err
}

if wmsg == nil {
return nil, nil
}

return &MsgLookup{
Receipt: MessageReceipt{
ExitCode: wmsg.Receipt.ExitCode,
Return: wmsg.Receipt.Return,
GasUsed: wmsg.Receipt.GasUsed,
},
TipSetTok: wmsg.TipSet,
Height: wmsg.Height,
}, nil
}

func (c *CurrentDealInfoAPIAdapter) StateNetworkVersion(ctx context.Context, tsk types.TipSetKey) (network.Version, error) {
return c.CurrentDealInfoTskAPI.StateNetworkVersion(ctx, tsk)
}

var _ CurrentDealInfoAPI = (*CurrentDealInfoAPIAdapter)(nil)
46 changes: 23 additions & 23 deletions storage/pipeline/currentdealinfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestGetCurrentDealInfo(t *testing.T) {
}

type testCaseData struct {
searchMessageLookup *MsgLookup
searchMessageLookup *api.MsgLookup
searchMessageErr error
marketDeals map[abi.DealID]*api.MarketDeal
publishCid cid.Cid
Expand All @@ -114,8 +114,8 @@ func TestGetCurrentDealInfo(t *testing.T) {
testCases := map[string]testCaseData{
"deal lookup succeeds": {
publishCid: dummyCid,
searchMessageLookup: &MsgLookup{
Receipt: MessageReceipt{
searchMessageLookup: &api.MsgLookup{
Receipt: types.MessageReceipt{
ExitCode: exitcode.Ok,
Return: makePublishDealsReturnBytesOldVersion(t, []abi.DealID{successDealID}),
},
Expand All @@ -129,8 +129,8 @@ func TestGetCurrentDealInfo(t *testing.T) {
},
"deal lookup succeeds two return values": {
publishCid: dummyCid,
searchMessageLookup: &MsgLookup{
Receipt: MessageReceipt{
searchMessageLookup: &api.MsgLookup{
Receipt: types.MessageReceipt{
ExitCode: exitcode.Ok,
Return: makePublishDealsReturnBytesOldVersion(t, []abi.DealID{earlierDealID, successDealID}),
},
Expand All @@ -145,8 +145,8 @@ func TestGetCurrentDealInfo(t *testing.T) {
},
"deal lookup fails proposal mis-match": {
publishCid: dummyCid,
searchMessageLookup: &MsgLookup{
Receipt: MessageReceipt{
searchMessageLookup: &api.MsgLookup{
Receipt: types.MessageReceipt{
ExitCode: exitcode.Ok,
Return: makePublishDealsReturnBytesOldVersion(t, []abi.DealID{earlierDealID}),
},
Expand All @@ -160,8 +160,8 @@ func TestGetCurrentDealInfo(t *testing.T) {
},
"deal lookup handles invalid actor output with mismatched count of deals and return values": {
publishCid: dummyCid,
searchMessageLookup: &MsgLookup{
Receipt: MessageReceipt{
searchMessageLookup: &api.MsgLookup{
Receipt: types.MessageReceipt{
ExitCode: exitcode.Ok,
Return: makePublishDealsReturnBytesOldVersion(t, []abi.DealID{earlierDealID}),
},
Expand All @@ -177,8 +177,8 @@ func TestGetCurrentDealInfo(t *testing.T) {

"deal lookup fails when deal was not valid and index exceeds output array": {
publishCid: dummyCid,
searchMessageLookup: &MsgLookup{
Receipt: MessageReceipt{
searchMessageLookup: &api.MsgLookup{
Receipt: types.MessageReceipt{
ExitCode: exitcode.Ok,
Return: makePublishDealsReturn(t, []abi.DealID{earlierDealID}, []uint64{0}),
},
Expand All @@ -195,8 +195,8 @@ func TestGetCurrentDealInfo(t *testing.T) {

"deal lookup succeeds when theres a separate deal failure": {
publishCid: dummyCid,
searchMessageLookup: &MsgLookup{
Receipt: MessageReceipt{
searchMessageLookup: &api.MsgLookup{
Receipt: types.MessageReceipt{
ExitCode: exitcode.Ok,
Return: makePublishDealsReturn(t, []abi.DealID{anotherDealID, successDealID}, []uint64{0, 2}),
},
Expand All @@ -214,8 +214,8 @@ func TestGetCurrentDealInfo(t *testing.T) {

"deal lookup succeeds, target proposal nil, single deal in message": {
publishCid: dummyCid,
searchMessageLookup: &MsgLookup{
Receipt: MessageReceipt{
searchMessageLookup: &api.MsgLookup{
Receipt: types.MessageReceipt{
ExitCode: exitcode.Ok,
Return: makePublishDealsReturnBytesOldVersion(t, []abi.DealID{successDealID}),
},
Expand All @@ -229,8 +229,8 @@ func TestGetCurrentDealInfo(t *testing.T) {
},
"deal lookup fails, multiple deals in return value but target proposal nil": {
publishCid: dummyCid,
searchMessageLookup: &MsgLookup{
Receipt: MessageReceipt{
searchMessageLookup: &api.MsgLookup{
Receipt: types.MessageReceipt{
ExitCode: exitcode.Ok,
Return: makePublishDealsReturnBytesOldVersion(t, []abi.DealID{earlierDealID, successDealID}),
},
Expand Down Expand Up @@ -258,8 +258,8 @@ func TestGetCurrentDealInfo(t *testing.T) {
},
"return code not ok": {
publishCid: dummyCid,
searchMessageLookup: &MsgLookup{
Receipt: MessageReceipt{
searchMessageLookup: &api.MsgLookup{
Receipt: types.MessageReceipt{
ExitCode: exitcode.ErrIllegalState,
},
},
Expand All @@ -269,8 +269,8 @@ func TestGetCurrentDealInfo(t *testing.T) {
},
"unable to unmarshal params": {
publishCid: dummyCid,
searchMessageLookup: &MsgLookup{
Receipt: MessageReceipt{
searchMessageLookup: &api.MsgLookup{
Receipt: types.MessageReceipt{
ExitCode: exitcode.Ok,
Return: []byte("applesauce"),
},
Expand Down Expand Up @@ -319,7 +319,7 @@ type marketDealKey struct {
}

type CurrentDealInfoMockAPI struct {
SearchMessageLookup *MsgLookup
SearchMessageLookup *api.MsgLookup
SearchMessageErr error

MarketDeals map[marketDealKey]*api.MarketDeal
Expand Down Expand Up @@ -371,7 +371,7 @@ func (mapi *CurrentDealInfoMockAPI) StateMarketStorageDeal(ctx context.Context,
return deal, nil
}

func (mapi *CurrentDealInfoMockAPI) StateSearchMsg(ctx context.Context, c cid.Cid) (*MsgLookup, error) {
func (mapi *CurrentDealInfoMockAPI) StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error) {
if mapi.SearchMessageLookup == nil {
return mapi.SearchMessageLookup, mapi.SearchMessageErr
}
Expand Down
20 changes: 10 additions & 10 deletions storage/pipeline/mocks/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions storage/pipeline/sealing.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ var ErrSectorAllocated = errors.New("sectorNumber is allocated, but PreCommit in
//go:generate go run github.com/golang/mock/mockgen -destination=mocks/api.go -package=mocks . SealingAPI

type SealingAPI interface {
StateWaitMsg(context.Context, cid.Cid) (MsgLookup, error)
StateSearchMsg(context.Context, cid.Cid) (*MsgLookup, error)
StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error)
StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error)
StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tok types.TipSetKey) (cid.Cid, error)

// Can return ErrSectorAllocated in case precommit info wasn't found, but the sector number is marked as allocated
Expand Down
7 changes: 4 additions & 3 deletions storage/pipeline/states_failed.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sealing

import (
"context"
"github.com/filecoin-project/lotus/api"
"time"

"github.com/hashicorp/go-multierror"
Expand Down Expand Up @@ -80,7 +81,7 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI
}

if sector.PreCommitMessage != nil {
mw, err := m.Api.StateSearchMsg(ctx.Context(), *sector.PreCommitMessage)
mw, err := m.Api.StateSearchMsg(ctx.Context(), tok, *sector.PreCommitMessage, api.LookbackNoLimit, true)
if err != nil {
// API error
if err := failedCooldown(ctx, sector); err != nil {
Expand Down Expand Up @@ -189,7 +190,7 @@ func (m *Sealing) handleSubmitReplicaUpdateFailed(ctx statemachine.Context, sect
}

if sector.ReplicaUpdateMessage != nil {
mw, err := m.Api.StateSearchMsg(ctx.Context(), *sector.ReplicaUpdateMessage)
mw, err := m.Api.StateSearchMsg(ctx.Context(), types.EmptyTSK, *sector.ReplicaUpdateMessage, api.LookbackNoLimit, true)
if err != nil {
// API error
return ctx.Send(SectorRetrySubmitReplicaUpdateWait{})
Expand Down Expand Up @@ -269,7 +270,7 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo
}

if sector.CommitMessage != nil {
mw, err := m.Api.StateSearchMsg(ctx.Context(), *sector.CommitMessage)
mw, err := m.Api.StateSearchMsg(ctx.Context(), tok, *sector.CommitMessage, api.LookbackNoLimit, true)
if err != nil {
// API error
if err := failedCooldown(ctx, sector); err != nil {
Expand Down
Loading

0 comments on commit 048bfe6

Please sign in to comment.