From 32e5ccef50d88d8a19b79f71ea98157c4a0047d2 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Fri, 4 Jun 2021 18:53:00 +0530 Subject: [PATCH] Dynamic Retrieval Pricing (#542) * refactor for dynamic pricing * test differential pricing * dynamic pricing * changes for dynamic pricing * Apply suggestions from code review Co-authored-by: raulk Co-authored-by: dirkmc * changes as per review and test json * Apply suggestions from code review Co-authored-by: dirkmc * fix compilation * changes as per review * default retrieval pricing function * fix compilation * test the default pricing function * fix bug in quoting price * fix: go mod tidy Co-authored-by: raulk Co-authored-by: dirkmc --- retrievalmarket/impl/integration_test.go | 53 +- retrievalmarket/impl/provider.go | 177 ++++- retrievalmarket/impl/provider_environments.go | 137 +++- retrievalmarket/impl/provider_test.go | 741 +++++++++++++++++- .../impl/providerstates/provider_states.go | 16 + .../providerstates/provider_states_test.go | 48 +- .../requestvalidation/requestvalidation.go | 34 +- .../requestvalidation_test.go | 14 +- .../testnodes/test_retrieval_provider_node.go | 41 + retrievalmarket/network/network.go | 1 + retrievalmarket/network/old_query_stream.go | 4 + retrievalmarket/network/query_stream.go | 4 + retrievalmarket/nodes.go | 4 + .../storage_retrieval_integration_test.go | 23 +- retrievalmarket/types.go | 18 + retrievalmarket/types_test.go | 29 + shared_testutil/test_network_types.go | 10 +- 17 files changed, 1224 insertions(+), 130 deletions(-) diff --git a/retrievalmarket/impl/integration_test.go b/retrievalmarket/impl/integration_test.go index 50e47b23..3a2fe90f 100644 --- a/retrievalmarket/impl/integration_test.go +++ b/retrievalmarket/impl/integration_test.go @@ -58,6 +58,10 @@ func TestClientCanMakeQueryToProvider(t *testing.T) { expectedQR.Status = retrievalmarket.QueryResponseUnavailable expectedQR.Size = 0 actualQR, err := client.Query(bgCtx, retrievalPeer, missingPiece, retrievalmarket.QueryParams{}) + actualQR.MaxPaymentInterval = expectedQR.MaxPaymentInterval + actualQR.MinPricePerByte = expectedQR.MinPricePerByte + actualQR.MaxPaymentIntervalIncrease = expectedQR.MaxPaymentIntervalIncrease + actualQR.UnsealPrice = expectedQR.UnsealPrice assert.NoError(t, err) assert.Equal(t, expectedQR, actualQR) }) @@ -65,9 +69,13 @@ func TestClientCanMakeQueryToProvider(t *testing.T) { t.Run("when there is some other error, returns error", func(t *testing.T) { unknownPiece := tut.GenerateCids(1)[0] expectedQR.Status = retrievalmarket.QueryResponseError - expectedQR.Message = "get cid info: GetCIDInfo failed" + expectedQR.Message = "failed to fetch piece to retrieve from: get cid info: GetCIDInfo failed" actualQR, err := client.Query(bgCtx, retrievalPeer, unknownPiece, retrievalmarket.QueryParams{}) assert.NoError(t, err) + actualQR.MaxPaymentInterval = expectedQR.MaxPaymentInterval + actualQR.MinPricePerByte = expectedQR.MinPricePerByte + actualQR.MaxPaymentIntervalIncrease = expectedQR.MaxPaymentIntervalIncrease + actualQR.UnsealPrice = expectedQR.UnsealPrice assert.Equal(t, expectedQR, actualQR) }) @@ -148,21 +156,29 @@ func requireSetupTestClientAndProvider(ctx context.Context, t *testing.T, payChA testutil.StartAndWaitForReady(ctx, t, dt2) require.NoError(t, err) providerDs := namespace.Wrap(testData.Ds2, datastore.NewKey("/retrievals/provider")) - provider, err := retrievalimpl.NewProvider(paymentAddress, providerNode, nw2, pieceStore, testData.MultiStore2, dt2, providerDs) + + priceFunc := func(ctx context.Context, dealPricingParams retrievalmarket.PricingInput) (retrievalmarket.Ask, error) { + ask := retrievalmarket.Ask{} + ask.PaymentInterval = expectedQR.MaxPaymentInterval + ask.PaymentIntervalIncrease = expectedQR.MaxPaymentIntervalIncrease + ask.PricePerByte = expectedQR.MinPricePerByte + ask.UnsealPrice = expectedQR.UnsealPrice + return ask, nil + } + + provider, err := retrievalimpl.NewProvider(paymentAddress, providerNode, nw2, pieceStore, testData.MultiStore2, dt2, providerDs, + priceFunc) require.NoError(t, err) - ask := provider.GetAsk() - ask.PaymentInterval = expectedQR.MaxPaymentInterval - ask.PaymentIntervalIncrease = expectedQR.MaxPaymentIntervalIncrease - ask.PricePerByte = expectedQR.MinPricePerByte - ask.UnsealPrice = expectedQR.UnsealPrice - provider.SetAsk(ask) tut.StartAndWaitForReady(ctx, t, provider) retrievalPeer := retrievalmarket.RetrievalPeer{ Address: paymentAddress, ID: testData.Host2.ID(), } rcNode1.ExpectKnownAddresses(retrievalPeer, nil) + + expectedQR.Size = uint64(abi.PaddedPieceSize(expectedQR.Size).Unpadded()) + return client, expectedCIDs, missingCID, expectedQR, retrievalPeer, provider } @@ -384,12 +400,14 @@ func TestClientCanMakeDealWithProvider(t *testing.T) { PieceCID: tut.GenerateCids(1)[0], Deals: []piecestore.DealInfo{ { + DealID: abi.DealID(100), SectorID: sectorID, Offset: offset, Length: abi.UnpaddedPieceSize(len(carData)).Padded(), }, }, } + providerNode.ExpectPricingParams(pieceInfo.PieceCID, []abi.DealID{100}) if testCase.failsUnseal { providerNode.ExpectFailedUnseal(sectorID, offset.Unpadded(), abi.UnpaddedPieceSize(len(carData))) } else { @@ -664,18 +682,21 @@ func setupProvider( if disableNewDeals { opts = append(opts, retrievalimpl.DisableNewDeals()) } + + priceFunc := func(ctx context.Context, dealPricingParams retrievalmarket.PricingInput) (retrievalmarket.Ask, error) { + ask := retrievalmarket.Ask{} + ask.PaymentInterval = expectedQR.MaxPaymentInterval + ask.PaymentIntervalIncrease = expectedQR.MaxPaymentIntervalIncrease + ask.PricePerByte = expectedQR.MinPricePerByte + ask.UnsealPrice = expectedQR.UnsealPrice + return ask, nil + } + provider, err := retrievalimpl.NewProvider(providerPaymentAddr, providerNode, nw2, - pieceStore, testData.MultiStore2, dt2, providerDs, + pieceStore, testData.MultiStore2, dt2, providerDs, priceFunc, opts...) require.NoError(t, err) - ask := provider.GetAsk() - - ask.PaymentInterval = expectedQR.MaxPaymentInterval - ask.PaymentIntervalIncrease = expectedQR.MaxPaymentIntervalIncrease - ask.PricePerByte = expectedQR.MinPricePerByte - ask.UnsealPrice = expectedQR.UnsealPrice - provider.SetAsk(ask) return provider } diff --git a/retrievalmarket/impl/provider.go b/retrievalmarket/impl/provider.go index 5ea8ae5c..263b1930 100644 --- a/retrievalmarket/impl/provider.go +++ b/retrievalmarket/impl/provider.go @@ -3,6 +3,8 @@ package retrievalimpl import ( "context" "errors" + "fmt" + "time" "github.com/hannahhoward/go-pubsub" "github.com/ipfs/go-cid" @@ -15,6 +17,8 @@ import ( versioning "github.com/filecoin-project/go-ds-versioning/pkg" versionedfsm "github.com/filecoin-project/go-ds-versioning/pkg/fsm" "github.com/filecoin-project/go-multistore" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-statemachine/fsm" "github.com/filecoin-project/go-fil-markets/piecestore" @@ -34,6 +38,10 @@ type RetrievalProviderOption func(p *Provider) // DealDecider is a function that makes a decision about whether to accept a deal type DealDecider func(ctx context.Context, state retrievalmarket.ProviderDealState) (bool, string, error) +type RetrievalPricingFunc func(ctx context.Context, dealPricingParams retrievalmarket.PricingInput) (retrievalmarket.Ask, error) + +var queryTimeout = 5 * time.Second + // Provider is the production implementation of the RetrievalProvider interface type Provider struct { multiStore *multistore.MultiStore @@ -51,6 +59,7 @@ type Provider struct { dealDecider DealDecider askStore retrievalmarket.AskStore disableNewDeals bool + retrievalPricingFunc RetrievalPricingFunc } type internalProviderEvent struct { @@ -95,24 +104,31 @@ func NewProvider(minerAddress address.Address, multiStore *multistore.MultiStore, dataTransfer datatransfer.Manager, ds datastore.Batching, + retrievalPricingFunc RetrievalPricingFunc, opts ...RetrievalProviderOption, ) (retrievalmarket.RetrievalProvider, error) { + if retrievalPricingFunc == nil { + return nil, xerrors.New("retrievalPricingFunc is nil") + } + p := &Provider{ - multiStore: multiStore, - dataTransfer: dataTransfer, - node: node, - network: network, - minerAddress: minerAddress, - pieceStore: pieceStore, - subscribers: pubsub.New(providerDispatcher), - readySub: pubsub.New(shared.ReadyDispatcher), + multiStore: multiStore, + dataTransfer: dataTransfer, + node: node, + network: network, + minerAddress: minerAddress, + pieceStore: pieceStore, + subscribers: pubsub.New(providerDispatcher), + readySub: pubsub.New(shared.ReadyDispatcher), + retrievalPricingFunc: retrievalPricingFunc, } err := shared.MoveKey(ds, "retrieval-ask", "retrieval-ask/latest") if err != nil { return nil, err } + askStore, err := askstore.NewAskStore(namespace.Wrap(ds, datastore.NewKey("retrieval-ask")), datastore.NewKey("latest")) if err != nil { return nil, err @@ -234,6 +250,7 @@ func (p *Provider) GetAsk() *retrievalmarket.Ask { // SetAsk sets the deal parameters this provider accepts func (p *Provider) SetAsk(ask *retrievalmarket.Ask) { + err := p.askStore.SetAsk(ask) if err != nil { @@ -268,63 +285,128 @@ A Provider handling a retrieval `Query` does the following: The connection is kept open only as long as the query-response exchange. */ func (p *Provider) HandleQueryStream(stream rmnet.RetrievalQueryStream) { + ctx, cancel := context.WithTimeout(context.TODO(), queryTimeout) + defer cancel() + defer stream.Close() query, err := stream.ReadQuery() if err != nil { return } - ask := p.GetAsk() + sendResp := func(resp retrievalmarket.QueryResponse) { + if err := stream.WriteQueryResponse(resp); err != nil { + log.Errorf("Retrieval query: writing query response: %s", err) + } + } answer := retrievalmarket.QueryResponse{ - Status: retrievalmarket.QueryResponseUnavailable, - PieceCIDFound: retrievalmarket.QueryItemUnavailable, - MinPricePerByte: ask.PricePerByte, - MaxPaymentInterval: ask.PaymentInterval, - MaxPaymentIntervalIncrease: ask.PaymentIntervalIncrease, - UnsealPrice: ask.UnsealPrice, + Status: retrievalmarket.QueryResponseUnavailable, + PieceCIDFound: retrievalmarket.QueryItemUnavailable, + MinPricePerByte: big.Zero(), + UnsealPrice: big.Zero(), } - ctx := context.TODO() - + // get chain head to query actor states. tok, _, err := p.node.GetChainHead(ctx) if err != nil { log.Errorf("Retrieval query: GetChainHead: %s", err) return } + // fetch the payment address the client should send the payment to. paymentAddress, err := p.node.GetMinerWorkerAddress(ctx, p.minerAddress, tok) if err != nil { log.Errorf("Retrieval query: Lookup Payment Address: %s", err) answer.Status = retrievalmarket.QueryResponseError - answer.Message = err.Error() - } else { - answer.PaymentAddress = paymentAddress - - pieceCID := cid.Undef - if query.PieceCID != nil { - pieceCID = *query.PieceCID + answer.Message = fmt.Sprintf("failed to look up payment address: %s", err) + sendResp(answer) + return + } + answer.PaymentAddress = paymentAddress + + // fetch the piece from which the payload will be retrieved. + // if user has specified the Piece in the request, we use that. + // Otherwise, we prefer a Piece which can retrieved from an unsealed sector. + pieceCID := cid.Undef + if query.PieceCID != nil { + pieceCID = *query.PieceCID + } + pieceInfo, isUnsealed, err := getPieceInfoFromCid(ctx, p.node, p.pieceStore, query.PayloadCID, pieceCID) + if err != nil { + log.Errorf("Retrieval query: getPieceInfoFromCid: %s", err) + if !xerrors.Is(err, retrievalmarket.ErrNotFound) { + answer.Status = retrievalmarket.QueryResponseError + answer.Message = fmt.Sprintf("failed to fetch piece to retrieve from: %s", err) } - pieceInfo, err := getPieceInfoFromCid(p.pieceStore, query.PayloadCID, pieceCID) - if err == nil && len(pieceInfo.Deals) > 0 { - answer.Status = retrievalmarket.QueryResponseAvailable - // TODO: get price, look for already unsealed ref to reduce work - answer.Size = uint64(pieceInfo.Deals[0].Length) // TODO: verify on intermediate - answer.PieceCIDFound = retrievalmarket.QueryItemAvailable - } + sendResp(answer) + return + } - if err != nil && !xerrors.Is(err, retrievalmarket.ErrNotFound) { - log.Errorf("Retrieval query: GetRefs: %s", err) - answer.Status = retrievalmarket.QueryResponseError - answer.Message = err.Error() - } + answer.Status = retrievalmarket.QueryResponseAvailable + answer.Size = uint64(pieceInfo.Deals[0].Length.Unpadded()) // TODO: verify on intermediate + answer.PieceCIDFound = retrievalmarket.QueryItemAvailable + + storageDeals, err := storageDealsForPiece(query.PieceCID != nil, query.PayloadCID, pieceInfo, p.pieceStore) + if err != nil { + log.Errorf("Retrieval query: storageDealsForPiece: %s", err) + answer.Status = retrievalmarket.QueryResponseError + answer.Message = fmt.Sprintf("failed to fetch storage deals containing payload: %s", err) + sendResp(answer) + return + } + input := retrievalmarket.PricingInput{ + // piece from which the payload will be retrieved + // If user hasn't given a PieceCID, we try to choose an unsealed piece in the call to `getPieceInfoFromCid` above. + PieceCID: pieceInfo.PieceCID, + + PayloadCID: query.PayloadCID, + Unsealed: isUnsealed, + Client: stream.RemotePeer(), } - if err := stream.WriteQueryResponse(answer); err != nil { - log.Errorf("Retrieval query: WriteCborRPC: %s", err) + ask, err := p.GetDynamicAsk(ctx, input, storageDeals) + if err != nil { + log.Errorf("Retrieval query: GetAsk: %s", err) + answer.Status = retrievalmarket.QueryResponseError + answer.Message = fmt.Sprintf("failed to price deal: %s", err) + sendResp(answer) return } + + answer.MinPricePerByte = ask.PricePerByte + answer.MaxPaymentInterval = ask.PaymentInterval + answer.MaxPaymentIntervalIncrease = ask.PaymentIntervalIncrease + answer.UnsealPrice = ask.UnsealPrice + sendResp(answer) +} + +// GetDynamicAsk quotes a dynamic price for the retrieval deal by calling the user configured +// dynamic pricing function. It passes the static price parameters set in the Ask Store to the pricing function. +func (p *Provider) GetDynamicAsk(ctx context.Context, input retrievalmarket.PricingInput, storageDeals []abi.DealID) (retrievalmarket.Ask, error) { + dp, err := p.node.GetRetrievalPricingInput(ctx, input.PieceCID, storageDeals) + if err != nil { + return retrievalmarket.Ask{}, xerrors.Errorf("GetRetrievalPricingInput: %s", err) + } + // currAsk cannot be nil as we initialize the ask store with a default ask. + // Users can then change the values in the ask store using SetAsk but not remove it. + currAsk := p.GetAsk() + if currAsk == nil { + return retrievalmarket.Ask{}, xerrors.New("no ask configured in ask-store") + } + + dp.PayloadCID = input.PayloadCID + dp.PieceCID = input.PieceCID + dp.Unsealed = input.Unsealed + dp.Client = input.Client + dp.CurrentAsk = *currAsk + + ask, err := p.retrievalPricingFunc(ctx, dp) + if err != nil { + return retrievalmarket.Ask{}, xerrors.Errorf("retrievalPricingFunc: %w", err) + } + return ask, nil } // Configure reconfigures a provider after initialization @@ -342,3 +424,22 @@ var ProviderFSMParameterSpec = fsm.Parameters{ Events: providerstates.ProviderEvents, StateEntryFuncs: providerstates.ProviderStateEntryFuncs, } + +// DefaultPricingFunc is the default pricing policy that will be used to price retrieval deals. +var DefaultPricingFunc = func(VerifiedDealsFreeTransfer bool) func(ctx context.Context, pricingInput retrievalmarket.PricingInput) (retrievalmarket.Ask, error) { + return func(ctx context.Context, pricingInput retrievalmarket.PricingInput) (retrievalmarket.Ask, error) { + ask := pricingInput.CurrentAsk + + // don't charge for Unsealing if we have an Unsealed copy. + if pricingInput.Unsealed { + ask.UnsealPrice = big.Zero() + } + + // don't charge for data transfer for verified deals if it's been configured to do so. + if pricingInput.VerifiedDeal && VerifiedDealsFreeTransfer { + ask.PricePerByte = big.Zero() + } + + return ask, nil + } +} diff --git a/retrievalmarket/impl/provider_environments.go b/retrievalmarket/impl/provider_environments.go index 3947ffe6..0f9dc4f3 100644 --- a/retrievalmarket/impl/provider_environments.go +++ b/retrievalmarket/impl/provider_environments.go @@ -30,17 +30,37 @@ type providerValidationEnvironment struct { p *Provider } -func (pve *providerValidationEnvironment) GetPiece(c cid.Cid, pieceCID *cid.Cid) (piecestore.PieceInfo, error) { +func (pve *providerValidationEnvironment) GetAsk(ctx context.Context, payloadCid cid.Cid, pieceCid *cid.Cid, + piece piecestore.PieceInfo, isUnsealed bool, client peer.ID) (retrievalmarket.Ask, error) { + + storageDeals, err := storageDealsForPiece(pieceCid != nil, payloadCid, piece, pve.p.pieceStore) + if err != nil { + return retrievalmarket.Ask{}, xerrors.Errorf("failed to fetch deals for payload, err=%s", err) + } + + input := retrievalmarket.PricingInput{ + // piece from which the payload will be retrieved + PieceCID: piece.PieceCID, + + PayloadCID: payloadCid, + Unsealed: isUnsealed, + Client: client, + } + + return pve.p.GetDynamicAsk(ctx, input, storageDeals) +} + +func (pve *providerValidationEnvironment) GetPiece(c cid.Cid, pieceCID *cid.Cid) (piecestore.PieceInfo, bool, error) { inPieceCid := cid.Undef if pieceCID != nil { inPieceCid = *pieceCID } - return getPieceInfoFromCid(pve.p.pieceStore, c, inPieceCid) + + return getPieceInfoFromCid(context.TODO(), pve.p.node, pve.p.pieceStore, c, inPieceCid) } // CheckDealParams verifies the given deal params are acceptable -func (pve *providerValidationEnvironment) CheckDealParams(pricePerByte abi.TokenAmount, paymentInterval uint64, paymentIntervalIncrease uint64, unsealPrice abi.TokenAmount) error { - ask := pve.p.GetAsk() +func (pve *providerValidationEnvironment) CheckDealParams(ask retrievalmarket.Ask, pricePerByte abi.TokenAmount, paymentInterval uint64, paymentIntervalIncrease uint64, unsealPrice abi.TokenAmount) error { if pricePerByte.LessThan(ask.PricePerByte) { return errors.New("Price per byte too low") } @@ -188,25 +208,120 @@ func (pde *providerDealEnvironment) CloseDataTransfer(ctx context.Context, chid func (pde *providerDealEnvironment) DeleteStore(storeID multistore.StoreID) error { return pde.p.multiStore.Delete(storeID) } -func getPieceInfoFromCid(pieceStore piecestore.PieceStore, payloadCID, pieceCID cid.Cid) (piecestore.PieceInfo, error) { + +func pieceInUnsealedSector(ctx context.Context, n retrievalmarket.RetrievalProviderNode, pieceInfo piecestore.PieceInfo) bool { + for _, di := range pieceInfo.Deals { + isUnsealed, err := n.IsUnsealed(ctx, di.SectorID, di.Offset.Unpadded(), di.Length.Unpadded()) + if err != nil { + log.Errorf("failed to find out if sector %d is unsealed, err=%s", di.SectorID, err) + continue + } + if isUnsealed { + return true + } + } + + return false +} + +func storageDealsForPiece(clientSpecificPiece bool, payloadCID cid.Cid, pieceInfo piecestore.PieceInfo, pieceStore piecestore.PieceStore) ([]abi.DealID, error) { + var storageDeals []abi.DealID + var err error + if clientSpecificPiece { + // If the user wants to retrieve the payload from a specific piece, + // we only need to inspect storage deals made for that piece to quote a price. + for _, d := range pieceInfo.Deals { + storageDeals = append(storageDeals, d.DealID) + } + } else { + // If the user does NOT want to retrieve from a specific piece, we'll have to inspect all storage deals + // made for that piece to quote a price. + storageDeals, err = getAllDealsContainingPayload(pieceStore, payloadCID) + if err != nil { + return nil, xerrors.Errorf("failed to fetch deals for payload: %w", err) + } + } + + if len(storageDeals) == 0 { + return nil, xerrors.New("no storage deals found") + } + + return storageDeals, nil +} + +func getAllDealsContainingPayload(pieceStore piecestore.PieceStore, payloadCID cid.Cid) ([]abi.DealID, error) { + cidInfo, err := pieceStore.GetCIDInfo(payloadCID) + if err != nil { + return nil, xerrors.Errorf("get cid info: %w", err) + } + var dealsIds []abi.DealID + var lastErr error + + for _, pieceBlockLocation := range cidInfo.PieceBlockLocations { + pieceInfo, err := pieceStore.GetPieceInfo(pieceBlockLocation.PieceCID) + if err != nil { + lastErr = err + continue + } + for _, d := range pieceInfo.Deals { + dealsIds = append(dealsIds, d.DealID) + } + } + + if lastErr == nil && len(dealsIds) == 0 { + return nil, xerrors.New("no deals found") + } + + if lastErr != nil && len(dealsIds) == 0 { + return nil, xerrors.Errorf("failed to fetch deals containing payload %s: %w", payloadCID, lastErr) + } + + return dealsIds, nil +} + +func getPieceInfoFromCid(ctx context.Context, n retrievalmarket.RetrievalProviderNode, pieceStore piecestore.PieceStore, payloadCID, pieceCID cid.Cid) (piecestore.PieceInfo, bool, error) { cidInfo, err := pieceStore.GetCIDInfo(payloadCID) if err != nil { - return piecestore.PieceInfoUndefined, xerrors.Errorf("get cid info: %w", err) + return piecestore.PieceInfoUndefined, false, xerrors.Errorf("get cid info: %w", err) } var lastErr error + var sealedPieceInfo *piecestore.PieceInfo + for _, pieceBlockLocation := range cidInfo.PieceBlockLocations { pieceInfo, err := pieceStore.GetPieceInfo(pieceBlockLocation.PieceCID) - if err == nil { - if pieceCID.Equals(cid.Undef) || pieceInfo.PieceCID.Equals(pieceCID) { - return pieceInfo, nil + if err != nil { + lastErr = err + continue + } + + // if client wants to retrieve the payload from a specific piece, just return that piece. + if pieceCID.Defined() && pieceInfo.PieceCID.Equals(pieceCID) { + return pieceInfo, pieceInUnsealedSector(ctx, n, pieceInfo), nil + } + + // if client dosen't have a preference for a particular piece, prefer a piece + // for which an unsealed sector exists. + if pieceCID.Equals(cid.Undef) { + if pieceInUnsealedSector(ctx, n, pieceInfo) { + return pieceInfo, true, nil + } + + if sealedPieceInfo == nil { + sealedPieceInfo = &pieceInfo } } - lastErr = err + + } + + if sealedPieceInfo != nil { + return *sealedPieceInfo, false, nil } + if lastErr == nil { lastErr = xerrors.Errorf("unknown pieceCID %s", pieceCID.String()) } - return piecestore.PieceInfoUndefined, xerrors.Errorf("could not locate piece: %w", lastErr) + + return piecestore.PieceInfoUndefined, false, xerrors.Errorf("could not locate piece: %w", lastErr) } var _ dtutils.StoreGetter = &providerStoreGetter{} diff --git a/retrievalmarket/impl/provider_test.go b/retrievalmarket/impl/provider_test.go index 4ece7013..91883ca7 100644 --- a/retrievalmarket/impl/provider_test.go +++ b/retrievalmarket/impl/provider_test.go @@ -37,32 +37,626 @@ import ( tut "github.com/filecoin-project/go-fil-markets/shared_testutil" ) +func TestDynamicPricing(t *testing.T) { + ctx := context.Background() + expectedAddress := address.TestAddress2 + + payloadCID := tut.GenerateCids(1)[0] + peer1 := peer.ID("peer1") + peer2 := peer.ID("peer2") + + // differential price per byte + expectedppbUnVerified := abi.NewTokenAmount(4321) + expectedppbVerified := abi.NewTokenAmount(2) + + // differential sealing/unsealing price + expectedUnsealPrice := abi.NewTokenAmount(100) + expectedUnsealDiscount := abi.NewTokenAmount(1) + + // differential payment interval + expectedpiPeer1 := uint64(4567) + expectedpiPeer2 := uint64(20) + + expectedPaymentIntervalIncrease := uint64(100) + + // multiple pieces have the same payload + expectedPieceCID1 := tut.GenerateCids(1)[0] + expectedPieceCID2 := tut.GenerateCids(1)[0] + + // sizes + piece1SizePadded := uint64(1234) + piece1Size := uint64(abi.PaddedPieceSize(piece1SizePadded).Unpadded()) + + piece2SizePadded := uint64(2234) + piece2Size := uint64(abi.PaddedPieceSize(piece2SizePadded).Unpadded()) + + expectedCIDInfo := piecestore.CIDInfo{ + PieceBlockLocations: []piecestore.PieceBlockLocation{ + { + PieceCID: expectedPieceCID1, + }, + { + PieceCID: expectedPieceCID2, + }, + }, + } + + piece1 := piecestore.PieceInfo{ + PieceCID: expectedPieceCID1, + Deals: []piecestore.DealInfo{ + { + DealID: abi.DealID(1), + Length: abi.PaddedPieceSize(piece1SizePadded), + }, + { + DealID: abi.DealID(11), + Length: abi.PaddedPieceSize(piece1SizePadded), + }, + }, + } + + piece2 := piecestore.PieceInfo{ + PieceCID: expectedPieceCID2, + Deals: []piecestore.DealInfo{ + { + DealID: abi.DealID(2), + Length: abi.PaddedPieceSize(piece2SizePadded), + }, + { + DealID: abi.DealID(22), + Length: abi.PaddedPieceSize(piece2SizePadded), + }, + { + DealID: abi.DealID(222), + Length: abi.PaddedPieceSize(piece2SizePadded), + }, + }, + } + + dPriceFunc := func(ctx context.Context, dealPricingParams retrievalmarket.PricingInput) (retrievalmarket.Ask, error) { + ask := retrievalmarket.Ask{} + + if dealPricingParams.VerifiedDeal { + ask.PricePerByte = expectedppbVerified + } else { + ask.PricePerByte = expectedppbUnVerified + } + + if dealPricingParams.Unsealed { + ask.UnsealPrice = expectedUnsealDiscount + } else { + ask.UnsealPrice = expectedUnsealPrice + } + + fmt.Println("\n client is", dealPricingParams.Client.String()) + if dealPricingParams.Client == peer2 { + ask.PaymentInterval = expectedpiPeer2 + } else { + ask.PaymentInterval = expectedpiPeer1 + } + ask.PaymentIntervalIncrease = expectedPaymentIntervalIncrease + + return ask, nil + } + + buildProvider := func(t *testing.T, node *testnodes.TestRetrievalProviderNode, qs network.RetrievalQueryStream, + pieceStore piecestore.PieceStore, net *tut.TestRetrievalMarketNetwork, pFnc retrievalimpl.RetrievalPricingFunc) retrievalmarket.RetrievalProvider { + ds := dss.MutexWrap(datastore.NewMapDatastore()) + multiStore, err := multistore.NewMultiDstore(ds) + require.NoError(t, err) + dt := tut.NewTestDataTransfer() + + c, err := retrievalimpl.NewProvider(expectedAddress, node, net, pieceStore, multiStore, dt, ds, pFnc) + require.NoError(t, err) + tut.StartAndWaitForReady(ctx, t, c) + return c + } + + readWriteQueryStream := func() *tut.TestRetrievalQueryStream { + qRead, qWrite := tut.QueryReadWriter() + qrRead, qrWrite := tut.QueryResponseReadWriter() + qs := tut.NewTestRetrievalQueryStream(tut.TestQueryStreamParams{ + Reader: qRead, + Writer: qWrite, + RespReader: qrRead, + RespWriter: qrWrite, + }) + return qs + } + + tcs := map[string]struct { + query retrievalmarket.Query + expFunc func(t *testing.T, pieceStore *tut.TestPieceStore) + nodeFunc func(n *testnodes.TestRetrievalProviderNode) + peerIdFnc func(stream *tut.TestRetrievalQueryStream) + providerFnc func(provider retrievalmarket.RetrievalProvider) + + pricingFnc retrievalimpl.RetrievalPricingFunc + + expectedPricePerByte abi.TokenAmount + expectedPaymentInterval uint64 + expectedPaymentIntervalIncrease uint64 + expectedUnsealPrice abi.TokenAmount + expectedSize uint64 + }{ + // Retrieval request for a payloadCid without a pieceCid + "pieceCid no-op: quote correct price for sealed, unverified, peer1": { + query: retrievalmarket.Query{PayloadCID: payloadCID}, + peerIdFnc: func(qs *tut.TestRetrievalQueryStream) { + qs.SetRemotePeer(peer1) + }, + nodeFunc: func(n *testnodes.TestRetrievalProviderNode) { + n.ExpectPricingParams(expectedPieceCID1, []abi.DealID{1, 11, 2, 22, 222}) + }, + expFunc: func(t *testing.T, pieceStore *tut.TestPieceStore) { + pieceStore.ExpectCID(payloadCID, expectedCIDInfo) + pieceStore.ExpectPiece(expectedPieceCID1, piece1) + pieceStore.ExpectPiece(expectedPieceCID2, piece2) + }, + providerFnc: func(provider retrievalmarket.RetrievalProvider) {}, + pricingFnc: dPriceFunc, + + expectedPricePerByte: expectedppbUnVerified, + expectedPaymentInterval: expectedpiPeer1, + expectedUnsealPrice: expectedUnsealPrice, + expectedPaymentIntervalIncrease: expectedPaymentIntervalIncrease, + expectedSize: piece1Size, + }, + + "pieceCid no-op: quote correct price for sealed, unverified, peer2": { + query: retrievalmarket.Query{PayloadCID: payloadCID}, + peerIdFnc: func(qs *tut.TestRetrievalQueryStream) { + qs.SetRemotePeer(peer2) + }, + nodeFunc: func(n *testnodes.TestRetrievalProviderNode) { + n.ExpectPricingParams(expectedPieceCID1, []abi.DealID{1, 11, 2, 22, 222}) + }, + expFunc: func(t *testing.T, pieceStore *tut.TestPieceStore) { + pieceStore.ExpectCID(payloadCID, expectedCIDInfo) + pieceStore.ExpectPiece(expectedPieceCID1, piece1) + pieceStore.ExpectPiece(expectedPieceCID2, piece2) + }, + providerFnc: func(provider retrievalmarket.RetrievalProvider) {}, + pricingFnc: dPriceFunc, + + expectedPricePerByte: expectedppbUnVerified, + expectedPaymentInterval: expectedpiPeer2, + expectedUnsealPrice: expectedUnsealPrice, + expectedPaymentIntervalIncrease: expectedPaymentIntervalIncrease, + expectedSize: piece1Size, + }, + + "pieceCid no-op: quote correct price for sealed, verified, peer1": { + query: retrievalmarket.Query{PayloadCID: payloadCID}, + peerIdFnc: func(qs *tut.TestRetrievalQueryStream) { + qs.SetRemotePeer(peer1) + }, + nodeFunc: func(n *testnodes.TestRetrievalProviderNode) { + n.MarkVerified() + n.ExpectPricingParams(expectedPieceCID1, []abi.DealID{1, 11, 2, 22, 222}) + }, + expFunc: func(t *testing.T, pieceStore *tut.TestPieceStore) { + pieceStore.ExpectCID(payloadCID, expectedCIDInfo) + pieceStore.ExpectPiece(expectedPieceCID1, piece1) + pieceStore.ExpectPiece(expectedPieceCID2, piece2) + }, + providerFnc: func(provider retrievalmarket.RetrievalProvider) {}, + pricingFnc: dPriceFunc, + + expectedPricePerByte: expectedppbVerified, + expectedPaymentInterval: expectedpiPeer1, + expectedUnsealPrice: expectedUnsealPrice, + expectedPaymentIntervalIncrease: expectedPaymentIntervalIncrease, + expectedSize: piece1Size, + }, + + "pieceCid no-op: quote correct price for unsealed, unverified, peer1": { + query: retrievalmarket.Query{PayloadCID: payloadCID}, + peerIdFnc: func(qs *tut.TestRetrievalQueryStream) { + qs.SetRemotePeer(peer1) + }, + nodeFunc: func(n *testnodes.TestRetrievalProviderNode) { + p := piece2.Deals[0] + n.MarkUnsealed(context.TODO(), p.SectorID, p.Offset.Unpadded(), p.Length.Unpadded()) + n.ExpectPricingParams(expectedPieceCID2, []abi.DealID{1, 11, 2, 22, 222}) + }, + expFunc: func(t *testing.T, pieceStore *tut.TestPieceStore) { + pieceStore.ExpectCID(payloadCID, expectedCIDInfo) + pieceStore.ExpectPiece(expectedPieceCID1, piece1) + pieceStore.ExpectPiece(expectedPieceCID2, piece2) + }, + providerFnc: func(provider retrievalmarket.RetrievalProvider) {}, + pricingFnc: dPriceFunc, + + expectedPricePerByte: expectedppbUnVerified, + expectedPaymentInterval: expectedpiPeer1, + expectedUnsealPrice: expectedUnsealDiscount, + expectedPaymentIntervalIncrease: expectedPaymentIntervalIncrease, + expectedSize: piece2Size, + }, + + "pieceCid no-op: quote correct price for unsealed, verified, peer1": { + query: retrievalmarket.Query{PayloadCID: payloadCID}, + peerIdFnc: func(qs *tut.TestRetrievalQueryStream) { + qs.SetRemotePeer(peer1) + }, + nodeFunc: func(n *testnodes.TestRetrievalProviderNode) { + p := piece2.Deals[0] + n.MarkUnsealed(context.TODO(), p.SectorID, p.Offset.Unpadded(), p.Length.Unpadded()) + n.MarkVerified() + n.ExpectPricingParams(expectedPieceCID2, []abi.DealID{1, 11, 2, 22, 222}) + }, + expFunc: func(t *testing.T, pieceStore *tut.TestPieceStore) { + pieceStore.ExpectCID(payloadCID, expectedCIDInfo) + pieceStore.ExpectPiece(expectedPieceCID1, piece1) + pieceStore.ExpectPiece(expectedPieceCID2, piece2) + }, + providerFnc: func(provider retrievalmarket.RetrievalProvider) {}, + pricingFnc: dPriceFunc, + + expectedPricePerByte: expectedppbVerified, + expectedPaymentInterval: expectedpiPeer1, + expectedUnsealPrice: expectedUnsealDiscount, + expectedPaymentIntervalIncrease: expectedPaymentIntervalIncrease, + expectedSize: piece2Size, + }, + + "pieceCid no-op: quote correct price for unsealed, verified, peer1 using default pricing policy if data transfer fee set to zero for verified deals": { + query: retrievalmarket.Query{PayloadCID: payloadCID}, + peerIdFnc: func(qs *tut.TestRetrievalQueryStream) { + qs.SetRemotePeer(peer1) + }, + nodeFunc: func(n *testnodes.TestRetrievalProviderNode) { + p := piece2.Deals[0] + n.MarkUnsealed(context.TODO(), p.SectorID, p.Offset.Unpadded(), p.Length.Unpadded()) + n.MarkVerified() + n.ExpectPricingParams(expectedPieceCID2, []abi.DealID{1, 11, 2, 22, 222}) + }, + expFunc: func(t *testing.T, pieceStore *tut.TestPieceStore) { + pieceStore.ExpectCID(payloadCID, expectedCIDInfo) + pieceStore.ExpectPiece(expectedPieceCID1, piece1) + pieceStore.ExpectPiece(expectedPieceCID2, piece2) + }, + + providerFnc: func(provider retrievalmarket.RetrievalProvider) { + ask := provider.GetAsk() + ask.PaymentInterval = expectedpiPeer1 + ask.PaymentIntervalIncrease = expectedPaymentIntervalIncrease + provider.SetAsk(ask) + }, + + pricingFnc: retrievalimpl.DefaultPricingFunc(true), + + expectedPricePerByte: big.Zero(), + expectedUnsealPrice: big.Zero(), + expectedPaymentInterval: expectedpiPeer1, + expectedPaymentIntervalIncrease: expectedPaymentIntervalIncrease, + expectedSize: piece2Size, + }, + + "pieceCid no-op: quote correct price for unsealed, verified, peer1 using default pricing policy if data transfer fee not set to zero for verified deals": { + query: retrievalmarket.Query{PayloadCID: payloadCID}, + peerIdFnc: func(qs *tut.TestRetrievalQueryStream) { + qs.SetRemotePeer(peer1) + }, + nodeFunc: func(n *testnodes.TestRetrievalProviderNode) { + p := piece2.Deals[0] + n.MarkUnsealed(context.TODO(), p.SectorID, p.Offset.Unpadded(), p.Length.Unpadded()) + n.MarkVerified() + n.ExpectPricingParams(expectedPieceCID2, []abi.DealID{1, 11, 2, 22, 222}) + }, + expFunc: func(t *testing.T, pieceStore *tut.TestPieceStore) { + pieceStore.ExpectCID(payloadCID, expectedCIDInfo) + pieceStore.ExpectPiece(expectedPieceCID1, piece1) + pieceStore.ExpectPiece(expectedPieceCID2, piece2) + }, + + providerFnc: func(provider retrievalmarket.RetrievalProvider) { + ask := provider.GetAsk() + ask.PricePerByte = expectedppbVerified + ask.PaymentInterval = expectedpiPeer1 + ask.PaymentIntervalIncrease = expectedPaymentIntervalIncrease + provider.SetAsk(ask) + }, + + pricingFnc: retrievalimpl.DefaultPricingFunc(false), + + expectedPricePerByte: expectedppbVerified, + expectedUnsealPrice: big.Zero(), + expectedPaymentInterval: expectedpiPeer1, + expectedPaymentIntervalIncrease: expectedPaymentIntervalIncrease, + expectedSize: piece2Size, + }, + + "pieceCid no-op: quote correct price for sealed, verified, peer1 using default pricing policy": { + query: retrievalmarket.Query{PayloadCID: payloadCID}, + peerIdFnc: func(qs *tut.TestRetrievalQueryStream) { + qs.SetRemotePeer(peer1) + }, + nodeFunc: func(n *testnodes.TestRetrievalProviderNode) { + n.MarkVerified() + n.ExpectPricingParams(expectedPieceCID1, []abi.DealID{1, 11, 2, 22, 222}) + }, + expFunc: func(t *testing.T, pieceStore *tut.TestPieceStore) { + pieceStore.ExpectCID(payloadCID, expectedCIDInfo) + pieceStore.ExpectPiece(expectedPieceCID1, piece1) + pieceStore.ExpectPiece(expectedPieceCID2, piece2) + }, + providerFnc: func(provider retrievalmarket.RetrievalProvider) { + ask := provider.GetAsk() + ask.PricePerByte = expectedppbVerified + ask.PaymentInterval = expectedpiPeer1 + ask.PaymentIntervalIncrease = expectedPaymentIntervalIncrease + ask.UnsealPrice = expectedUnsealPrice + provider.SetAsk(ask) + }, + pricingFnc: retrievalimpl.DefaultPricingFunc(false), + + expectedPricePerByte: expectedppbVerified, + expectedPaymentInterval: expectedpiPeer1, + expectedUnsealPrice: expectedUnsealPrice, + expectedPaymentIntervalIncrease: expectedPaymentIntervalIncrease, + expectedSize: piece1Size, + }, + + // Retrieval requests for a payloadCid inside a specific piece Cid + "specific sealed piece Cid, first piece Cid matches: quote correct price for sealed, unverified, peer1": { + query: retrievalmarket.Query{ + PayloadCID: payloadCID, + QueryParams: retrievalmarket.QueryParams{PieceCID: &expectedPieceCID1}, + }, + peerIdFnc: func(qs *tut.TestRetrievalQueryStream) { + qs.SetRemotePeer(peer1) + }, + nodeFunc: func(n *testnodes.TestRetrievalProviderNode) { + p := piece2.Deals[0] + n.MarkUnsealed(context.TODO(), p.SectorID, p.Offset.Unpadded(), p.Length.Unpadded()) + n.ExpectPricingParams(expectedPieceCID1, []abi.DealID{1, 11}) + }, + expFunc: func(t *testing.T, pieceStore *tut.TestPieceStore) { + pieceStore.ExpectCID(payloadCID, expectedCIDInfo) + pieceStore.ExpectPiece(expectedPieceCID1, piece1) + }, + providerFnc: func(provider retrievalmarket.RetrievalProvider) {}, + pricingFnc: dPriceFunc, + + expectedPricePerByte: expectedppbUnVerified, + expectedPaymentInterval: expectedpiPeer1, + expectedUnsealPrice: expectedUnsealPrice, + expectedPaymentIntervalIncrease: expectedPaymentIntervalIncrease, + expectedSize: piece1Size, + }, + + "specific sealed piece Cid, second piece Cid matches: quote correct price for sealed, unverified, peer1": { + query: retrievalmarket.Query{ + PayloadCID: payloadCID, + QueryParams: retrievalmarket.QueryParams{PieceCID: &expectedPieceCID2}, + }, + peerIdFnc: func(qs *tut.TestRetrievalQueryStream) { + qs.SetRemotePeer(peer1) + }, + nodeFunc: func(n *testnodes.TestRetrievalProviderNode) { + p := piece1.Deals[0] + n.MarkUnsealed(context.TODO(), p.SectorID, p.Offset.Unpadded(), p.Length.Unpadded()) + n.ExpectPricingParams(expectedPieceCID2, []abi.DealID{2, 22, 222}) + }, + expFunc: func(t *testing.T, pieceStore *tut.TestPieceStore) { + pieceStore.ExpectCID(payloadCID, expectedCIDInfo) + pieceStore.ExpectPiece(expectedPieceCID1, piece1) + pieceStore.ExpectPiece(expectedPieceCID1, piece2) + }, + providerFnc: func(provider retrievalmarket.RetrievalProvider) {}, + pricingFnc: dPriceFunc, + + expectedPricePerByte: expectedppbUnVerified, + expectedPaymentInterval: expectedpiPeer1, + expectedUnsealPrice: expectedUnsealPrice, + expectedPaymentIntervalIncrease: expectedPaymentIntervalIncrease, + expectedSize: piece2Size, + }, + + "specific sealed piece Cid, first piece Cid matches: quote correct price for sealed, verified, peer1": { + query: retrievalmarket.Query{ + PayloadCID: payloadCID, + QueryParams: retrievalmarket.QueryParams{PieceCID: &expectedPieceCID1}, + }, + peerIdFnc: func(qs *tut.TestRetrievalQueryStream) { + qs.SetRemotePeer(peer1) + }, + nodeFunc: func(n *testnodes.TestRetrievalProviderNode) { + p := piece2.Deals[0] + n.MarkUnsealed(context.TODO(), p.SectorID, p.Offset.Unpadded(), p.Length.Unpadded()) + n.ExpectPricingParams(expectedPieceCID1, []abi.DealID{1, 11}) + n.MarkVerified() + }, + expFunc: func(t *testing.T, pieceStore *tut.TestPieceStore) { + pieceStore.ExpectCID(payloadCID, expectedCIDInfo) + pieceStore.ExpectPiece(expectedPieceCID1, piece1) + }, + providerFnc: func(provider retrievalmarket.RetrievalProvider) {}, + pricingFnc: dPriceFunc, + + expectedPricePerByte: expectedppbVerified, + expectedPaymentInterval: expectedpiPeer1, + expectedUnsealPrice: expectedUnsealPrice, + expectedPaymentIntervalIncrease: expectedPaymentIntervalIncrease, + expectedSize: piece1Size, + }, + + "specific sealed piece Cid, first piece Cid matches: quote correct price for unsealed, verified, peer1": { + query: retrievalmarket.Query{ + PayloadCID: payloadCID, + QueryParams: retrievalmarket.QueryParams{PieceCID: &expectedPieceCID1}, + }, + peerIdFnc: func(qs *tut.TestRetrievalQueryStream) { + qs.SetRemotePeer(peer1) + }, + nodeFunc: func(n *testnodes.TestRetrievalProviderNode) { + p := piece1.Deals[0] + n.MarkUnsealed(context.TODO(), p.SectorID, p.Offset.Unpadded(), p.Length.Unpadded()) + n.MarkVerified() + n.ExpectPricingParams(expectedPieceCID1, []abi.DealID{1, 11}) + }, + expFunc: func(t *testing.T, pieceStore *tut.TestPieceStore) { + pieceStore.ExpectCID(payloadCID, expectedCIDInfo) + pieceStore.ExpectPiece(expectedPieceCID1, piece1) + }, + providerFnc: func(provider retrievalmarket.RetrievalProvider) {}, + pricingFnc: dPriceFunc, + + expectedPricePerByte: expectedppbVerified, + expectedPaymentInterval: expectedpiPeer1, + expectedUnsealPrice: expectedUnsealDiscount, + expectedPaymentIntervalIncrease: expectedPaymentIntervalIncrease, + expectedSize: piece1Size, + }, + + "specific sealed piece Cid, first piece Cid matches: quote correct price for unsealed, verified, peer2": { + query: retrievalmarket.Query{ + PayloadCID: payloadCID, + QueryParams: retrievalmarket.QueryParams{PieceCID: &expectedPieceCID2}, + }, + peerIdFnc: func(qs *tut.TestRetrievalQueryStream) { + qs.SetRemotePeer(peer2) + }, + nodeFunc: func(n *testnodes.TestRetrievalProviderNode) { + p := piece2.Deals[0] + n.MarkUnsealed(context.TODO(), p.SectorID, p.Offset.Unpadded(), p.Length.Unpadded()) + n.MarkVerified() + n.ExpectPricingParams(expectedPieceCID2, []abi.DealID{2, 22, 222}) + }, + expFunc: func(t *testing.T, pieceStore *tut.TestPieceStore) { + pieceStore.ExpectCID(payloadCID, expectedCIDInfo) + pieceStore.ExpectPiece(expectedPieceCID1, piece1) + pieceStore.ExpectPiece(expectedPieceCID2, piece2) + }, + providerFnc: func(provider retrievalmarket.RetrievalProvider) {}, + pricingFnc: dPriceFunc, + + expectedPricePerByte: expectedppbVerified, + expectedPaymentInterval: expectedpiPeer2, + expectedUnsealPrice: expectedUnsealDiscount, + expectedPaymentIntervalIncrease: expectedPaymentIntervalIncrease, + expectedSize: piece2Size, + }, + "pieceCid no-op: quote correct price for sealed, unverified, peer1 based on a pre-existing ask": { + query: retrievalmarket.Query{PayloadCID: payloadCID}, + peerIdFnc: func(qs *tut.TestRetrievalQueryStream) { + qs.SetRemotePeer(peer1) + }, + nodeFunc: func(n *testnodes.TestRetrievalProviderNode) { + n.ExpectPricingParams(expectedPieceCID1, []abi.DealID{1, 11, 2, 22, 222}) + }, + expFunc: func(t *testing.T, pieceStore *tut.TestPieceStore) { + pieceStore.ExpectCID(payloadCID, expectedCIDInfo) + pieceStore.ExpectPiece(expectedPieceCID1, piece1) + pieceStore.ExpectPiece(expectedPieceCID2, piece2) + }, + providerFnc: func(provider retrievalmarket.RetrievalProvider) { + ask := provider.GetAsk() + ask.PricePerByte = expectedppbUnVerified + ask.UnsealPrice = expectedUnsealPrice + provider.SetAsk(ask) + }, + pricingFnc: func(ctx context.Context, dealPricingParams retrievalmarket.PricingInput) (retrievalmarket.Ask, error) { + ask, _ := dPriceFunc(ctx, dealPricingParams) + ppb := big.Add(ask.PricePerByte, dealPricingParams.CurrentAsk.PricePerByte) + unseal := big.Add(ask.UnsealPrice, dealPricingParams.CurrentAsk.UnsealPrice) + ask.PricePerByte = ppb + ask.UnsealPrice = unseal + return ask, nil + }, + + expectedPricePerByte: big.Mul(expectedppbUnVerified, big.NewInt(2)), + expectedPaymentInterval: expectedpiPeer1, + expectedUnsealPrice: big.Mul(expectedUnsealPrice, big.NewInt(2)), + expectedPaymentIntervalIncrease: expectedPaymentIntervalIncrease, + expectedSize: piece1Size, + }, + } + + for name, tc := range tcs { + t.Run(name, func(t *testing.T) { + node := testnodes.NewTestRetrievalProviderNode() + qs := readWriteQueryStream() + tc.peerIdFnc(qs) + + err := qs.WriteQuery(tc.query) + require.NoError(t, err) + pieceStore := tut.NewTestPieceStore() + tc.nodeFunc(node) + tc.expFunc(t, pieceStore) + + net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{}) + p := buildProvider(t, node, qs, pieceStore, net, tc.pricingFnc) + tc.providerFnc(p) + net.ReceiveQueryStream(qs) + + actualResp, err := qs.ReadQueryResponse() + require.NoError(t, err) + pieceStore.VerifyExpectations(t) + node.VerifyExpectations(t) + + require.Equal(t, expectedAddress, actualResp.PaymentAddress) + require.Equal(t, tc.expectedPricePerByte, actualResp.MinPricePerByte) + require.Equal(t, tc.expectedUnsealPrice, actualResp.UnsealPrice) + require.Equal(t, tc.expectedPaymentInterval, actualResp.MaxPaymentInterval) + require.Equal(t, tc.expectedPaymentIntervalIncrease, actualResp.MaxPaymentIntervalIncrease) + require.Equal(t, tc.expectedSize, actualResp.Size) + }) + } +} + func TestHandleQueryStream(t *testing.T) { ctx := context.Background() payloadCID := tut.GenerateCids(1)[0] expectedPeer := peer.ID("somepeer") - expectedSize := uint64(1234) + paddedSize := uint64(1234) + expectedSize := uint64(abi.PaddedPieceSize(paddedSize).Unpadded()) + + paddedSize2 := uint64(2234) + expectedSize2 := uint64(abi.PaddedPieceSize(paddedSize2).Unpadded()) expectedPieceCID := tut.GenerateCids(1)[0] + expectedPieceCID2 := tut.GenerateCids(1)[0] + expectedCIDInfo := piecestore.CIDInfo{ PieceBlockLocations: []piecestore.PieceBlockLocation{ { PieceCID: expectedPieceCID, }, + { + PieceCID: expectedPieceCID2, + }, }, } expectedPiece := piecestore.PieceInfo{ + PieceCID: expectedPieceCID, Deals: []piecestore.DealInfo{ { - Length: abi.PaddedPieceSize(expectedSize), + Length: abi.PaddedPieceSize(paddedSize), + }, + }, + } + + expectedPiece2 := piecestore.PieceInfo{ + PieceCID: expectedPieceCID2, + Deals: []piecestore.DealInfo{ + { + Length: abi.PaddedPieceSize(paddedSize2), }, }, } + expectedAddress := address.TestAddress2 expectedPricePerByte := abi.NewTokenAmount(4321) expectedPaymentInterval := uint64(4567) expectedPaymentIntervalIncrease := uint64(100) + expectedUnsealPrice := abi.NewTokenAmount(100) + + // differential pricing + expectedUnsealDiscount := abi.NewTokenAmount(1) readWriteQueryStream := func() network.RetrievalQueryStream { qRead, qWrite := tut.QueryReadWriter() @@ -77,21 +671,29 @@ func TestHandleQueryStream(t *testing.T) { return qs } - receiveStreamOnProvider := func(t *testing.T, qs network.RetrievalQueryStream, pieceStore piecestore.PieceStore) { - node := testnodes.NewTestRetrievalProviderNode() + receiveStreamOnProvider := func(t *testing.T, node *testnodes.TestRetrievalProviderNode, qs network.RetrievalQueryStream, pieceStore piecestore.PieceStore) { ds := dss.MutexWrap(datastore.NewMapDatastore()) multiStore, err := multistore.NewMultiDstore(ds) require.NoError(t, err) dt := tut.NewTestDataTransfer() net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{}) - c, err := retrievalimpl.NewProvider(expectedAddress, node, net, pieceStore, multiStore, dt, ds) - require.NoError(t, err) - ask := c.GetAsk() - ask.PricePerByte = expectedPricePerByte - ask.PaymentInterval = expectedPaymentInterval - ask.PaymentIntervalIncrease = expectedPaymentIntervalIncrease - c.SetAsk(ask) + priceFunc := func(ctx context.Context, dealPricingParams retrievalmarket.PricingInput) (retrievalmarket.Ask, error) { + ask := retrievalmarket.Ask{} + ask.PricePerByte = expectedPricePerByte + ask.PaymentInterval = expectedPaymentInterval + ask.PaymentIntervalIncrease = expectedPaymentIntervalIncrease + + if dealPricingParams.Unsealed { + ask.UnsealPrice = expectedUnsealDiscount + } else { + ask.UnsealPrice = expectedUnsealPrice + } + return ask, nil + } + + c, err := retrievalimpl.NewProvider(expectedAddress, node, net, pieceStore, multiStore, dt, ds, priceFunc) + require.NoError(t, err) tut.StartAndWaitForReady(ctx, t, c) @@ -99,16 +701,23 @@ func TestHandleQueryStream(t *testing.T) { } testCases := []struct { - name string - query retrievalmarket.Query - expResp retrievalmarket.QueryResponse - expErr string - expFunc func(t *testing.T, pieceStore *tut.TestPieceStore) + name string + query retrievalmarket.Query + expResp retrievalmarket.QueryResponse + expErr string + expFunc func(t *testing.T, pieceStore *tut.TestPieceStore) + nodeFunc func(n *testnodes.TestRetrievalProviderNode) + + expectedPricePerByte abi.TokenAmount + expectedPaymentInterval uint64 + expectedPaymentIntervalIncrease uint64 + expectedUnsealPrice abi.TokenAmount }{ {name: "When PieceCID is not provided and PayloadCID is found", expFunc: func(t *testing.T, pieceStore *tut.TestPieceStore) { pieceStore.ExpectCID(payloadCID, expectedCIDInfo) pieceStore.ExpectPiece(expectedPieceCID, expectedPiece) + pieceStore.ExpectPiece(expectedPieceCID2, expectedPiece2) }, query: retrievalmarket.Query{PayloadCID: payloadCID}, expResp: retrievalmarket.QueryResponse{ @@ -116,7 +725,34 @@ func TestHandleQueryStream(t *testing.T) { PieceCIDFound: retrievalmarket.QueryItemAvailable, Size: expectedSize, }, + expectedPricePerByte: expectedPricePerByte, + expectedPaymentInterval: expectedPaymentInterval, + expectedPaymentIntervalIncrease: expectedPaymentIntervalIncrease, + expectedUnsealPrice: expectedUnsealPrice, + }, + + {name: "When PieceCID is not provided, prefer a piece for which an unsealed sector already exists and price it accordingly", + nodeFunc: func(n *testnodes.TestRetrievalProviderNode) { + p := expectedPiece2.Deals[0] + n.MarkUnsealed(context.TODO(), p.SectorID, p.Offset.Unpadded(), p.Length.Unpadded()) + }, + expFunc: func(t *testing.T, pieceStore *tut.TestPieceStore) { + pieceStore.ExpectCID(payloadCID, expectedCIDInfo) + pieceStore.ExpectPiece(expectedPieceCID, expectedPiece) + pieceStore.ExpectPiece(expectedPieceCID2, expectedPiece2) + }, + query: retrievalmarket.Query{PayloadCID: payloadCID}, + expResp: retrievalmarket.QueryResponse{ + Status: retrievalmarket.QueryResponseAvailable, + PieceCIDFound: retrievalmarket.QueryItemAvailable, + Size: expectedSize2, + }, + expectedPricePerByte: expectedPricePerByte, + expectedPaymentInterval: expectedPaymentInterval, + expectedPaymentIntervalIncrease: expectedPaymentIntervalIncrease, + expectedUnsealPrice: expectedUnsealDiscount, }, + {name: "When PieceCID is provided and both PieceCID and PayloadCID are found", expFunc: func(t *testing.T, pieceStore *tut.TestPieceStore) { loadPieceCIDS(t, pieceStore, payloadCID, expectedPieceCID) @@ -130,12 +766,18 @@ func TestHandleQueryStream(t *testing.T) { PieceCIDFound: retrievalmarket.QueryItemAvailable, Size: expectedSize, }, + expectedPricePerByte: expectedPricePerByte, + expectedPaymentInterval: expectedPaymentInterval, + expectedPaymentIntervalIncrease: expectedPaymentIntervalIncrease, + expectedUnsealPrice: expectedUnsealPrice, }, {name: "When QueryParams has PieceCID and is missing", expFunc: func(t *testing.T, ps *tut.TestPieceStore) { loadPieceCIDS(t, ps, payloadCID, cid.Undef) ps.ExpectCID(payloadCID, expectedCIDInfo) ps.ExpectMissingPiece(expectedPieceCID) + ps.ExpectMissingPiece(expectedPieceCID2) + }, query: retrievalmarket.Query{ PayloadCID: payloadCID, @@ -145,6 +787,10 @@ func TestHandleQueryStream(t *testing.T) { Status: retrievalmarket.QueryResponseUnavailable, PieceCIDFound: retrievalmarket.QueryItemUnavailable, }, + expectedPricePerByte: big.Zero(), + expectedPaymentInterval: 0, + expectedPaymentIntervalIncrease: 0, + expectedUnsealPrice: big.Zero(), }, {name: "When CID info not found", expFunc: func(t *testing.T, ps *tut.TestPieceStore) { @@ -158,20 +804,26 @@ func TestHandleQueryStream(t *testing.T) { Status: retrievalmarket.QueryResponseUnavailable, PieceCIDFound: retrievalmarket.QueryItemUnavailable, }, + expectedPricePerByte: big.Zero(), + expectedPaymentInterval: 0, + expectedPaymentIntervalIncrease: 0, + expectedUnsealPrice: big.Zero(), }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + node := testnodes.NewTestRetrievalProviderNode() qs := readWriteQueryStream() err := qs.WriteQuery(tc.query) require.NoError(t, err) pieceStore := tut.NewTestPieceStore() - pieceStore.ExpectCID(payloadCID, expectedCIDInfo) - pieceStore.ExpectMissingPiece(expectedPieceCID) + if tc.nodeFunc != nil { + tc.nodeFunc(node) + } tc.expFunc(t, pieceStore) - receiveStreamOnProvider(t, qs, pieceStore) + receiveStreamOnProvider(t, node, qs, pieceStore) actualResp, err := qs.ReadQueryResponse() pieceStore.VerifyExpectations(t) @@ -182,15 +834,17 @@ func TestHandleQueryStream(t *testing.T) { } tc.expResp.PaymentAddress = expectedAddress - tc.expResp.MinPricePerByte = expectedPricePerByte - tc.expResp.MaxPaymentInterval = expectedPaymentInterval - tc.expResp.MaxPaymentIntervalIncrease = expectedPaymentIntervalIncrease - tc.expResp.UnsealPrice = big.Zero() + tc.expResp.MinPricePerByte = tc.expectedPricePerByte + tc.expResp.MaxPaymentInterval = tc.expectedPaymentInterval + tc.expResp.MaxPaymentIntervalIncrease = tc.expectedPaymentIntervalIncrease + tc.expResp.UnsealPrice = tc.expectedUnsealPrice assert.Equal(t, tc.expResp, actualResp) }) } t.Run("error reading piece", func(t *testing.T) { + node := testnodes.NewTestRetrievalProviderNode() + qs := readWriteQueryStream() err := qs.WriteQuery(retrievalmarket.Query{ PayloadCID: payloadCID, @@ -198,7 +852,7 @@ func TestHandleQueryStream(t *testing.T) { require.NoError(t, err) pieceStore := tut.NewTestPieceStore() - receiveStreamOnProvider(t, qs, pieceStore) + receiveStreamOnProvider(t, node, qs, pieceStore) response, err := qs.ReadQueryResponse() require.NoError(t, err) @@ -207,10 +861,11 @@ func TestHandleQueryStream(t *testing.T) { }) t.Run("when ReadDealStatusRequest fails", func(t *testing.T) { + node := testnodes.NewTestRetrievalProviderNode() qs := readWriteQueryStream() pieceStore := tut.NewTestPieceStore() - receiveStreamOnProvider(t, qs, pieceStore) + receiveStreamOnProvider(t, node, qs, pieceStore) response, err := qs.ReadQueryResponse() require.NotNil(t, err) @@ -218,6 +873,7 @@ func TestHandleQueryStream(t *testing.T) { }) t.Run("when WriteDealStatusResponse fails", func(t *testing.T) { + node := testnodes.NewTestRetrievalProviderNode() qRead, qWrite := tut.QueryReadWriter() qs := tut.NewTestRetrievalQueryStream(tut.TestQueryStreamParams{ PeerID: expectedPeer, @@ -232,8 +888,9 @@ func TestHandleQueryStream(t *testing.T) { pieceStore := tut.NewTestPieceStore() pieceStore.ExpectCID(payloadCID, expectedCIDInfo) pieceStore.ExpectPiece(expectedPieceCID, expectedPiece) + pieceStore.ExpectPiece(expectedPieceCID2, expectedPiece2) - receiveStreamOnProvider(t, qs, pieceStore) + receiveStreamOnProvider(t, node, qs, pieceStore) pieceStore.VerifyExpectations(t) }) @@ -245,6 +902,12 @@ func TestProvider_Construct(t *testing.T) { multiStore, err := multistore.NewMultiDstore(ds) require.NoError(t, err) dt := tut.NewTestDataTransfer() + + priceFunc := func(ctx context.Context, dealPricingParams retrievalmarket.PricingInput) (retrievalmarket.Ask, error) { + ask := retrievalmarket.Ask{} + return ask, nil + } + _, err = retrievalimpl.NewProvider( spect.NewIDAddr(t, 2344), testnodes.NewTestRetrievalProviderNode(), @@ -253,6 +916,7 @@ func TestProvider_Construct(t *testing.T) { multiStore, dt, ds, + priceFunc, ) require.NoError(t, err) require.Len(t, dt.Subscribers, 1) @@ -290,6 +954,12 @@ func TestProviderConfigOpts(t *testing.T) { ds := datastore.NewMapDatastore() multiStore, err := multistore.NewMultiDstore(ds) require.NoError(t, err) + + priceFunc := func(ctx context.Context, dealPricingParams retrievalmarket.PricingInput) (retrievalmarket.Ask, error) { + ask := retrievalmarket.Ask{} + return ask, nil + } + p, err := retrievalimpl.NewProvider( spect.NewIDAddr(t, 2344), testnodes.NewTestRetrievalProviderNode(), @@ -297,7 +967,7 @@ func TestProviderConfigOpts(t *testing.T) { tut.NewTestPieceStore(), multiStore, tut.NewTestDataTransfer(), - ds, opt1, opt2, + ds, priceFunc, opt1, opt2, ) require.NoError(t, err) assert.NotNil(t, p) @@ -317,7 +987,7 @@ func TestProviderConfigOpts(t *testing.T) { tut.NewTestPieceStore(), multiStore, tut.NewTestDataTransfer(), - ds, ddOpt) + ds, priceFunc, ddOpt) require.NoError(t, err) require.NotNil(t, p) } @@ -464,6 +1134,12 @@ func TestProviderMigrations(t *testing.T) { require.NoError(t, err) err = providerDs.Put(datastore.NewKey("retrieval-ask"), askBuf.Bytes()) require.NoError(t, err) + + priceFunc := func(ctx context.Context, dealPricingParams retrievalmarket.PricingInput) (retrievalmarket.Ask, error) { + ask := retrievalmarket.Ask{} + return ask, nil + } + retrievalProvider, err := retrievalimpl.NewProvider( spect.NewIDAddr(t, 2344), testnodes.NewTestRetrievalProviderNode(), @@ -472,6 +1148,7 @@ func TestProviderMigrations(t *testing.T) { multiStore, dt, providerDs, + priceFunc, ) require.NoError(t, err) tut.StartAndWaitForReady(ctx, t, retrievalProvider) @@ -518,12 +1195,4 @@ func TestProviderMigrations(t *testing.T) { } require.Equal(t, expectedDeal, deal) } - ask := retrievalProvider.GetAsk() - expectedAsk := &retrievalmarket.Ask{ - PricePerByte: oldAsk.PricePerByte, - UnsealPrice: oldAsk.UnsealPrice, - PaymentInterval: oldAsk.PaymentInterval, - PaymentIntervalIncrease: oldAsk.PaymentIntervalIncrease, - } - require.Equal(t, expectedAsk, ask) } diff --git a/retrievalmarket/impl/providerstates/provider_states.go b/retrievalmarket/impl/providerstates/provider_states.go index 493ab996..ba0c93f6 100644 --- a/retrievalmarket/impl/providerstates/provider_states.go +++ b/retrievalmarket/impl/providerstates/provider_states.go @@ -30,7 +30,23 @@ type ProviderDealEnvironment interface { } func firstSuccessfulUnseal(ctx context.Context, node rm.RetrievalProviderNode, pieceInfo piecestore.PieceInfo) (io.ReadCloser, error) { + // prefer an unsealed sector containing the piece if one exists + for _, deal := range pieceInfo.Deals { + isUnsealed, err := node.IsUnsealed(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded()) + if err != nil { + continue + } + if isUnsealed { + // UnsealSector will NOT unseal a sector if we already have an unsealed copy lying around. + reader, err := node.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded()) + if err == nil { + return reader, nil + } + } + } + lastErr := xerrors.New("no sectors found to unseal from") + // if there is no unsealed sector containing the piece, just read the piece from the first sector we are able to unseal. for _, deal := range pieceInfo.Deals { reader, err := node.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded()) if err == nil { diff --git a/retrievalmarket/impl/providerstates/provider_states_test.go b/retrievalmarket/impl/providerstates/provider_states_test.go index fe9404b9..cafcfd11 100644 --- a/retrievalmarket/impl/providerstates/provider_states_test.go +++ b/retrievalmarket/impl/providerstates/provider_states_test.go @@ -47,11 +47,17 @@ func TestUnsealData(t *testing.T) { } pieceCid := testnet.GenerateCids(1)[0] + sectorID := abi.SectorNumber(rand.Uint64()) offset := abi.PaddedPieceSize(rand.Uint64()) length := abi.PaddedPieceSize(rand.Uint64()) + + sectorID2 := abi.SectorNumber(rand.Uint64()) + offset2 := abi.PaddedPieceSize(rand.Uint64()) + length2 := abi.PaddedPieceSize(rand.Uint64()) + data := testnet.RandomBytes(100) - makeDeal := func() *rm.ProviderDealState { + makeDeals := func() *rm.ProviderDealState { return &rm.ProviderDealState{ DealProposal: proposal, Status: rm.DealStatusUnsealing, @@ -64,6 +70,12 @@ func TestUnsealData(t *testing.T) { Offset: offset, Length: length, }, + { + DealID: abi.DealID(rand.Uint64()), + SectorID: sectorID2, + Offset: offset2, + Length: length2, + }, }, }, TotalSent: 0, @@ -71,28 +83,52 @@ func TestUnsealData(t *testing.T) { } } - t.Run("it works", func(t *testing.T) { + t.Run("prefers an already unsealed sector", func(t *testing.T) { + node := testnodes.NewTestRetrievalProviderNode() + node.MarkUnsealed(ctx, sectorID2, offset2.Unpadded(), length2.Unpadded()) + node.ExpectUnseal(sectorID2, offset2.Unpadded(), length2.Unpadded(), data) + + dealState := makeDeals() + setupEnv := func(fe *rmtesting.TestProviderDealEnvironment) {} + runUnsealData(t, node, setupEnv, dealState) + require.Equal(t, dealState.Status, rm.DealStatusUnsealed) + }) + + t.Run("use a non-unsealed sector if there is no unsealed sector", func(t *testing.T) { node := testnodes.NewTestRetrievalProviderNode() node.ExpectUnseal(sectorID, offset.Unpadded(), length.Unpadded(), data) - dealState := makeDeal() + dealState := makeDeals() setupEnv := func(fe *rmtesting.TestProviderDealEnvironment) {} runUnsealData(t, node, setupEnv, dealState) require.Equal(t, dealState.Status, rm.DealStatusUnsealed) }) - t.Run("unseal error", func(t *testing.T) { + t.Run("pick a sector for which unseal does NOT fail", func(t *testing.T) { node := testnodes.NewTestRetrievalProviderNode() node.ExpectFailedUnseal(sectorID, offset.Unpadded(), length.Unpadded()) - dealState := makeDeal() + node.ExpectUnseal(sectorID2, offset2.Unpadded(), length2.Unpadded(), data) + dealState := makeDeals() + setupEnv := func(fe *rmtesting.TestProviderDealEnvironment) {} + runUnsealData(t, node, setupEnv, dealState) + require.Equal(t, dealState.Status, rm.DealStatusUnsealed) + }) + + t.Run("unseal error if all fail", func(t *testing.T) { + node := testnodes.NewTestRetrievalProviderNode() + node.ExpectFailedUnseal(sectorID, offset.Unpadded(), length.Unpadded()) + node.ExpectFailedUnseal(sectorID2, offset2.Unpadded(), length2.Unpadded()) + + dealState := makeDeals() setupEnv := func(fe *rmtesting.TestProviderDealEnvironment) {} runUnsealData(t, node, setupEnv, dealState) require.Equal(t, dealState.Status, rm.DealStatusFailing) require.Equal(t, dealState.Message, "Could not unseal") }) + t.Run("ReadIntoBlockstore error", func(t *testing.T) { node := testnodes.NewTestRetrievalProviderNode() node.ExpectUnseal(sectorID, offset.Unpadded(), length.Unpadded(), data) - dealState := makeDeal() + dealState := makeDeals() setupEnv := func(fe *rmtesting.TestProviderDealEnvironment) { fe.ReadIntoBlockstoreError = errors.New("Something went wrong") } diff --git a/retrievalmarket/impl/requestvalidation/requestvalidation.go b/retrievalmarket/impl/requestvalidation/requestvalidation.go index e42938d6..5852f4a9 100644 --- a/retrievalmarket/impl/requestvalidation/requestvalidation.go +++ b/retrievalmarket/impl/requestvalidation/requestvalidation.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "errors" + "time" "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime" @@ -23,6 +24,8 @@ import ( var allSelectorBytes []byte +var askTimeout = 5 * time.Second + func init() { buf := new(bytes.Buffer) _ = dagcbor.Encoder(shared.AllSelector(), buf) @@ -31,9 +34,11 @@ func init() { // ValidationEnvironment contains the dependencies needed to validate deals type ValidationEnvironment interface { - GetPiece(c cid.Cid, pieceCID *cid.Cid) (piecestore.PieceInfo, error) + GetAsk(ctx context.Context, payloadCid cid.Cid, pieceCid *cid.Cid, piece piecestore.PieceInfo, isUnsealed bool, client peer.ID) (retrievalmarket.Ask, error) + + GetPiece(c cid.Cid, pieceCID *cid.Cid) (piecestore.PieceInfo, bool, error) // CheckDealParams verifies the given deal params are acceptable - CheckDealParams(pricePerByte abi.TokenAmount, paymentInterval uint64, paymentIntervalIncrease uint64, unsealPrice abi.TokenAmount) error + CheckDealParams(ask retrievalmarket.Ask, pricePerByte abi.TokenAmount, paymentInterval uint64, paymentIntervalIncrease uint64, unsealPrice abi.TokenAmount) error // RunDealDecisioningLogic runs custom deal decision logic to decide if a deal is accepted, if present RunDealDecisioningLogic(ctx context.Context, state retrievalmarket.ProviderDealState) (bool, string, error) // StateMachines returns the FSM Group to begin tracking with @@ -154,9 +159,25 @@ func (rv *ProviderRequestValidator) validatePull(isRestart bool, receiver peer.I } func (rv *ProviderRequestValidator) acceptDeal(deal *retrievalmarket.ProviderDealState) (retrievalmarket.DealStatus, error) { + pieceInfo, isUnsealed, err := rv.env.GetPiece(deal.PayloadCID, deal.PieceCID) + if err != nil { + if err == retrievalmarket.ErrNotFound { + return retrievalmarket.DealStatusDealNotFound, err + } + return retrievalmarket.DealStatusErrored, err + } + + ctx, cancel := context.WithTimeout(context.TODO(), askTimeout) + defer cancel() + + ask, err := rv.env.GetAsk(ctx, deal.PayloadCID, deal.PieceCID, pieceInfo, isUnsealed, deal.Receiver) + if err != nil { + return retrievalmarket.DealStatusErrored, err + } + // check that the deal parameters match our required parameters or // reject outright - err := rv.env.CheckDealParams(deal.PricePerByte, deal.PaymentInterval, deal.PaymentIntervalIncrease, deal.UnsealPrice) + err = rv.env.CheckDealParams(ask, deal.PricePerByte, deal.PaymentInterval, deal.PaymentIntervalIncrease, deal.UnsealPrice) if err != nil { return retrievalmarket.DealStatusRejected, err } @@ -170,13 +191,6 @@ func (rv *ProviderRequestValidator) acceptDeal(deal *retrievalmarket.ProviderDea } // verify we have the piece - pieceInfo, err := rv.env.GetPiece(deal.PayloadCID, deal.PieceCID) - if err != nil { - if err == retrievalmarket.ErrNotFound { - return retrievalmarket.DealStatusDealNotFound, err - } - return retrievalmarket.DealStatusErrored, err - } deal.PieceInfo = &pieceInfo diff --git a/retrievalmarket/impl/requestvalidation/requestvalidation_test.go b/retrievalmarket/impl/requestvalidation/requestvalidation_test.go index bf6ba612..d20a7e4c 100644 --- a/retrievalmarket/impl/requestvalidation/requestvalidation_test.go +++ b/retrievalmarket/impl/requestvalidation/requestvalidation_test.go @@ -223,6 +223,7 @@ func TestValidatePull(t *testing.T) { } type fakeValidationEnvironment struct { + IsUnsealedPiece bool PieceInfo piecestore.PieceInfo GetPieceErr error CheckDealParamsError error @@ -232,14 +233,21 @@ type fakeValidationEnvironment struct { BeginTrackingError error NextStoreIDValue multistore.StoreID NextStoreIDError error + + Ask retrievalmarket.Ask +} + +func (fve *fakeValidationEnvironment) GetAsk(ctx context.Context, payloadCid cid.Cid, pieceCid *cid.Cid, + piece piecestore.PieceInfo, isUnsealed bool, client peer.ID) (retrievalmarket.Ask, error) { + return fve.Ask, nil } -func (fve *fakeValidationEnvironment) GetPiece(c cid.Cid, pieceCID *cid.Cid) (piecestore.PieceInfo, error) { - return fve.PieceInfo, fve.GetPieceErr +func (fve *fakeValidationEnvironment) GetPiece(c cid.Cid, pieceCID *cid.Cid) (piecestore.PieceInfo, bool, error) { + return fve.PieceInfo, fve.IsUnsealedPiece, fve.GetPieceErr } // CheckDealParams verifies the given deal params are acceptable -func (fve *fakeValidationEnvironment) CheckDealParams(pricePerByte abi.TokenAmount, paymentInterval uint64, paymentIntervalIncrease uint64, unsealPrice abi.TokenAmount) error { +func (fve *fakeValidationEnvironment) CheckDealParams(ask retrievalmarket.Ask, pricePerByte abi.TokenAmount, paymentInterval uint64, paymentIntervalIncrease uint64, unsealPrice abi.TokenAmount) error { return fve.CheckDealParamsError } diff --git a/retrievalmarket/impl/testnodes/test_retrieval_provider_node.go b/retrievalmarket/impl/testnodes/test_retrieval_provider_node.go index 031bddea..2e343dff 100644 --- a/retrievalmarket/impl/testnodes/test_retrieval_provider_node.go +++ b/retrievalmarket/impl/testnodes/test_retrieval_provider_node.go @@ -11,6 +11,7 @@ import ( "sync" "testing" + "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" "github.com/stretchr/testify/require" "golang.org/x/xerrors" @@ -53,6 +54,15 @@ type TestRetrievalProviderNode struct { received map[sectorKey]struct{} lk sync.Mutex expectedVouchers map[expectedVoucherKey]voucherResult + + expectedPricingParamDeals []abi.DealID + receivedPricingParamDeals []abi.DealID + + expectedPricingPieceCID cid.Cid + receivedPricingPieceCID cid.Cid + + unsealed map[sectorKey]struct{} + isVerified bool receivedVouchers []abi.TokenAmount unsealPaused chan struct{} } @@ -66,9 +76,37 @@ func NewTestRetrievalProviderNode() *TestRetrievalProviderNode { expectations: make(map[sectorKey]struct{}), received: make(map[sectorKey]struct{}), expectedVouchers: make(map[expectedVoucherKey]voucherResult), + unsealed: make(map[sectorKey]struct{}), } } +func (trpn *TestRetrievalProviderNode) IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) { + _, ok := trpn.unsealed[sectorKey{sectorID, offset, length}] + return ok, nil +} + +func (trpn *TestRetrievalProviderNode) MarkUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) { + trpn.unsealed[sectorKey{sectorID, offset, length}] = struct{}{} +} + +func (trpn *TestRetrievalProviderNode) MarkVerified() { + trpn.isVerified = true +} + +func (trpn *TestRetrievalProviderNode) ExpectPricingParams(pieceCID cid.Cid, deals []abi.DealID) { + trpn.expectedPricingPieceCID = pieceCID + trpn.expectedPricingParamDeals = deals +} + +func (trpn *TestRetrievalProviderNode) GetRetrievalPricingInput(_ context.Context, pieceCID cid.Cid, deals []abi.DealID) (retrievalmarket.PricingInput, error) { + trpn.receivedPricingParamDeals = deals + trpn.receivedPricingPieceCID = pieceCID + + return retrievalmarket.PricingInput{ + VerifiedDeal: trpn.isVerified, + }, nil +} + // StubUnseal stubs a response to attempting to unseal a sector with the given paramters func (trpn *TestRetrievalProviderNode) StubUnseal(sectorID abi.SectorNumber, offset, length abi.UnpaddedPieceSize, data []byte) { trpn.sectorStubs[sectorKey{sectorID, offset, length}] = data @@ -125,6 +163,9 @@ func (trpn *TestRetrievalProviderNode) UnsealSector(ctx context.Context, sectorI func (trpn *TestRetrievalProviderNode) VerifyExpectations(t *testing.T) { require.Equal(t, len(trpn.expectedVouchers), len(trpn.receivedVouchers)) require.Equal(t, trpn.expectations, trpn.received) + require.Equal(t, trpn.expectedPricingPieceCID, trpn.receivedPricingPieceCID) + + require.Equal(t, trpn.expectedPricingParamDeals, trpn.receivedPricingParamDeals) } // SavePaymentVoucher simulates saving a payment voucher with a stubbed result diff --git a/retrievalmarket/network/network.go b/retrievalmarket/network/network.go index c1c1bebe..404dea89 100644 --- a/retrievalmarket/network/network.go +++ b/retrievalmarket/network/network.go @@ -18,6 +18,7 @@ type RetrievalQueryStream interface { ReadQueryResponse() (retrievalmarket.QueryResponse, error) WriteQueryResponse(retrievalmarket.QueryResponse) error Close() error + RemotePeer() peer.ID } // RetrievalReceiver is the API for handling data coming in on diff --git a/retrievalmarket/network/old_query_stream.go b/retrievalmarket/network/old_query_stream.go index fd146ac6..940ce815 100644 --- a/retrievalmarket/network/old_query_stream.go +++ b/retrievalmarket/network/old_query_stream.go @@ -20,6 +20,10 @@ type oldQueryStream struct { var _ RetrievalQueryStream = (*oldQueryStream)(nil) +func (qs *oldQueryStream) RemotePeer() peer.ID { + return qs.p +} + func (qs *oldQueryStream) ReadQuery() (retrievalmarket.Query, error) { var q migrations.Query0 diff --git a/retrievalmarket/network/query_stream.go b/retrievalmarket/network/query_stream.go index 520ea99f..37c18a28 100644 --- a/retrievalmarket/network/query_stream.go +++ b/retrievalmarket/network/query_stream.go @@ -31,6 +31,10 @@ func (qs *queryStream) ReadQuery() (retrievalmarket.Query, error) { return q, nil } +func (qs *queryStream) RemotePeer() peer.ID { + return qs.p +} + func (qs *queryStream) WriteQuery(q retrievalmarket.Query) error { return cborutil.WriteCborRPC(qs.rw, &q) } diff --git a/retrievalmarket/nodes.go b/retrievalmarket/nodes.go index 7ef2cb0d..8eb27e0d 100644 --- a/retrievalmarket/nodes.go +++ b/retrievalmarket/nodes.go @@ -52,4 +52,8 @@ type RetrievalProviderNode interface { GetMinerWorkerAddress(ctx context.Context, miner address.Address, tok shared.TipSetToken) (address.Address, error) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) SavePaymentVoucher(ctx context.Context, paymentChannel address.Address, voucher *paych.SignedVoucher, proof []byte, expectedAmount abi.TokenAmount, tok shared.TipSetToken) (abi.TokenAmount, error) + + IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) + + GetRetrievalPricingInput(ctx context.Context, pieceCID cid.Cid, storageDeals []abi.DealID) (PricingInput, error) } diff --git a/retrievalmarket/storage_retrieval_integration_test.go b/retrievalmarket/storage_retrieval_integration_test.go index 48cea997..18c24f56 100644 --- a/retrievalmarket/storage_retrieval_integration_test.go +++ b/retrievalmarket/storage_retrieval_integration_test.go @@ -420,9 +420,6 @@ func newRetrievalHarness(ctx context.Context, t *testing.T, sh *testharness.Stor pieceStore.ExpectCID(payloadCID, cidInfo) pieceStore.ExpectPiece(expectedPiece, pieceInfo) providerDs := namespace.Wrap(sh.TestData.Ds2, datastore.NewKey("/retrievals/provider")) - provider, err := retrievalimpl.NewProvider(providerPaymentAddr, providerNode, nw2, pieceStore, sh.TestData.MultiStore2, sh.DTProvider, providerDs) - require.NoError(t, err) - tut.StartAndWaitForReady(ctx, t, provider) var p retrievalmarket.Params if len(params) == 0 { @@ -436,12 +433,20 @@ func newRetrievalHarness(ctx context.Context, t *testing.T, sh *testharness.Stor p = params[0] } - ask := provider.GetAsk() - ask.PaymentInterval = p.PaymentInterval - ask.PaymentIntervalIncrease = p.PaymentIntervalIncrease - ask.PricePerByte = p.PricePerByte - ask.UnsealPrice = p.UnsealPrice - provider.SetAsk(ask) + priceFunc := func(ctx context.Context, dealPricingParams retrievalmarket.PricingInput) (retrievalmarket.Ask, error) { + ask := retrievalmarket.Ask{} + ask.PaymentInterval = p.PaymentInterval + ask.PaymentIntervalIncrease = p.PaymentIntervalIncrease + ask.PricePerByte = p.PricePerByte + ask.UnsealPrice = p.UnsealPrice + + return ask, nil + } + + provider, err := retrievalimpl.NewProvider(providerPaymentAddr, providerNode, nw2, pieceStore, sh.TestData.MultiStore2, sh.DTProvider, providerDs, + priceFunc) + require.NoError(t, err) + tut.StartAndWaitForReady(ctx, t, provider) return &retrievalHarness{ Ctx: ctx, diff --git a/retrievalmarket/types.go b/retrievalmarket/types.go index 9b113067..5da7138b 100644 --- a/retrievalmarket/types.go +++ b/retrievalmarket/types.go @@ -416,3 +416,21 @@ type ChannelAvailableFunds struct { // and in the local datastore VoucherReedeemedAmt abi.TokenAmount } + +// PricingInput provides input parameters required to price a retrieval deal. +type PricingInput struct { + // PayloadCID is the cid of the payload to retrieve. + PayloadCID cid.Cid + // PieceCID is the cid of the Piece from which the Payload will be retrieved. + PieceCID cid.Cid + // PieceSize is the size of the Piece from which the payload will be retrieved. + PieceSize abi.UnpaddedPieceSize + // Client is the peerID of the retrieval client. + Client peer.ID + // VerifiedDeal is true if there exists a verified storage deal for the PayloadCID. + VerifiedDeal bool + // Unsealed is true if there exists an unsealed sector from which we can retrieve the given payload. + Unsealed bool + // CurrentAsk is the current configured ask in the ask-store. + CurrentAsk Ask +} diff --git a/retrievalmarket/types_test.go b/retrievalmarket/types_test.go index 774e7f71..1af72815 100644 --- a/retrievalmarket/types_test.go +++ b/retrievalmarket/types_test.go @@ -2,10 +2,12 @@ package retrievalmarket_test import ( "bytes" + "encoding/json" "testing" "github.com/ipld/go-ipld-prime/codec/dagcbor" basicnode "github.com/ipld/go-ipld-prime/node/basic" + "github.com/libp2p/go-libp2p-core/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -41,6 +43,33 @@ func TestParamsMarshalUnmarshal(t *testing.T) { assert.Equal(t, sel, allSelector) } +func TestPricingInputMarshalUnmarshalJSON(t *testing.T) { + pid := test.RandPeerIDFatal(t) + + in := retrievalmarket.PricingInput{ + PayloadCID: tut.GenerateCids(1)[0], + PieceCID: tut.GenerateCids(1)[0], + PieceSize: abi.UnpaddedPieceSize(100), + Client: pid, + VerifiedDeal: true, + Unsealed: true, + CurrentAsk: retrievalmarket.Ask{ + PricePerByte: big.Zero(), + UnsealPrice: big.Zero(), + PaymentInterval: 0, + PaymentIntervalIncrease: 0, + }, + } + + bz, err := json.Marshal(in) + require.NoError(t, err) + + resp2 := retrievalmarket.PricingInput{} + require.NoError(t, json.Unmarshal(bz, &resp2)) + + require.Equal(t, in, resp2) +} + func TestParamsIntervalBounds(t *testing.T) { testCases := []struct { name string diff --git a/shared_testutil/test_network_types.go b/shared_testutil/test_network_types.go index b77687a4..89d0de03 100644 --- a/shared_testutil/test_network_types.go +++ b/shared_testutil/test_network_types.go @@ -51,7 +51,7 @@ type TestQueryStreamParams struct { // NewTestRetrievalQueryStream returns a new TestRetrievalQueryStream with the // behavior specified by the paramaters, or default behaviors if not specified. -func NewTestRetrievalQueryStream(params TestQueryStreamParams) rmnet.RetrievalQueryStream { +func NewTestRetrievalQueryStream(params TestQueryStreamParams) *TestRetrievalQueryStream { stream := TestRetrievalQueryStream{ p: params.PeerID, reader: TrivialQueryReader, @@ -74,6 +74,14 @@ func NewTestRetrievalQueryStream(params TestQueryStreamParams) rmnet.RetrievalQu return &stream } +func (trqs *TestRetrievalQueryStream) SetRemotePeer(rp peer.ID) { + trqs.p = rp +} + +func (trqs *TestRetrievalQueryStream) RemotePeer() peer.ID { + return trqs.p +} + // ReadDealStatusRequest calls the mocked query reader. func (trqs *TestRetrievalQueryStream) ReadQuery() (rm.Query, error) { return trqs.reader()