Skip to content

Commit

Permalink
sealing pipeline: Drop TipSetToken, use TipSetKey directly
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Jun 16, 2022
1 parent b706efc commit 9aa5659
Show file tree
Hide file tree
Showing 32 changed files with 293 additions and 360 deletions.
45 changes: 45 additions & 0 deletions chain/types/tipset_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package types
import (
"bytes"
"encoding/json"
"fmt"
"io"
"strings"

"github.com/ipfs/go-cid"
typegen "github.com/whyrusleeping/cbor-gen"

"github.com/filecoin-project/go-state-types/abi"
)
Expand Down Expand Up @@ -95,6 +98,45 @@ func (k *TipSetKey) UnmarshalJSON(b []byte) error {
return nil
}

func (k TipSetKey) MarshalCBOR(writer io.Writer) error {
if err := typegen.WriteMajorTypeHeader(writer, typegen.MajByteString, uint64(len(k.Bytes()))); err != nil {
return err
}

_, err := writer.Write(k.Bytes())
return err
}

func (k *TipSetKey) UnmarshalCBOR(reader io.Reader) error {
cr := typegen.NewCborReader(reader)

maj, extra, err := cr.ReadHeader()
if err != nil {
return err
}
defer func() {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
}()

if extra > typegen.ByteArrayMaxLen {
return fmt.Errorf("t.Binary: byte array too large (%d)", extra)
}
if maj != typegen.MajByteString {
return fmt.Errorf("expected byte array")
}

b := make([]uint8, extra)

if _, err := io.ReadFull(cr, b); err != nil {
return err
}

*k, err = TipSetKeyFromBytes(b)
return err
}

func (k TipSetKey) IsEmpty() bool {
return len(k.value) == 0
}
Expand Down Expand Up @@ -124,3 +166,6 @@ func decodeKey(encoded []byte) ([]cid.Cid, error) {
}
return cids, nil
}

var _ typegen.CBORMarshaler = &TipSetKey{}
var _ typegen.CBORUnmarshaler = &TipSetKey{}
10 changes: 10 additions & 0 deletions chain/types/tipset_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package types

import (
"encoding/hex"
"encoding/json"
"fmt"
"testing"
Expand All @@ -10,6 +11,8 @@ import (
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

cborrpc "github.com/filecoin-project/go-cbor-util"
)

func TestTipSetKey(t *testing.T) {
Expand Down Expand Up @@ -71,6 +74,13 @@ func TestTipSetKey(t *testing.T) {
`{"/":"bafy2bzacedwviarjtjraqakob5pslltmuo5n3xev3nt5zylezofkbbv5jclyu"}`+
`]`, k3)
})

t.Run("CBOR", func(t *testing.T) {
k3 := NewTipSetKey(c1, c2, c3)
b, err := cborrpc.Dump(k3)
require.NoError(t, err)
fmt.Println(hex.EncodeToString(b))
})
}

