diff --git a/retrievalmarket/impl/clientstates/client_fsm.go b/retrievalmarket/impl/clientstates/client_fsm.go index 11e9e3b9..7eeac92b 100644 --- a/retrievalmarket/impl/clientstates/client_fsm.go +++ b/retrievalmarket/impl/clientstates/client_fsm.go @@ -7,6 +7,7 @@ import ( "github.com/filecoin-project/go-statemachine/fsm" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" + "github.com/ipfs/go-cid" "golang.org/x/xerrors" rm "github.com/filecoin-project/go-fil-markets/retrievalmarket" @@ -28,19 +29,29 @@ var ClientEvents = fsm.Events{ fsm.Event(rm.ClientEventOpen). From(rm.DealStatusNew).ToNoChange(), fsm.Event(rm.ClientEventPaymentChannelErrored). - From(rm.DealStatusAccepted).To(rm.DealStatusFailed). + FromMany(rm.DealStatusAccepted, rm.DealStatusPaymentChannelCreating).To(rm.DealStatusFailed). Action(func(deal *rm.ClientDealState, err error) error { - deal.Message = xerrors.Errorf("getting payment channel: %w", err).Error() + deal.Message = xerrors.Errorf("get or create payment channel: %w", err).Error() return nil }), - fsm.Event(rm.ClientEventAllocateLaneErrored). - From(rm.DealStatusAccepted).To(rm.DealStatusFailed). - Action(func(deal *rm.ClientDealState, err error) error { - deal.Message = xerrors.Errorf("allocating payment lane: %w", err).Error() + fsm.Event(rm.ClientEventPaymentChannelCreateInitiated). + From(rm.DealStatusAccepted).To(rm.DealStatusPaymentChannelCreating). + Action(func(deal *rm.ClientDealState, msgCID cid.Cid) error { + deal.WaitMsgCID = &msgCID + return nil + }), + fsm.Event(rm.ClientEventPaymentChannelAddingFunds). + FromMany(rm.DealStatusAccepted).To(rm.DealStatusPaymentChannelAddingFunds). + Action(func(deal *rm.ClientDealState, msgCID cid.Cid, payCh address.Address) error { + deal.WaitMsgCID = &msgCID + deal.PaymentInfo = &rm.PaymentInfo{ + PayCh: payCh, + } return nil }), - fsm.Event(rm.ClientEventPaymentChannelCreated). - From(rm.DealStatusAccepted).To(rm.DealStatusPaymentChannelCreated). + fsm.Event(rm.ClientEventPaymentChannelReady). + FromMany(rm.DealStatusPaymentChannelCreating, rm.DealStatusPaymentChannelAddingFunds). + To(rm.DealStatusPaymentChannelReady). Action(func(deal *rm.ClientDealState, payCh address.Address, lane uint64) error { deal.PaymentInfo = &rm.PaymentInfo{ PayCh: payCh, @@ -48,6 +59,18 @@ var ClientEvents = fsm.Events{ } return nil }), + fsm.Event(rm.ClientEventAllocateLaneErrored). + FromMany(rm.DealStatusPaymentChannelCreating, rm.DealStatusPaymentChannelAddingFunds).To(rm.DealStatusFailed). + Action(func(deal *rm.ClientDealState, err error) error { + deal.Message = xerrors.Errorf("allocating payment lane: %w", err).Error() + return nil + }), + fsm.Event(rm.ClientEventPaymentChannelAddFundsErrored). + From(rm.DealStatusPaymentChannelAddingFunds).To(rm.DealStatusFailed). + Action(func(deal *rm.ClientDealState, err error) error { + deal.Message = xerrors.Errorf("wait for add funds: %w", err).Error() + return nil + }), fsm.Event(rm.ClientEventWriteDealProposalErrored). FromAny().To(rm.DealStatusErrored). Action(func(deal *rm.ClientDealState, err error) error { @@ -123,50 +146,52 @@ var ClientEvents = fsm.Events{ return nil }), fsm.Event(rm.ClientEventConsumeBlockFailed). - FromMany(rm.DealStatusPaymentChannelCreated, rm.DealStatusOngoing).To(rm.DealStatusFailed). + FromMany(rm.DealStatusPaymentChannelReady, rm.DealStatusOngoing).To(rm.DealStatusFailed). Action(func(deal *rm.ClientDealState, err error) error { deal.Message = xerrors.Errorf("consuming block: %w", err).Error() return nil }), fsm.Event(rm.ClientEventLastPaymentRequested). - FromMany(rm.DealStatusPaymentChannelCreated, + FromMany(rm.DealStatusPaymentChannelReady, rm.DealStatusOngoing, rm.DealStatusBlocksComplete).To(rm.DealStatusFundsNeededLastPayment). Action(recordPaymentOwed), fsm.Event(rm.ClientEventAllBlocksReceived). - FromMany(rm.DealStatusPaymentChannelCreated, + FromMany(rm.DealStatusPaymentChannelReady, rm.DealStatusOngoing, rm.DealStatusBlocksComplete).To(rm.DealStatusBlocksComplete). Action(recordProcessed), fsm.Event(rm.ClientEventComplete). - FromMany(rm.DealStatusPaymentChannelCreated, + FromMany(rm.DealStatusPaymentChannelReady, rm.DealStatusOngoing, rm.DealStatusBlocksComplete, rm.DealStatusFinalizing).To(rm.DealStatusCompleted). Action(recordProcessed), fsm.Event(rm.ClientEventEarlyTermination). - FromMany(rm.DealStatusPaymentChannelCreated, rm.DealStatusOngoing).To(rm.DealStatusFailed). + FromMany(rm.DealStatusPaymentChannelReady, rm.DealStatusOngoing).To(rm.DealStatusFailed). Action(func(deal *rm.ClientDealState) error { deal.Message = "received complete status before all blocks received" return nil }), fsm.Event(rm.ClientEventPaymentRequested). - FromMany(rm.DealStatusPaymentChannelCreated, rm.DealStatusOngoing).To(rm.DealStatusFundsNeeded). + FromMany(rm.DealStatusPaymentChannelReady, rm.DealStatusOngoing).To(rm.DealStatusFundsNeeded). Action(recordPaymentOwed), fsm.Event(rm.ClientEventBlocksReceived). - From(rm.DealStatusPaymentChannelCreated).To(rm.DealStatusOngoing). + From(rm.DealStatusPaymentChannelReady).To(rm.DealStatusOngoing). From(rm.DealStatusOngoing).ToNoChange(). Action(recordProcessed), } // ClientStateEntryFuncs are the handlers for different states in a retrieval client var ClientStateEntryFuncs = fsm.StateEntryFuncs{ - rm.DealStatusNew: ProposeDeal, - rm.DealStatusAccepted: SetupPaymentChannel, - rm.DealStatusPaymentChannelCreated: ProcessNextResponse, - rm.DealStatusOngoing: ProcessNextResponse, - rm.DealStatusBlocksComplete: ProcessNextResponse, - rm.DealStatusFundsNeeded: ProcessPaymentRequested, - rm.DealStatusFundsNeededLastPayment: ProcessPaymentRequested, - rm.DealStatusFinalizing: Finalize, + rm.DealStatusNew: ProposeDeal, + rm.DealStatusAccepted: SetupPaymentChannelStart, + rm.DealStatusPaymentChannelCreating: WaitForPaymentChannelCreate, + rm.DealStatusPaymentChannelAddingFunds: WaitForPaymentChannelAddFunds, + rm.DealStatusPaymentChannelReady: ProcessNextResponse, + rm.DealStatusOngoing: ProcessNextResponse, + rm.DealStatusBlocksComplete: ProcessNextResponse, + rm.DealStatusFundsNeeded: ProcessPaymentRequested, + rm.DealStatusFundsNeededLastPayment: ProcessPaymentRequested, + rm.DealStatusFinalizing: Finalize, } diff --git a/retrievalmarket/impl/clientstates/client_states.go b/retrievalmarket/impl/clientstates/client_states.go index 1b46f356..e05fbe56 100644 --- a/retrievalmarket/impl/clientstates/client_states.go +++ b/retrievalmarket/impl/clientstates/client_states.go @@ -3,6 +3,7 @@ package clientstates import ( "context" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-statemachine/fsm" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" @@ -18,22 +19,52 @@ type ClientDealEnvironment interface { ConsumeBlock(context.Context, rm.DealID, rm.Block) (uint64, bool, error) } -// SetupPaymentChannel sets up a payment channel for a deal -func SetupPaymentChannel(ctx fsm.Context, environment ClientDealEnvironment, deal rm.ClientDealState) error { +// SetupPaymentChannelStart initiates setting up a payment channel for a deal +func SetupPaymentChannelStart(ctx fsm.Context, environment ClientDealEnvironment, deal rm.ClientDealState) error { tok, _, err := environment.Node().GetChainHead(ctx.Context()) if err != nil { return ctx.Trigger(rm.ClientEventPaymentChannelErrored, err) } - paych, err := environment.Node().GetOrCreatePaymentChannel(ctx.Context(), deal.ClientWallet, deal.MinerWallet, deal.TotalFunds, tok) + paych, msgCID, err := environment.Node().GetOrCreatePaymentChannel(ctx.Context(), deal.ClientWallet, deal.MinerWallet, deal.TotalFunds, tok) if err != nil { return ctx.Trigger(rm.ClientEventPaymentChannelErrored, err) } + + if paych == address.Undef { + return ctx.Trigger(rm.ClientEventPaymentChannelCreateInitiated, msgCID) + } + + return ctx.Trigger(rm.ClientEventPaymentChannelAddingFunds, msgCID, paych) +} + +// WaitForPaymentChannelCreate waits for payment channel creation to be posted on chain, +// allocates a lane for vouchers, then signals that the payment channel is ready +func WaitForPaymentChannelCreate(ctx fsm.Context, environment ClientDealEnvironment, deal rm.ClientDealState) error { + paych, err := environment.Node().WaitForPaymentChannelCreation(*deal.WaitMsgCID) + if err != nil { + return ctx.Trigger(rm.ClientEventPaymentChannelErrored, err) + } + lane, err := environment.Node().AllocateLane(paych) if err != nil { return ctx.Trigger(rm.ClientEventAllocateLaneErrored, err) } - return ctx.Trigger(rm.ClientEventPaymentChannelCreated, paych, lane) + return ctx.Trigger(rm.ClientEventPaymentChannelReady, paych, lane) +} + +// WaitForPaymentChannelAddFunds waits for funds to be added to an existing payment channel, then +// signals that payment channel is ready again +func WaitForPaymentChannelAddFunds(ctx fsm.Context, environment ClientDealEnvironment, deal rm.ClientDealState) error { + err := environment.Node().WaitForPaymentChannelAddFunds(*deal.WaitMsgCID) + if err != nil { + return ctx.Trigger(rm.ClientEventPaymentChannelAddFundsErrored, err) + } + lane, err := environment.Node().AllocateLane(deal.PaymentInfo.PayCh) + if err != nil { + return ctx.Trigger(rm.ClientEventAllocateLaneErrored, err) + } + return ctx.Trigger(rm.ClientEventPaymentChannelReady, deal.PaymentInfo.PayCh, lane) } // ProposeDeal sends the proposal to the other party diff --git a/retrievalmarket/impl/clientstates/client_states_test.go b/retrievalmarket/impl/clientstates/client_states_test.go index ec7578f8..6e99160d 100644 --- a/retrievalmarket/impl/clientstates/client_states_test.go +++ b/retrievalmarket/impl/clientstates/client_states_test.go @@ -14,10 +14,11 @@ import ( "github.com/filecoin-project/specs-actors/actors/builtin/paych" "github.com/ipfs/go-cid" mh "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/filecoin-project/go-fil-markets/retrievalmarket" - clientstates "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/clientstates" + "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/clientstates" "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/testnodes" rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network" testnet "github.com/filecoin-project/go-fil-markets/shared_testutil" @@ -57,7 +58,6 @@ func TestSetupPaymentChannel(t *testing.T) { ctx := context.Background() ds := testnet.NewTestRetrievalDealStream(testnet.TestDealStreamParams{}) expectedPayCh := address.TestAddress2 - expectedLane := uint64(10) eventMachine, err := fsm.NewEventProcessor(retrievalmarket.ClientDealState{}, "Status", clientstates.ClientEvents) require.NoError(t, err) runSetupPaymentChannel := func(t *testing.T, @@ -66,22 +66,33 @@ func TestSetupPaymentChannel(t *testing.T) { node := testnodes.NewTestRetrievalClientNode(params) environment := &fakeEnvironment{node, ds, 0, nil} fsmCtx := fsmtest.NewTestContext(ctx, eventMachine) - err := clientstates.SetupPaymentChannel(fsmCtx, environment, *dealState) + err := clientstates.SetupPaymentChannelStart(fsmCtx, environment, *dealState) require.NoError(t, err) fsmCtx.ReplayEvents(t, dealState) } - t.Run("it works", func(t *testing.T) { + t.Run("payment channel create initiated", func(t *testing.T) { + envParams := testnodes.TestRetrievalClientNodeParams{ + PayCh: address.Undef, + CreatePaychCID: testnet.GenerateCids(1)[0], + } dealState := makeDealState(retrievalmarket.DealStatusAccepted) + runSetupPaymentChannel(t, envParams, dealState) + assert.Empty(t, dealState.Message) + assert.Equal(t, dealState.Status, retrievalmarket.DealStatusPaymentChannelCreating) + }) + + t.Run("payment channel needs funds added", func(t *testing.T) { envParams := testnodes.TestRetrievalClientNodeParams{ - PayCh: expectedPayCh, - Lane: expectedLane, + AddFundsOnly: true, + PayCh: expectedPayCh, + CreatePaychCID: testnet.GenerateCids(1)[0], } + dealState := makeDealState(retrievalmarket.DealStatusAccepted) runSetupPaymentChannel(t, envParams, dealState) require.Empty(t, dealState.Message) - require.Equal(t, dealState.Status, retrievalmarket.DealStatusPaymentChannelCreated) - require.Equal(t, dealState.PaymentInfo.PayCh, expectedPayCh) - require.Equal(t, dealState.PaymentInfo.Lane, expectedLane) + require.Equal(t, retrievalmarket.DealStatusPaymentChannelAddingFunds, dealState.Status) + require.Equal(t, expectedPayCh, dealState.PaymentInfo.PayCh) }) t.Run("when create payment channel fails", func(t *testing.T) { @@ -89,26 +100,142 @@ func TestSetupPaymentChannel(t *testing.T) { envParams := testnodes.TestRetrievalClientNodeParams{ PayCh: address.Undef, PayChErr: errors.New("Something went wrong"), - Lane: expectedLane, } runSetupPaymentChannel(t, envParams, dealState) require.NotEmpty(t, dealState.Message) require.Equal(t, dealState.Status, retrievalmarket.DealStatusFailed) }) - t.Run("when allocate lane fails", func(t *testing.T) { - dealState := makeDealState(retrievalmarket.DealStatusAccepted) - envParams := testnodes.TestRetrievalClientNodeParams{ - PayCh: expectedPayCh, - Lane: expectedLane, - LaneError: errors.New("Something went wrong"), +} + +func TestWaitForPaymentChannelCreate(t *testing.T) { + ctx := context.Background() + ds := testnet.NewTestRetrievalDealStream(testnet.TestDealStreamParams{}) + expectedPayCh := address.TestAddress2 + expectedLane := uint64(10) + eventMachine, err := fsm.NewEventProcessor(retrievalmarket.ClientDealState{}, "Status", clientstates.ClientEvents) + require.NoError(t, err) + runWaitForPaychCreate := func(t *testing.T, + params testnodes.TestRetrievalClientNodeParams, + dealState *retrievalmarket.ClientDealState) { + node := testnodes.NewTestRetrievalClientNode(params) + environment := &fakeEnvironment{node, ds, 0, nil} + fsmCtx := fsmtest.NewTestContext(ctx, eventMachine) + err := clientstates.WaitForPaymentChannelCreate(fsmCtx, environment, *dealState) + require.NoError(t, err) + fsmCtx.ReplayEvents(t, dealState) + } + msgCID := testnet.GenerateCids(1)[0] + + t.Run("it works", func(t *testing.T) { + dealState := makeDealState(retrievalmarket.DealStatusPaymentChannelCreating) + dealState.WaitMsgCID = &msgCID + params := testnodes.TestRetrievalClientNodeParams{ + PayCh: expectedPayCh, + CreatePaychCID: msgCID, + Lane: expectedLane, } - runSetupPaymentChannel(t, envParams, dealState) - require.NotEmpty(t, dealState.Message) + runWaitForPaychCreate(t, params, dealState) + require.Empty(t, dealState.Message) + require.Equal(t, dealState.Status, retrievalmarket.DealStatusPaymentChannelReady) + require.Equal(t, expectedLane, dealState.PaymentInfo.Lane) + require.Equal(t, expectedPayCh, dealState.PaymentInfo.PayCh) + }) + t.Run("if Wait fails", func(t *testing.T) { + dealState := makeDealState(retrievalmarket.DealStatusPaymentChannelCreating) + dealState.WaitMsgCID = &msgCID + params := testnodes.TestRetrievalClientNodeParams{ + PayCh: expectedPayCh, + CreatePaychCID: msgCID, + WaitForChCreateErr: errors.New("boom"), + } + runWaitForPaychCreate(t, params, dealState) + require.Contains(t, dealState.Message, "boom") + require.Equal(t, dealState.Status, retrievalmarket.DealStatusFailed) + }) + + t.Run("if AllocateLane fails", func(t *testing.T) { + dealState := makeDealState(retrievalmarket.DealStatusPaymentChannelCreating) + dealState.WaitMsgCID = &msgCID + params := testnodes.TestRetrievalClientNodeParams{ + PayCh: expectedPayCh, + CreatePaychCID: msgCID, + LaneError: errors.New("boom"), + } + runWaitForPaychCreate(t, params, dealState) + require.Contains(t, dealState.Message, "boom") require.Equal(t, dealState.Status, retrievalmarket.DealStatusFailed) }) } +func TestWaitForPaymentChannelAddFunds(t *testing.T) { + ctx := context.Background() + ds := testnet.NewTestRetrievalDealStream(testnet.TestDealStreamParams{}) + expectedPayCh := address.TestAddress2 + expectedLane := uint64(99) + eventMachine, err := fsm.NewEventProcessor(retrievalmarket.ClientDealState{}, "Status", clientstates.ClientEvents) + require.NoError(t, err) + runWaitForPaychAddFunds := func(t *testing.T, + params testnodes.TestRetrievalClientNodeParams, + dealState *retrievalmarket.ClientDealState) { + node := testnodes.NewTestRetrievalClientNode(params) + environment := &fakeEnvironment{node, ds, 0, nil} + fsmCtx := fsmtest.NewTestContext(ctx, eventMachine) + err := clientstates.WaitForPaymentChannelAddFunds(fsmCtx, environment, *dealState) + require.NoError(t, err) + fsmCtx.ReplayEvents(t, dealState) + } + msgCID := testnet.GenerateCids(1)[0] + + t.Run("it works", func(t *testing.T) { + dealState := makeDealState(retrievalmarket.DealStatusPaymentChannelAddingFunds) + dealState.PaymentInfo.PayCh = expectedPayCh + dealState.WaitMsgCID = &msgCID + + params := testnodes.TestRetrievalClientNodeParams{ + AddFundsOnly: true, + PayCh: expectedPayCh, + AddFundsCID: msgCID, + Lane: expectedLane, + } + runWaitForPaychAddFunds(t, params, dealState) + require.Empty(t, dealState.Message) + assert.Equal(t, retrievalmarket.DealStatusPaymentChannelReady, dealState.Status) + assert.Equal(t, expectedLane, dealState.PaymentInfo.Lane) + assert.Equal(t, expectedPayCh, dealState.PaymentInfo.PayCh) + }) + t.Run("if Wait fails", func(t *testing.T) { + dealState := makeDealState(retrievalmarket.DealStatusPaymentChannelAddingFunds) + dealState.WaitMsgCID = &msgCID + params := testnodes.TestRetrievalClientNodeParams{ + AddFundsOnly: true, + PayCh: expectedPayCh, + AddFundsCID: msgCID, + WaitForAddFundsErr: errors.New("boom"), + Lane: expectedLane, + } + runWaitForPaychAddFunds(t, params, dealState) + assert.Contains(t, dealState.Message, "boom") + assert.Equal(t, dealState.Status, retrievalmarket.DealStatusFailed) + assert.Equal(t, uint64(0), dealState.PaymentInfo.Lane) + }) + t.Run("if AllocateLane fails", func(t *testing.T) { + dealState := makeDealState(retrievalmarket.DealStatusPaymentChannelAddingFunds) + dealState.WaitMsgCID = &msgCID + params := testnodes.TestRetrievalClientNodeParams{ + AddFundsOnly: true, + PayCh: expectedPayCh, + AddFundsCID: msgCID, + LaneError: errors.New("boom"), + Lane: expectedLane, + } + runWaitForPaychAddFunds(t, params, dealState) + assert.Contains(t, dealState.Message, "boom") + assert.Equal(t, dealState.Status, retrievalmarket.DealStatusFailed) + assert.Equal(t, uint64(0), dealState.PaymentInfo.Lane) + }) +} + func TestProposeDeal(t *testing.T) { ctx := context.Background() node := testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}) @@ -487,13 +614,10 @@ var defaultPaymentRequested = abi.NewTokenAmount(500000) func makeDealState(status retrievalmarket.DealStatus) *retrievalmarket.ClientDealState { return &retrievalmarket.ClientDealState{ - TotalFunds: defaultTotalFunds, - MinerWallet: address.TestAddress, - ClientWallet: address.TestAddress2, - PaymentInfo: &retrievalmarket.PaymentInfo{ - PayCh: address.TestAddress2, - Lane: uint64(10), - }, + TotalFunds: defaultTotalFunds, + MinerWallet: address.TestAddress, + ClientWallet: address.TestAddress2, + PaymentInfo: &retrievalmarket.PaymentInfo{}, Status: status, BytesPaidFor: defaultBytesPaidFor, TotalReceived: defaultTotalReceived, diff --git a/retrievalmarket/impl/integration_test.go b/retrievalmarket/impl/integration_test.go index f903b599..95fc5d6f 100644 --- a/retrievalmarket/impl/integration_test.go +++ b/retrievalmarket/impl/integration_test.go @@ -79,7 +79,12 @@ func requireSetupTestClientAndProvider(bgCtx context.Context, t *testing.T, payC retrievalmarket.RetrievalProvider) { testData := tut.NewLibp2pTestData(bgCtx, t) nw1 := rmnet.NewFromLibp2pHost(testData.Host1) - rcNode1 := testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{PayCh: payChAddr}) + cids := tut.GenerateCids(2) + rcNode1 := testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{ + PayCh: payChAddr, + CreatePaychCID: cids[0], + AddFundsCID: cids[1], + }) client, err := retrievalimpl.NewClient(nw1, testData.Bs1, rcNode1, &testPeerResolver{}, testData.Ds1, testData.StoredCounter1) require.NoError(t, err) nw2 := rmnet.NewFromLibp2pHost(testData.Host2) @@ -140,18 +145,23 @@ func TestClientCanMakeDealWithProvider(t *testing.T) { }).Node() testCases := []struct { - name string - filename string - filesize uint64 - voucherAmts []abi.TokenAmount - selector ipld.Node - paramsV1, unsealing bool + name string + filename string + filesize uint64 + voucherAmts []abi.TokenAmount + selector ipld.Node + paramsV1, unsealing, addFunds bool }{ {name: "1 block file retrieval succeeds", filename: "lorem_under_1_block.txt", filesize: 410, voucherAmts: []abi.TokenAmount{abi.NewTokenAmount(410000)}, unsealing: false}, + {name: "1 block file retrieval succeeds with existing payment channel", + filename: "lorem_under_1_block.txt", + filesize: 410, + voucherAmts: []abi.TokenAmount{abi.NewTokenAmount(410000)}, + unsealing: false, addFunds: true}, {name: "1 block file retrieval succeeds with unsealing", filename: "lorem_under_1_block.txt", filesize: 410, @@ -267,7 +277,7 @@ func TestClientCanMakeDealWithProvider(t *testing.T) { // ------- SET UP CLIENT nw1 := rmnet.NewFromLibp2pHost(testData.Host1) - createdChan, newLaneAddr, createdVoucher, client, err := setupClient(clientPaymentChannel, expectedVoucher, nw1, testData) + createdChan, newLaneAddr, createdVoucher, client, err := setupClient(clientPaymentChannel, expectedVoucher, nw1, testData, testCase.addFunds) require.NoError(t, err) clientDealStateChan := make(chan retrievalmarket.ClientDealState) @@ -371,10 +381,14 @@ func setupClient( clientPaymentChannel address.Address, expectedVoucher *paych.SignedVoucher, nw1 rmnet.RetrievalMarketNetwork, - testData *tut.Libp2pTestData) (*pmtChan, + testData *tut.Libp2pTestData, + addFunds bool, +) ( + *pmtChan, *address.Address, *paych.SignedVoucher, - retrievalmarket.RetrievalClient, error) { + retrievalmarket.RetrievalClient, + error) { var createdChan pmtChan paymentChannelRecorder := func(client, miner address.Address, amt abi.TokenAmount) { createdChan = pmtChan{client, miner, amt} @@ -389,13 +403,17 @@ func setupClient( paymentVoucherRecorder := func(v *paych.SignedVoucher) { createdVoucher = *v } + cids := tut.GenerateCids(2) clientNode := testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{ + AddFundsOnly: addFunds, PayCh: clientPaymentChannel, Lane: expectedVoucher.Lane, Voucher: expectedVoucher, PaymentChannelRecorder: paymentChannelRecorder, AllocateLaneRecorder: laneRecorder, PaymentVoucherRecorder: paymentVoucherRecorder, + CreatePaychCID: cids[0], + AddFundsCID: cids[1], }) client, err := retrievalimpl.NewClient(nw1, testData.Bs1, clientNode, &testPeerResolver{}, testData.Ds1, testData.StoredCounter1) return &createdChan, &newLaneAddr, &createdVoucher, client, err diff --git a/retrievalmarket/impl/testnodes/test_retrieval_client_node.go b/retrievalmarket/impl/testnodes/test_retrieval_client_node.go index 675284fb..34f02ace 100644 --- a/retrievalmarket/impl/testnodes/test_retrieval_client_node.go +++ b/retrievalmarket/impl/testnodes/test_retrieval_client_node.go @@ -2,10 +2,12 @@ package testnodes import ( "context" + "fmt" "github.com/filecoin-project/go-address" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin/paych" + "github.com/ipfs/go-cid" "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/shared" @@ -14,12 +16,14 @@ import ( // TestRetrievalClientNode is a node adapter for a retrieval client whose responses // are stubbed type TestRetrievalClientNode struct { - payCh address.Address - payChErr error - lane uint64 - laneError error - voucher *paych.SignedVoucher - voucherError error + addFundsOnly bool // set this to true to test adding funds to an existing payment channel + payCh address.Address + payChErr error + createPaychMsgCID, addFundsMsgCID cid.Cid + lane uint64 + laneError error + voucher *paych.SignedVoucher + voucherError, waitCreateErr, waitAddErr error allocateLaneRecorder func(address.Address) createPaymentVoucherRecorder func(voucher *paych.SignedVoucher) @@ -28,24 +32,30 @@ type TestRetrievalClientNode struct { // TestRetrievalClientNodeParams are parameters for initializing a TestRetrievalClientNode type TestRetrievalClientNodeParams struct { - PayCh address.Address - PayChErr error - Lane uint64 - LaneError error - Voucher *paych.SignedVoucher - VoucherError error - AllocateLaneRecorder func(address.Address) - PaymentVoucherRecorder func(voucher *paych.SignedVoucher) - PaymentChannelRecorder func(address.Address, address.Address, abi.TokenAmount) + PayCh address.Address + PayChErr error + CreatePaychCID, AddFundsCID cid.Cid + Lane uint64 + LaneError error + Voucher *paych.SignedVoucher + VoucherError error + AllocateLaneRecorder func(address.Address) + PaymentVoucherRecorder func(voucher *paych.SignedVoucher) + PaymentChannelRecorder func(address.Address, address.Address, abi.TokenAmount) + AddFundsOnly bool + WaitForAddFundsErr, WaitForChCreateErr error } var _ retrievalmarket.RetrievalClientNode = &TestRetrievalClientNode{} -// NewTestRetrievalClientNode instantiates a new TestRetrievalClientNode based ont he given params +// NewTestRetrievalClientNode initializes a new TestRetrievalClientNode based on the given params func NewTestRetrievalClientNode(params TestRetrievalClientNodeParams) *TestRetrievalClientNode { return &TestRetrievalClientNode{ + addFundsOnly: params.AddFundsOnly, payCh: params.PayCh, payChErr: params.PayChErr, + waitCreateErr: params.WaitForChCreateErr, + waitAddErr: params.WaitForAddFundsErr, lane: params.Lane, laneError: params.LaneError, voucher: params.Voucher, @@ -53,15 +63,23 @@ func NewTestRetrievalClientNode(params TestRetrievalClientNodeParams) *TestRetri allocateLaneRecorder: params.AllocateLaneRecorder, createPaymentVoucherRecorder: params.PaymentVoucherRecorder, getCreatePaymentChannelRecorder: params.PaymentChannelRecorder, + createPaychMsgCID: params.CreatePaychCID, + addFundsMsgCID: params.AddFundsCID, } } // GetOrCreatePaymentChannel returns a mocked payment channel -func (trcn *TestRetrievalClientNode) GetOrCreatePaymentChannel(ctx context.Context, clientAddress address.Address, minerAddress address.Address, clientFundsAvailable abi.TokenAmount, tok shared.TipSetToken) (address.Address, error) { +func (trcn *TestRetrievalClientNode) GetOrCreatePaymentChannel(ctx context.Context, clientAddress address.Address, minerAddress address.Address, clientFundsAvailable abi.TokenAmount, tok shared.TipSetToken) (address.Address, cid.Cid, error) { if trcn.getCreatePaymentChannelRecorder != nil { trcn.getCreatePaymentChannelRecorder(clientAddress, minerAddress, clientFundsAvailable) } - return trcn.payCh, trcn.payChErr + var payCh address.Address + msgCID := trcn.createPaychMsgCID + if trcn.addFundsOnly { + payCh = trcn.payCh + msgCID = trcn.addFundsMsgCID + } + return payCh, msgCID, trcn.payChErr } // AllocateLane creates a mock lane on a payment channel @@ -83,3 +101,16 @@ func (trcn *TestRetrievalClientNode) CreatePaymentVoucher(ctx context.Context, p func (trcn *TestRetrievalClientNode) GetChainHead(ctx context.Context) (shared.TipSetToken, abi.ChainEpoch, error) { return shared.TipSetToken{}, 0, nil } +func (trcn *TestRetrievalClientNode) WaitForPaymentChannelAddFunds(messageCID cid.Cid) error { + if messageCID != trcn.addFundsMsgCID { + return fmt.Errorf("expected messageCID: %s does not match actual: %s", trcn.addFundsMsgCID, messageCID) + } + return trcn.waitAddErr +} + +func (trcn *TestRetrievalClientNode) WaitForPaymentChannelCreation(messageCID cid.Cid) (address.Address, error) { + if messageCID != trcn.createPaychMsgCID { + return address.Undef, fmt.Errorf("expected messageCID: %s does not match actual: %s", trcn.createPaychMsgCID, messageCID) + } + return trcn.payCh, trcn.waitCreateErr +} diff --git a/retrievalmarket/types.go b/retrievalmarket/types.go index 349f7719..34530a9c 100644 --- a/retrievalmarket/types.go +++ b/retrievalmarket/types.go @@ -55,6 +55,7 @@ type ClientDealState struct { CurrentInterval uint64 PaymentRequested abi.TokenAmount FundsSpent abi.TokenAmount + WaitMsgCID *cid.Cid // the CID of any message the client deal is waiting for } // ClientEvent is an event that occurs in a deal lifecycle on the client @@ -70,8 +71,21 @@ const ( // ClientEventAllocateLaneErrored means there was a failure creating a lane in a payment channel ClientEventAllocateLaneErrored - // ClientEventPaymentChannelCreated means a payment channel has successfully been created - ClientEventPaymentChannelCreated + // ClientEventPaymentChannelCreateInitiated means we are waiting for a message to + // create a payment channel to appear on chain + ClientEventPaymentChannelCreateInitiated + + // ClientEventPaymentChannelReady means the newly created payment channel is ready for the + // deal to resume + ClientEventPaymentChannelReady + + // ClientEventPaymentChannelAddingFunds mean we are waiting for funds to be + // added to a payment channel + ClientEventPaymentChannelAddingFunds + + // ClientEventPaymentChannelAddingFunds means that adding funds to the payment channel + // failed + ClientEventPaymentChannelAddFundsErrored // ClientEventWriteDealProposalErrored means a network error writing a deal proposal ClientEventWriteDealProposalErrored @@ -183,7 +197,7 @@ type RetrievalClientNode interface { // GetOrCreatePaymentChannel sets up a new payment channel if one does not exist // between a client and a miner and insures the client has the given amount of funds available in the channel - GetOrCreatePaymentChannel(ctx context.Context, clientAddress address.Address, minerAddress address.Address, clientFundsAvailable abi.TokenAmount, tok shared.TipSetToken) (address.Address, error) + GetOrCreatePaymentChannel(ctx context.Context, clientAddress address.Address, minerAddress address.Address, clientFundsAvailable abi.TokenAmount, tok shared.TipSetToken) (address.Address, cid.Cid, error) // Allocate late creates a lane within a payment channel so that calls to // CreatePaymentVoucher will automatically make vouchers only for the difference @@ -194,6 +208,14 @@ type RetrievalClientNode interface { // given payment channel so that all the payment vouchers in the lane add up // to the given amount (so the payment voucher will be for the difference) CreatePaymentVoucher(ctx context.Context, paymentChannel address.Address, amount abi.TokenAmount, lane uint64, tok shared.TipSetToken) (*paych.SignedVoucher, error) + + // WaitForPaymentChannelAddFunds waits for a message on chain that funds have + // been sent to a payment channel + WaitForPaymentChannelAddFunds(messageCID cid.Cid) error + + // WaitForPaymentChannelCreation waits for a message on chain that a + // payment channel has been created + WaitForPaymentChannelCreation(messageCID cid.Cid) (address.Address, error) } // ProviderDealState is the current state of a deal from the point of view @@ -437,9 +459,20 @@ const ( // DealStatusNew is a deal that nothing has happened with yet DealStatusNew DealStatus = iota - // DealStatusPaymentChannelCreated is a deal status that has a payment channel + // DealStatusPaymentChannelCreating is the status set while waiting for the + // payment channel creation to complete + DealStatusPaymentChannelCreating + + // DealStatusPaymentChannelAddingFunds is the status when we are waiting for funds + // to finish being sent to the payment channel + DealStatusPaymentChannelAddingFunds + + // DealStatusPaymentChannelAllocatingLane is the status during lane allocation + DealStatusPaymentChannelAllocatingLane + + // DealStatusPaymentChannelReady is a deal status that has a payment channel // & lane setup - DealStatusPaymentChannelCreated + DealStatusPaymentChannelReady // DealStatusAccepted means a deal has been accepted by a provider // and its is ready to proceed with retrieval @@ -486,20 +519,22 @@ const ( // DealStatuses maps deal status to a human readable representation var DealStatuses = map[DealStatus]string{ - DealStatusNew: "DealStatusNew", - DealStatusPaymentChannelCreated: "DealStatusPaymentChannelCreated", - DealStatusAccepted: "DealStatusAccepted", - DealStatusFailed: "DealStatusFailed", - DealStatusRejected: "DealStatusRejected", - DealStatusFundsNeeded: "DealStatusFundsNeeded", - DealStatusOngoing: "DealStatusOngoing", - DealStatusFundsNeededLastPayment: "DealStatusFundsNeededLastPayment", - DealStatusCompleted: "DealStatusCompleted", - DealStatusDealNotFound: "DealStatusDealNotFound", - DealStatusVerified: "DealStatusVerified", - DealStatusErrored: "DealStatusErrored", - DealStatusBlocksComplete: "DealStatusBlocksComplete", - DealStatusFinalizing: "DealStatusFinalizing", + DealStatusNew: "DealStatusNew", + DealStatusPaymentChannelCreating: "DealStatusPaymentChannelCreating", + DealStatusPaymentChannelAddingFunds: "DealStatusPaymentChannelAddingFunds", + DealStatusPaymentChannelReady: "DealStatusPaymentChannelReady", + DealStatusAccepted: "DealStatusAccepted", + DealStatusFailed: "DealStatusFailed", + DealStatusRejected: "DealStatusRejected", + DealStatusFundsNeeded: "DealStatusFundsNeeded", + DealStatusOngoing: "DealStatusOngoing", + DealStatusFundsNeededLastPayment: "DealStatusFundsNeededLastPayment", + DealStatusCompleted: "DealStatusCompleted", + DealStatusDealNotFound: "DealStatusDealNotFound", + DealStatusVerified: "DealStatusVerified", + DealStatusErrored: "DealStatusErrored", + DealStatusBlocksComplete: "DealStatusBlocksComplete", + DealStatusFinalizing: "DealStatusFinalizing", } // IsTerminalError returns true if this status indicates processing of this deal diff --git a/retrievalmarket/types_cbor_gen.go b/retrievalmarket/types_cbor_gen.go index e867c96c..b6b6f7e2 100644 --- a/retrievalmarket/types_cbor_gen.go +++ b/retrievalmarket/types_cbor_gen.go @@ -874,7 +874,7 @@ func (t *ClientDealState) MarshalCBOR(w io.Writer) error { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write([]byte{141}); err != nil { + if _, err := w.Write([]byte{142}); err != nil { return err } @@ -960,6 +960,19 @@ func (t *ClientDealState) MarshalCBOR(w io.Writer) error { if err := t.FundsSpent.MarshalCBOR(w); err != nil { return err } + + // t.WaitMsgCID (cid.Cid) (struct) + + if t.WaitMsgCID == nil { + if _, err := w.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCid(w, *t.WaitMsgCID); err != nil { + return xerrors.Errorf("failed to write cid field t.WaitMsgCID: %w", err) + } + } + return nil } @@ -974,7 +987,7 @@ func (t *ClientDealState) UnmarshalCBOR(r io.Reader) error { return fmt.Errorf("cbor input should be of type array") } - if extra != 13 { + if extra != 14 { return fmt.Errorf("cbor input had wrong number of fields") } @@ -1128,6 +1141,30 @@ func (t *ClientDealState) UnmarshalCBOR(r io.Reader) error { return xerrors.Errorf("unmarshaling t.FundsSpent: %w", err) } + } + // t.WaitMsgCID (cid.Cid) (struct) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.WaitMsgCID: %w", err) + } + + t.WaitMsgCID = &c + } + } return nil }