From fd8afb8beaa0528fbf16c7972fcf464f785c26e2 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Mon, 20 Jun 2022 14:23:50 +0200 Subject: [PATCH] feat: add v1.1.1 of deal proposal protocol --- shared_testutil/test_network_types.go | 12 +- storagemarket/impl/provider.go | 17 +- storagemarket/impl/provider_test.go | 9 +- storagemarket/migrations/migrations.go | 72 ++++---- .../migrations/migrations_mapenc_types.go | 9 +- .../migrations_mapenc_types_cbor_gen.go | 162 ++++++++++++++++++ storagemarket/network/deal_stream.go | 30 ++-- storagemarket/network/deal_stream_v110.go | 82 +++++++++ storagemarket/network/legacy_deal_stream.go | 30 ++-- storagemarket/network/libp2p_impl.go | 26 ++- storagemarket/network/libp2p_impl_test.go | 12 +- storagemarket/network/network.go | 3 +- storagemarket/testharness/testharness.go | 2 +- storagemarket/types.go | 7 +- 14 files changed, 380 insertions(+), 93 deletions(-) create mode 100644 storagemarket/network/deal_stream_v110.go diff --git a/shared_testutil/test_network_types.go b/shared_testutil/test_network_types.go index 89d0de03..bb1e3c46 100644 --- a/shared_testutil/test_network_types.go +++ b/shared_testutil/test_network_types.go @@ -398,7 +398,7 @@ func StubbedDealPaymentReader(payment rm.DealPayment) DealPaymentReader { } // StorageDealProposalReader is a function to mock reading deal proposals. -type StorageDealProposalReader func() (smnet.Proposal, error) +type StorageDealProposalReader func() (smnet.Proposal, cid.Cid, error) // StorageDealResponseReader is a function to mock reading deal responses. type StorageDealResponseReader func() (smnet.SignedResponse, []byte, error) @@ -460,7 +460,7 @@ func NewTestStorageDealStream(params TestStorageDealStreamParams) *TestStorageDe } // ReadDealProposal calls the mocked deal proposal reader function. -func (tsds *TestStorageDealStream) ReadDealProposal() (smnet.Proposal, error) { +func (tsds *TestStorageDealStream) ReadDealProposal() (smnet.Proposal, cid.Cid, error) { return tsds.proposalReader() } @@ -489,8 +489,8 @@ func (tsds *TestStorageDealStream) Close() error { } // TrivialStorageDealProposalReader succeeds trivially, returning an empty proposal. -func TrivialStorageDealProposalReader() (smnet.Proposal, error) { - return smnet.Proposal{}, nil +func TrivialStorageDealProposalReader() (smnet.Proposal, cid.Cid, error) { + return smnet.Proposal{}, cid.Undef, nil } // TrivialStorageDealResponseReader succeeds trivially, returning an empty deal response. @@ -510,8 +510,8 @@ func TrivialStorageDealResponseWriter(smnet.SignedResponse, smnet.ResigningFunc) // StubbedStorageProposalReader returns the given proposal when called func StubbedStorageProposalReader(proposal smnet.Proposal) StorageDealProposalReader { - return func() (smnet.Proposal, error) { - return proposal, nil + return func() (smnet.Proposal, cid.Cid, error) { + return proposal, cid.Undef, nil } } diff --git a/storagemarket/impl/provider.go b/storagemarket/impl/provider.go index 635bfbd6..6ccad5ed 100644 --- a/storagemarket/impl/provider.go +++ b/storagemarket/impl/provider.go @@ -256,7 +256,7 @@ func (p *Provider) HandleDealStream(s network.StorageDealStream) { } func (p *Provider) receiveDeal(s network.StorageDealStream) error { - proposal, err := s.ReadDealProposal() + proposal, propCid, err := s.ReadDealProposal() if err != nil { return xerrors.Errorf("failed to read proposal message: %w", err) } @@ -265,14 +265,9 @@ func (p *Provider) receiveDeal(s network.StorageDealStream) error { return xerrors.Errorf("failed to get deal proposal from proposal message") } - proposalNd, err := cborutil.AsIpld(proposal.DealProposal) - if err != nil { - return err - } - // Check if we are already tracking this deal var md storagemarket.MinerDeal - if err := p.deals.Get(proposalNd.Cid()).Get(&md); err == nil { + if err := p.deals.Get(propCid).Get(&md); err == nil { // We are already tracking this deal, for some reason it was re-proposed, perhaps because of a client restart // this is ok, just send a response back. return p.resendProposalResponse(s, &md) @@ -300,7 +295,7 @@ func (p *Provider) receiveDeal(s network.StorageDealStream) error { Client: s.RemotePeer(), Miner: p.net.ID(), ClientDealProposal: *proposal.DealProposal, - ProposalCid: proposalNd.Cid(), + ProposalCid: propCid, State: storagemarket.StorageDealUnknown, Ref: proposal.Piece, FastRetrieval: proposal.FastRetrieval, @@ -308,15 +303,15 @@ func (p *Provider) receiveDeal(s network.StorageDealStream) error { InboundCAR: path, } - err = p.deals.Begin(proposalNd.Cid(), deal) + err = p.deals.Begin(propCid, deal) if err != nil { return err } - err = p.conns.AddStream(proposalNd.Cid(), s) + err = p.conns.AddStream(propCid, s) if err != nil { return err } - return p.deals.Send(proposalNd.Cid(), storagemarket.ProviderEventOpen) + return p.deals.Send(propCid, storagemarket.ProviderEventOpen) } // Stop terminates processing of deals on a StorageProvider diff --git a/storagemarket/impl/provider_test.go b/storagemarket/impl/provider_test.go index cf9e1381..cd769ef5 100644 --- a/storagemarket/impl/provider_test.go +++ b/storagemarket/impl/provider_test.go @@ -306,12 +306,17 @@ func TestHandleDealStream(t *testing.T) { var responseWriteCount int s := shared_testutil.NewTestStorageDealStream(shared_testutil.TestStorageDealStreamParams{ - ProposalReader: func() (network.Proposal, error) { + ProposalReader: func() (network.Proposal, cid.Cid, error) { + propNd, err := cborutil.AsIpld(proposal) + if err != nil { + return network.Proposal{}, cid.Undef, err + } + return network.Proposal{ DealProposal: proposal, Piece: dataRef, FastRetrieval: false, - }, nil + }, propNd.Cid(), nil }, ResponseWriter: func(response network.SignedResponse, resigningFunc network.ResigningFunc) error { responseWriteCount += 1 diff --git a/storagemarket/migrations/migrations.go b/storagemarket/migrations/migrations.go index f6f7ac39..97525012 100644 --- a/storagemarket/migrations/migrations.go +++ b/storagemarket/migrations/migrations.go @@ -219,42 +219,13 @@ func MigrateMinerDeal0To1(oldCd *MinerDeal0) (*MinerDeal1, error) { // MigrateMinerDeal1To2 migrates a miner deal label to the new format func MigrateMinerDeal1To2(oldCd *MinerDeal1) (*storagemarket.MinerDeal, error) { - oldLabel := oldCd.Proposal.Label - - var err error - var newLabel market.DealLabel - if utf8.ValidString(oldLabel) { - newLabel, err = market.NewLabelFromString(oldLabel) - if err != nil { - return nil, fmt.Errorf("migrating deal label to DealLabel (string) for deal with proposal cid %s: %w", oldCd.ProposalCid, err) - } - } else { - newLabel, err = market.NewLabelFromBytes([]byte(oldLabel)) - if err != nil { - return nil, fmt.Errorf("migrating deal label to DealLabel (byte) for deal with proposal cid %s: %w", oldCd.ProposalCid, err) - } + clientDealProp, err := MigrateClientDealProposal0To1(oldCd.ClientDealProposal) + if err != nil { + return nil, fmt.Errorf("migrating deal with proposal cid %s: %w", oldCd.ProposalCid, err) } return &storagemarket.MinerDeal{ - ClientDealProposal: storagemarket.ClientDealProposal{ - ClientSignature: crypto.Signature{ - Type: crypto.SigType(oldCd.ClientDealProposal.ClientSignature.Type), - Data: oldCd.ClientDealProposal.ClientSignature.Data, - }, - Proposal: market.DealProposal{ - PieceCID: oldCd.ClientDealProposal.Proposal.PieceCID, - PieceSize: oldCd.ClientDealProposal.Proposal.PieceSize, - VerifiedDeal: oldCd.ClientDealProposal.Proposal.VerifiedDeal, - Client: oldCd.ClientDealProposal.Proposal.Client, - Provider: oldCd.ClientDealProposal.Proposal.Provider, - Label: newLabel, - StartEpoch: oldCd.ClientDealProposal.Proposal.StartEpoch, - EndEpoch: oldCd.ClientDealProposal.Proposal.EndEpoch, - StoragePricePerEpoch: oldCd.ClientDealProposal.Proposal.StoragePricePerEpoch, - ProviderCollateral: oldCd.ClientDealProposal.Proposal.ProviderCollateral, - ClientCollateral: oldCd.ClientDealProposal.Proposal.ClientCollateral, - }, - }, + ClientDealProposal: *clientDealProp, ProposalCid: oldCd.ProposalCid, AddFundsCid: oldCd.AddFundsCid, PublishCid: oldCd.PublishCid, @@ -274,6 +245,41 @@ func MigrateMinerDeal1To2(oldCd *MinerDeal1) (*storagemarket.MinerDeal, error) { }, nil } +func MigrateClientDealProposal0To1(prop marketOld.ClientDealProposal) (*storagemarket.ClientDealProposal, error) { + oldLabel := prop.Proposal.Label + + var err error + var newLabel market.DealLabel + if utf8.ValidString(oldLabel) { + newLabel, err = market.NewLabelFromString(oldLabel) + if err != nil { + return nil, fmt.Errorf("migrating deal label to DealLabel (string): %w", err) + } + } else { + newLabel, err = market.NewLabelFromBytes([]byte(oldLabel)) + if err != nil { + return nil, fmt.Errorf("migrating deal label to DealLabel (byte): %w", err) + } + } + + return &storagemarket.ClientDealProposal{ + ClientSignature: prop.ClientSignature, + Proposal: market.DealProposal{ + PieceCID: prop.Proposal.PieceCID, + PieceSize: prop.Proposal.PieceSize, + VerifiedDeal: prop.Proposal.VerifiedDeal, + Client: prop.Proposal.Client, + Provider: prop.Proposal.Provider, + Label: newLabel, + StartEpoch: prop.Proposal.StartEpoch, + EndEpoch: prop.Proposal.EndEpoch, + StoragePricePerEpoch: prop.Proposal.StoragePricePerEpoch, + ProviderCollateral: prop.Proposal.ProviderCollateral, + ClientCollateral: prop.Proposal.ClientCollateral, + }, + }, nil +} + // MigrateStorageAsk0To1 migrates a tuple encoded storage ask to a map encoded storage ask func MigrateStorageAsk0To1(oldSa *StorageAsk0) *storagemarket.StorageAsk { return &storagemarket.StorageAsk{ diff --git a/storagemarket/migrations/migrations_mapenc_types.go b/storagemarket/migrations/migrations_mapenc_types.go index 1627c7c7..983d67f7 100644 --- a/storagemarket/migrations/migrations_mapenc_types.go +++ b/storagemarket/migrations/migrations_mapenc_types.go @@ -18,7 +18,14 @@ import ( // generate directive in a separate file. So we define CBOR map-encoded types // in this file -//go:generate cbor-gen-for --map-encoding MinerDeal1 +//go:generate cbor-gen-for --map-encoding Proposal1 MinerDeal1 + +// Proposal1 is version 1 of Proposal (used by deal proposal protocol v1.1.0) +type Proposal1 struct { + DealProposal *marketOld.ClientDealProposal + Piece *storagemarket.DataRef + FastRetrieval bool +} // MinerDeal1 is version 1 of MinerDeal type MinerDeal1 struct { diff --git a/storagemarket/migrations/migrations_mapenc_types_cbor_gen.go b/storagemarket/migrations/migrations_mapenc_types_cbor_gen.go index dea393a8..8c752a64 100644 --- a/storagemarket/migrations/migrations_mapenc_types_cbor_gen.go +++ b/storagemarket/migrations/migrations_mapenc_types_cbor_gen.go @@ -12,6 +12,7 @@ import ( filestore "github.com/filecoin-project/go-fil-markets/filestore" storagemarket "github.com/filecoin-project/go-fil-markets/storagemarket" abi "github.com/filecoin-project/go-state-types/abi" + market "github.com/filecoin-project/specs-actors/actors/builtin/market" cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-core/peer" cbg "github.com/whyrusleeping/cbor-gen" @@ -23,6 +24,167 @@ var _ = cid.Undef var _ = math.E var _ = sort.Sort +func (t *Proposal1) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{163}); err != nil { + return err + } + + scratch := make([]byte, 9) + + // t.DealProposal (market.ClientDealProposal) (struct) + if len("DealProposal") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"DealProposal\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("DealProposal"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("DealProposal")); err != nil { + return err + } + + if err := t.DealProposal.MarshalCBOR(w); err != nil { + return err + } + + // t.Piece (storagemarket.DataRef) (struct) + if len("Piece") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"Piece\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("Piece"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("Piece")); err != nil { + return err + } + + if err := t.Piece.MarshalCBOR(w); err != nil { + return err + } + + // t.FastRetrieval (bool) (bool) + if len("FastRetrieval") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"FastRetrieval\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("FastRetrieval"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("FastRetrieval")); err != nil { + return err + } + + if err := cbg.WriteBool(w, t.FastRetrieval); err != nil { + return err + } + return nil +} + +func (t *Proposal1) UnmarshalCBOR(r io.Reader) error { + *t = Proposal1{} + + br := cbg.GetPeeker(r) + scratch := make([]byte, 8) + + maj, extra, err := cbg.CborReadHeaderBuf(br, scratch) + if err != nil { + return err + } + if maj != cbg.MajMap { + return fmt.Errorf("cbor input should be of type map") + } + + if extra > cbg.MaxLength { + return fmt.Errorf("Proposal1: map struct too large (%d)", extra) + } + + var name string + n := extra + + for i := uint64(0); i < n; i++ { + + { + sval, err := cbg.ReadStringBuf(br, scratch) + if err != nil { + return err + } + + name = string(sval) + } + + switch name { + // t.DealProposal (market.ClientDealProposal) (struct) + case "DealProposal": + + { + + b, err := br.ReadByte() + if err != nil { + return err + } + if b != cbg.CborNull[0] { + if err := br.UnreadByte(); err != nil { + return err + } + t.DealProposal = new(market.ClientDealProposal) + if err := t.DealProposal.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.DealProposal pointer: %w", err) + } + } + + } + // t.Piece (storagemarket.DataRef) (struct) + case "Piece": + + { + + b, err := br.ReadByte() + if err != nil { + return err + } + if b != cbg.CborNull[0] { + if err := br.UnreadByte(); err != nil { + return err + } + t.Piece = new(storagemarket.DataRef) + if err := t.Piece.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.Piece pointer: %w", err) + } + } + + } + // t.FastRetrieval (bool) (bool) + case "FastRetrieval": + + maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) + if err != nil { + return err + } + if maj != cbg.MajOther { + return fmt.Errorf("booleans must be major type 7") + } + switch extra { + case 20: + t.FastRetrieval = false + case 21: + t.FastRetrieval = true + default: + return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra) + } + + default: + // Field doesn't exist on this type, so ignore it + cbg.ScanForLinks(r, func(cid.Cid) {}) + } + } + + return nil +} func (t *MinerDeal1) MarshalCBOR(w io.Writer) error { if t == nil { _, err := w.Write(cbg.CborNull) diff --git a/storagemarket/network/deal_stream.go b/storagemarket/network/deal_stream.go index 9b2ff8d8..fcf54a21 100644 --- a/storagemarket/network/deal_stream.go +++ b/storagemarket/network/deal_stream.go @@ -2,7 +2,9 @@ package network import ( "bufio" + "fmt" + "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/mux" "github.com/libp2p/go-libp2p-core/peer" @@ -13,30 +15,38 @@ import ( // TagPriority is the priority for deal streams -- they should generally be preserved above all else const TagPriority = 100 -type dealStream struct { +type dealStreamv111 struct { p peer.ID host host.Host rw mux.MuxedStream buffered *bufio.Reader } -var _ StorageDealStream = (*dealStream)(nil) +var _ StorageDealStream = (*dealStreamv111)(nil) -func (d *dealStream) ReadDealProposal() (Proposal, error) { +func (d *dealStreamv111) ReadDealProposal() (Proposal, cid.Cid, error) { var ds Proposal if err := ds.UnmarshalCBOR(d.buffered); err != nil { log.Warn(err) - return ProposalUndefined, err + return ProposalUndefined, cid.Undef, err } - return ds, nil + + proposalNd, err := cborutil.AsIpld(ds.DealProposal) + if err != nil { + err = fmt.Errorf("getting v111 deal proposal as IPLD: %w", err) + log.Warnf(err.Error()) + return ProposalUndefined, cid.Undef, err + } + + return ds, proposalNd.Cid(), nil } -func (d *dealStream) WriteDealProposal(dp Proposal) error { +func (d *dealStreamv111) WriteDealProposal(dp Proposal) error { return cborutil.WriteCborRPC(d.rw, &dp) } -func (d *dealStream) ReadDealResponse() (SignedResponse, []byte, error) { +func (d *dealStreamv111) ReadDealResponse() (SignedResponse, []byte, error) { var dr SignedResponse if err := dr.UnmarshalCBOR(d.buffered); err != nil { @@ -49,14 +59,14 @@ func (d *dealStream) ReadDealResponse() (SignedResponse, []byte, error) { return dr, origBytes, nil } -func (d *dealStream) WriteDealResponse(dr SignedResponse, _ ResigningFunc) error { +func (d *dealStreamv111) WriteDealResponse(dr SignedResponse, _ ResigningFunc) error { return cborutil.WriteCborRPC(d.rw, &dr) } -func (d *dealStream) Close() error { +func (d *dealStreamv111) Close() error { return d.rw.Close() } -func (d *dealStream) RemotePeer() peer.ID { +func (d *dealStreamv111) RemotePeer() peer.ID { return d.p } diff --git a/storagemarket/network/deal_stream_v110.go b/storagemarket/network/deal_stream_v110.go new file mode 100644 index 00000000..0eebba6e --- /dev/null +++ b/storagemarket/network/deal_stream_v110.go @@ -0,0 +1,82 @@ +package network + +import ( + "bufio" + "fmt" + + "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/mux" + "github.com/libp2p/go-libp2p-core/peer" + + cborutil "github.com/filecoin-project/go-cbor-util" + + "github.com/filecoin-project/go-fil-markets/storagemarket/migrations" +) + +type dealStreamv110 struct { + p peer.ID + host host.Host + rw mux.MuxedStream + buffered *bufio.Reader +} + +var _ StorageDealStream = (*dealStreamv110)(nil) + +func (d *dealStreamv110) ReadDealProposal() (Proposal, cid.Cid, error) { + var ds migrations.Proposal1 + + if err := ds.UnmarshalCBOR(d.buffered); err != nil { + err = fmt.Errorf("unmarshalling v110 deal proposal: %w", err) + log.Warnf(err.Error()) + return ProposalUndefined, cid.Undef, err + } + + proposalNd, err := cborutil.AsIpld(ds.DealProposal) + if err != nil { + err = fmt.Errorf("getting v110 deal proposal as IPLD: %w", err) + log.Warnf(err.Error()) + return ProposalUndefined, cid.Undef, err + } + + prop, err := migrations.MigrateClientDealProposal0To1(*ds.DealProposal) + if err != nil { + err = fmt.Errorf("migrating v110 deal proposal to current version: %w", err) + log.Warnf(err.Error()) + return ProposalUndefined, cid.Undef, err + } + return Proposal{ + DealProposal: prop, + Piece: ds.Piece, + FastRetrieval: ds.FastRetrieval, + }, proposalNd.Cid(), nil +} + +func (d *dealStreamv110) WriteDealProposal(dp Proposal) error { + return cborutil.WriteCborRPC(d.rw, &dp) +} + +func (d *dealStreamv110) ReadDealResponse() (SignedResponse, []byte, error) { + var dr SignedResponse + + if err := dr.UnmarshalCBOR(d.buffered); err != nil { + return SignedResponseUndefined, nil, err + } + origBytes, err := cborutil.Dump(&dr.Response) + if err != nil { + return SignedResponseUndefined, nil, err + } + return dr, origBytes, nil +} + +func (d *dealStreamv110) WriteDealResponse(dr SignedResponse, _ ResigningFunc) error { + return cborutil.WriteCborRPC(d.rw, &dr) +} + +func (d *dealStreamv110) Close() error { + return d.rw.Close() +} + +func (d *dealStreamv110) RemotePeer() peer.ID { + return d.p +} diff --git a/storagemarket/network/legacy_deal_stream.go b/storagemarket/network/legacy_deal_stream.go index 7ef4f7c4..9eea8624 100644 --- a/storagemarket/network/legacy_deal_stream.go +++ b/storagemarket/network/legacy_deal_stream.go @@ -3,7 +3,9 @@ package network import ( "bufio" "context" + "fmt" + "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/mux" "github.com/libp2p/go-libp2p-core/peer" @@ -13,30 +15,38 @@ import ( "github.com/filecoin-project/go-fil-markets/storagemarket/migrations" ) -type legacyDealStream struct { +type dealStreamv101 struct { p peer.ID host host.Host rw mux.MuxedStream buffered *bufio.Reader } -var _ StorageDealStream = (*legacyDealStream)(nil) +var _ StorageDealStream = (*dealStreamv101)(nil) -func (d *legacyDealStream) ReadDealProposal() (Proposal, error) { +func (d *dealStreamv101) ReadDealProposal() (Proposal, cid.Cid, error) { var ds migrations.Proposal0 if err := ds.UnmarshalCBOR(d.buffered); err != nil { log.Warn(err) - return ProposalUndefined, err + return ProposalUndefined, cid.Undef, err } + + proposalNd, err := cborutil.AsIpld(ds.DealProposal) + if err != nil { + err = fmt.Errorf("getting v101 deal proposal as IPLD: %w", err) + log.Warnf(err.Error()) + return ProposalUndefined, cid.Undef, err + } + return Proposal{ DealProposal: ds.DealProposal, Piece: migrations.MigrateDataRef0To1(ds.Piece), FastRetrieval: ds.FastRetrieval, - }, nil + }, proposalNd.Cid(), nil } -func (d *legacyDealStream) WriteDealProposal(dp Proposal) error { +func (d *dealStreamv101) WriteDealProposal(dp Proposal) error { var piece *migrations.DataRef0 if dp.Piece != nil { piece = &migrations.DataRef0{ @@ -53,7 +63,7 @@ func (d *legacyDealStream) WriteDealProposal(dp Proposal) error { }) } -func (d *legacyDealStream) ReadDealResponse() (SignedResponse, []byte, error) { +func (d *dealStreamv101) ReadDealResponse() (SignedResponse, []byte, error) { var dr migrations.SignedResponse0 if err := dr.UnmarshalCBOR(d.buffered); err != nil { @@ -74,7 +84,7 @@ func (d *legacyDealStream) ReadDealResponse() (SignedResponse, []byte, error) { }, origBytes, nil } -func (d *legacyDealStream) WriteDealResponse(dr SignedResponse, resign ResigningFunc) error { +func (d *dealStreamv101) WriteDealResponse(dr SignedResponse, resign ResigningFunc) error { oldResponse := migrations.Response0{ State: dr.Response.State, Message: dr.Response.Message, @@ -91,10 +101,10 @@ func (d *legacyDealStream) WriteDealResponse(dr SignedResponse, resign Resigning }) } -func (d *legacyDealStream) Close() error { +func (d *dealStreamv101) Close() error { return d.rw.Close() } -func (d *legacyDealStream) RemotePeer() peer.ID { +func (d *dealStreamv101) RemotePeer() peer.ID { return d.p } diff --git a/storagemarket/network/libp2p_impl.go b/storagemarket/network/libp2p_impl.go index 2a243921..60ddbe29 100644 --- a/storagemarket/network/libp2p_impl.go +++ b/storagemarket/network/libp2p_impl.go @@ -59,8 +59,9 @@ func NewFromLibp2pHost(h host.Host, options ...Option) StorageMarketNetwork { storagemarket.OldAskProtocolID, }, supportedDealProtocols: []protocol.ID{ - storagemarket.DealProtocolID, - storagemarket.OldDealProtocolID, + storagemarket.DealProtocolID111, + storagemarket.DealProtocolID110, + storagemarket.DealProtocolID101, }, supportedDealStatusProtocols: []protocol.ID{ storagemarket.DealStatusProtocolID, @@ -104,10 +105,14 @@ func (impl *libp2pStorageMarketNetwork) NewDealStream(ctx context.Context, id pe return nil, err } buffered := bufio.NewReaderSize(s, 16) - if s.Protocol() == storagemarket.OldDealProtocolID { - return &legacyDealStream{p: id, rw: s, buffered: buffered, host: impl.host}, nil + switch s.Protocol() { + case storagemarket.DealProtocolID101: + return &dealStreamv101{p: id, rw: s, buffered: buffered, host: impl.host}, nil + case storagemarket.DealProtocolID110: + return &dealStreamv110{p: id, rw: s, buffered: buffered, host: impl.host}, nil + default: + return &dealStreamv111{p: id, rw: s, buffered: buffered, host: impl.host}, nil } - return &dealStream{p: id, rw: s, buffered: buffered, host: impl.host}, nil } func (impl *libp2pStorageMarketNetwork) NewDealStatusStream(ctx context.Context, id peer.ID) (DealStatusStream, error) { @@ -168,10 +173,13 @@ func (impl *libp2pStorageMarketNetwork) handleNewDealStream(s network.Stream) { reader := impl.getReaderOrReset(s) if reader != nil { var ds StorageDealStream - if s.Protocol() == storagemarket.OldDealProtocolID { - ds = &legacyDealStream{s.Conn().RemotePeer(), impl.host, s, reader} - } else { - ds = &dealStream{s.Conn().RemotePeer(), impl.host, s, reader} + switch s.Protocol() { + case storagemarket.DealProtocolID101: + ds = &dealStreamv101{s.Conn().RemotePeer(), impl.host, s, reader} + case storagemarket.DealProtocolID110: + ds = &dealStreamv110{s.Conn().RemotePeer(), impl.host, s, reader} + default: + ds = &dealStreamv111{s.Conn().RemotePeer(), impl.host, s, reader} } impl.receiver.HandleDealStream(ds) } diff --git a/storagemarket/network/libp2p_impl_test.go b/storagemarket/network/libp2p_impl_test.go index 902fe914..0602d373 100644 --- a/storagemarket/network/libp2p_impl_test.go +++ b/storagemarket/network/libp2p_impl_test.go @@ -247,12 +247,12 @@ func TestDealStreamSendReceiveDealProposal(t *testing.T) { td := shared_testutil.NewLibp2pTestData(ctx, t) var fromNetwork, toNetwork network.StorageMarketNetwork if data.senderDisabledNew { - fromNetwork = network.NewFromLibp2pHost(td.Host1, network.SupportedDealProtocols([]protocol.ID{storagemarket.OldDealProtocolID})) + fromNetwork = network.NewFromLibp2pHost(td.Host1, network.SupportedDealProtocols([]protocol.ID{storagemarket.DealProtocolID110})) } else { fromNetwork = network.NewFromLibp2pHost(td.Host1) } if data.receiverDisabledNew { - toNetwork = network.NewFromLibp2pHost(td.Host2, network.SupportedDealProtocols([]protocol.ID{storagemarket.OldDealProtocolID})) + toNetwork = network.NewFromLibp2pHost(td.Host2, network.SupportedDealProtocols([]protocol.ID{storagemarket.DealProtocolID110})) } else { toNetwork = network.NewFromLibp2pHost(td.Host2) } @@ -265,7 +265,7 @@ func TestDealStreamSendReceiveDealProposal(t *testing.T) { tr2 := &testReceiver{ t: t, dealStreamHandler: func(s network.StorageDealStream) { - readD, err := s.ReadDealProposal() + readD, _, err := s.ReadDealProposal() require.NoError(t, err) dchan <- readD }, @@ -297,12 +297,12 @@ func TestDealStreamSendReceiveDealResponse(t *testing.T) { td := shared_testutil.NewLibp2pTestData(ctx, t) var fromNetwork, toNetwork network.StorageMarketNetwork if data.senderDisabledNew { - fromNetwork = network.NewFromLibp2pHost(td.Host1, network.SupportedDealProtocols([]protocol.ID{storagemarket.OldDealProtocolID})) + fromNetwork = network.NewFromLibp2pHost(td.Host1, network.SupportedDealProtocols([]protocol.ID{storagemarket.DealProtocolID110})) } else { fromNetwork = network.NewFromLibp2pHost(td.Host1) } if data.receiverDisabledNew { - toNetwork = network.NewFromLibp2pHost(td.Host2, network.SupportedDealProtocols([]protocol.ID{storagemarket.OldDealProtocolID})) + toNetwork = network.NewFromLibp2pHost(td.Host2, network.SupportedDealProtocols([]protocol.ID{storagemarket.DealProtocolID110})) } else { toNetwork = network.NewFromLibp2pHost(td.Host2) } @@ -343,7 +343,7 @@ func TestDealStreamSendReceiveMultipleSuccessful(t *testing.T) { return nil, nil } tr2 := &testReceiver{t: t, dealStreamHandler: func(s network.StorageDealStream) { - _, err := s.ReadDealProposal() + _, _, err := s.ReadDealProposal() require.NoError(t, err) require.NoError(t, s.WriteDealResponse(dr, resigningFunc)) diff --git a/storagemarket/network/network.go b/storagemarket/network/network.go index 83e506b8..38d2eab6 100644 --- a/storagemarket/network/network.go +++ b/storagemarket/network/network.go @@ -3,6 +3,7 @@ package network import ( "context" + "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p-core/peer" ma "github.com/multiformats/go-multiaddr" @@ -28,7 +29,7 @@ type StorageAskStream interface { // StorageDealStream is a stream for reading and writing requests // and responses on the storage deal protocol type StorageDealStream interface { - ReadDealProposal() (Proposal, error) + ReadDealProposal() (Proposal, cid.Cid, error) WriteDealProposal(Proposal) error ReadDealResponse() (SignedResponse, []byte, error) WriteDealResponse(SignedResponse, ResigningFunc) error diff --git a/storagemarket/testharness/testharness.go b/storagemarket/testharness/testharness.go index 987f2d39..7c8ffcf0 100644 --- a/storagemarket/testharness/testharness.go +++ b/storagemarket/testharness/testharness.go @@ -106,7 +106,7 @@ func NewHarnessWithTestData(t *testing.T, td *shared_testutil.Libp2pTestData, de if disableNewDeals { networkOptions = append(networkOptions, network.SupportedAskProtocols([]protocol.ID{storagemarket.OldAskProtocolID}), - network.SupportedDealProtocols([]protocol.ID{storagemarket.OldDealProtocolID}), + network.SupportedDealProtocols([]protocol.ID{storagemarket.DealProtocolID110}), network.SupportedDealStatusProtocols([]protocol.ID{storagemarket.OldDealStatusProtocolID}), ) } diff --git a/storagemarket/types.go b/storagemarket/types.go index 4d7909bb..42c02573 100644 --- a/storagemarket/types.go +++ b/storagemarket/types.go @@ -23,9 +23,10 @@ var log = logging.Logger("storagemrkt") //go:generate cbor-gen-for --map-encoding ClientDeal MinerDeal Balance SignedStorageAsk StorageAsk DataRef ProviderDealState DealStages DealStage Log -// DealProtocolID is the ID for the libp2p protocol for proposing storage deals. -const OldDealProtocolID = "/fil/storage/mk/1.0.1" -const DealProtocolID = "/fil/storage/mk/1.1.0" +// The ID for the libp2p protocol for proposing storage deals. +const DealProtocolID101 = "/fil/storage/mk/1.0.1" +const DealProtocolID110 = "/fil/storage/mk/1.1.0" +const DealProtocolID111 = "/fil/storage/mk/1.1.1" // AskProtocolID is the ID for the libp2p protocol for querying miners for their current StorageAsk. const OldAskProtocolID = "/fil/storage/ask/1.0.1"