func verifyJSON(t *testing.T, expected string, k TipSetKey) {
Expand Down
15 changes: 5 additions & 10 deletions markets/storageadapter/ondealsectorcommitted.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type eventsCalledAPI interface {
}

type dealInfoAPI interface {
GetCurrentDealInfo(ctx context.Context, tok pipeline.TipSetToken, proposal *market.DealProposal, publishCid cid.Cid) (pipeline.CurrentDealInfo, error)
GetCurrentDealInfo(ctx context.Context, tok types.TipSetKey, proposal *market.DealProposal, publishCid cid.Cid) (pipeline.CurrentDealInfo, error)
}

type diffPreCommitsAPI interface {
Expand Down Expand Up @@ -87,12 +87,7 @@ func (mgr *SectorCommittedManager) OnDealSectorPreCommitted(ctx context.Context,
// when the client node was down after the deal was published, and when
// the precommit containing it landed on chain)

publishTs, err := types.TipSetKeyFromBytes(dealInfo.PublishMsgTipSet)
if err != nil {
return false, false, err
}

diff, err := mgr.dpc.diffPreCommits(ctx, provider, publishTs, ts.Key())
diff, err := mgr.dpc.diffPreCommits(ctx, provider, dealInfo.PublishMsgTipSet, ts.Key())
if err != nil {
return false, false, err
}
Expand Down Expand Up @@ -142,7 +137,7 @@ func (mgr *SectorCommittedManager) OnDealSectorPreCommitted(ctx context.Context,

// When there is a reorg, the deal ID may change, so get the
// current deal ID from the publish message CID
res, err := mgr.dealInfo.GetCurrentDealInfo(ctx, ts.Key().Bytes(), &proposal, publishCid)
res, err := mgr.dealInfo.GetCurrentDealInfo(ctx, ts.Key(), &proposal, publishCid)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -250,7 +245,7 @@ func (mgr *SectorCommittedManager) OnDealSectorCommitted(ctx context.Context, pr
}

// Get the deal info
res, err := mgr.dealInfo.GetCurrentDealInfo(ctx, ts.Key().Bytes(), &proposal, publishCid)
res, err := mgr.dealInfo.GetCurrentDealInfo(ctx, ts.Key(), &proposal, publishCid)
if err != nil {
return false, xerrors.Errorf("failed to look up deal on chain: %w", err)
}
Expand Down Expand Up @@ -384,7 +379,7 @@ func sectorInCommitMsg(msg *types.Message, sectorNumber abi.SectorNumber) (bool,
}

func (mgr *SectorCommittedManager) checkIfDealAlreadyActive(ctx context.Context, ts *types.TipSet, proposal *market.DealProposal, publishCid cid.Cid) (pipeline.CurrentDealInfo, bool, error) {
res, err := mgr.dealInfo.GetCurrentDealInfo(ctx, ts.Key().Bytes(), proposal, publishCid)
res, err := mgr.dealInfo.GetCurrentDealInfo(ctx, ts.Key(), proposal, publishCid)
if err != nil {
// TODO: This may be fine for some errors
return res, false, xerrors.Errorf("failed to look up deal on chain: %w", err)
Expand Down
4 changes: 2 additions & 2 deletions markets/storageadapter/ondealsectorcommitted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func TestOnDealSectorPreCommitted(t *testing.T) {
currentDealInfo: pipeline.CurrentDealInfo{
DealID: dealID,
MarketDeal: slashedDeal,
PublishMsgTipSet: nil,
PublishMsgTipSet: types.EmptyTSK,
},
expectedCBCallCount: 0,
expectedError: xerrors.Errorf("failed to set up called handler: deal %d was slashed at epoch %d", dealID, slashedDeal.State.SlashEpoch),
Expand Down Expand Up @@ -574,7 +574,7 @@ type mockDealInfoAPI struct {
Err2 error
}

func (m *mockDealInfoAPI) GetCurrentDealInfo(ctx context.Context, tok pipeline.TipSetToken, proposal *market.DealProposal, publishCid cid.Cid) (pipeline.CurrentDealInfo, error) {
func (m *mockDealInfoAPI) GetCurrentDealInfo(ctx context.Context, tok types.TipSetKey, proposal *market.DealProposal, publishCid cid.Cid) (pipeline.CurrentDealInfo, error) {
m.count++
if m.count == 2 {
return m.CurrentDealInfo2, m.Err2
Expand Down
2 changes: 1 addition & 1 deletion markets/storageadapter/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (n *ProviderNodeAdapter) WaitForPublishDeals(ctx context.Context, publishCi
return nil, xerrors.Errorf("WaitForPublishDeals failed to get chain head: %w", err)
}

res, err := n.scMgr.dealInfo.GetCurrentDealInfo(ctx, head.Key().Bytes(), &proposal, publishCid)
res, err := n.scMgr.dealInfo.GetCurrentDealInfo(ctx, head.Key(), &proposal, publishCid)
if err != nil {
return nil, xerrors.Errorf("WaitForPublishDeals getting deal info errored: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions storage/adapter_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ func NewEventsAdapter(api *events.Events) EventsAdapter {

func (e EventsAdapter) ChainAt(hnd sealing.HeightHandler, rev sealing.RevertHandler, confidence int, h abi.ChainEpoch) error {
return e.delegate.ChainAt(context.TODO(), func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
return hnd(ctx, ts.Key().Bytes(), curH)
return hnd(ctx, ts.Key(), curH)
}, func(ctx context.Context, ts *types.TipSet) error {
return rev(ctx, ts.Key().Bytes())
return rev(ctx, ts.Key())
}, confidence, h)
}
Loading

0 comments on commit 9aa5659

Please sign in to comment.