Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/no block chain ops #190

Merged
merged 9 commits into from
Apr 15, 2020
64 changes: 41 additions & 23 deletions retrievalmarket/impl/clientstates/client_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/filecoin-project/go-statemachine/fsm"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"

rm "github.com/filecoin-project/go-fil-markets/retrievalmarket"
Expand All @@ -28,26 +29,41 @@ var ClientEvents = fsm.Events{
fsm.Event(rm.ClientEventOpen).
From(rm.DealStatusNew).ToNoChange(),
fsm.Event(rm.ClientEventPaymentChannelErrored).
From(rm.DealStatusAccepted).To(rm.DealStatusFailed).
FromMany(rm.DealStatusAccepted, rm.DealStatusPaymentChannelCreating).To(rm.DealStatusFailed).
Action(func(deal *rm.ClientDealState, err error) error {
deal.Message = xerrors.Errorf("getting payment channel: %w", err).Error()
deal.Message = xerrors.Errorf("get or create payment channel: %w", err).Error()
return nil
}),
fsm.Event(rm.ClientEventAllocateLaneErrored).
From(rm.DealStatusAccepted).To(rm.DealStatusFailed).
Action(func(deal *rm.ClientDealState, err error) error {
deal.Message = xerrors.Errorf("allocating payment lane: %w", err).Error()
fsm.Event(rm.ClientEventPaymentChannelCreateInitiated).
From(rm.DealStatusAccepted).To(rm.DealStatusPaymentChannelCreating).
Action(func(deal *rm.ClientDealState, msgCID cid.Cid) error {
deal.WaitMsgCID = &msgCID
return nil
}),
fsm.Event(rm.ClientEventPaymentChannelCreated).
From(rm.DealStatusAccepted).To(rm.DealStatusPaymentChannelCreated).
fsm.Event(rm.ClientEventPaymentChannelAddingFunds).
FromMany(rm.DealStatusOngoing, rm.DealStatusPaymentChannelReady).To(rm.DealStatusPaymentChannelAddingFunds),
fsm.Event(rm.ClientEventPaymentChannelReady).
FromMany(rm.DealStatusPaymentChannelCreating, rm.DealStatusPaymentChannelAddingFunds).
To(rm.DealStatusPaymentChannelReady).
Action(func(deal *rm.ClientDealState, payCh address.Address, lane uint64) error {
deal.PaymentInfo = &rm.PaymentInfo{
PayCh: payCh,
Lane: lane,
}
return nil
}),
fsm.Event(rm.ClientEventAllocateLaneErrored).
From(rm.DealStatusPaymentChannelCreating).To(rm.DealStatusFailed).
Action(func(deal *rm.ClientDealState, err error) error {
deal.Message = xerrors.Errorf("allocating payment lane: %w", err).Error()
return nil
}),
fsm.Event(rm.ClientEventPaymentChannelAddFundsErrored).
From(rm.DealStatusPaymentChannelAddingFunds).To(rm.DealStatusFailed).
Action(func(deal *rm.ClientDealState, err error) error {
deal.Message = xerrors.Errorf("wait for add funds: %w", err).Error()
return nil
}),
fsm.Event(rm.ClientEventWriteDealProposalErrored).
FromAny().To(rm.DealStatusErrored).
Action(func(deal *rm.ClientDealState, err error) error {
Expand Down Expand Up @@ -123,50 +139,52 @@ var ClientEvents = fsm.Events{
return nil
}),
fsm.Event(rm.ClientEventConsumeBlockFailed).
FromMany(rm.DealStatusPaymentChannelCreated, rm.DealStatusOngoing).To(rm.DealStatusFailed).
FromMany(rm.DealStatusPaymentChannelReady, rm.DealStatusOngoing).To(rm.DealStatusFailed).
Action(func(deal *rm.ClientDealState, err error) error {
deal.Message = xerrors.Errorf("consuming block: %w", err).Error()
return nil
}),
fsm.Event(rm.ClientEventLastPaymentRequested).
FromMany(rm.DealStatusPaymentChannelCreated,
FromMany(rm.DealStatusPaymentChannelReady,
rm.DealStatusOngoing,
rm.DealStatusBlocksComplete).To(rm.DealStatusFundsNeededLastPayment).
Action(recordPaymentOwed),
fsm.Event(rm.ClientEventAllBlocksReceived).
FromMany(rm.DealStatusPaymentChannelCreated,
FromMany(rm.DealStatusPaymentChannelReady,
rm.DealStatusOngoing,
rm.DealStatusBlocksComplete).To(rm.DealStatusBlocksComplete).
Action(recordProcessed),
fsm.Event(rm.ClientEventComplete).
FromMany(rm.DealStatusPaymentChannelCreated,
FromMany(rm.DealStatusPaymentChannelReady,
rm.DealStatusOngoing,
rm.DealStatusBlocksComplete,
rm.DealStatusFinalizing).To(rm.DealStatusCompleted).
Action(recordProcessed),
fsm.Event(rm.ClientEventEarlyTermination).
FromMany(rm.DealStatusPaymentChannelCreated, rm.DealStatusOngoing).To(rm.DealStatusFailed).
FromMany(rm.DealStatusPaymentChannelReady, rm.DealStatusOngoing).To(rm.DealStatusFailed).
Action(func(deal *rm.ClientDealState) error {
deal.Message = "received complete status before all blocks received"
return nil
}),
fsm.Event(rm.ClientEventPaymentRequested).
FromMany(rm.DealStatusPaymentChannelCreated, rm.DealStatusOngoing).To(rm.DealStatusFundsNeeded).
FromMany(rm.DealStatusPaymentChannelReady, rm.DealStatusOngoing).To(rm.DealStatusFundsNeeded).
Action(recordPaymentOwed),
fsm.Event(rm.ClientEventBlocksReceived).
From(rm.DealStatusPaymentChannelCreated).To(rm.DealStatusOngoing).
From(rm.DealStatusPaymentChannelReady).To(rm.DealStatusOngoing).
From(rm.DealStatusOngoing).ToNoChange().
Action(recordProcessed),
}

// ClientStateEntryFuncs are the handlers for different states in a retrieval client
var ClientStateEntryFuncs = fsm.StateEntryFuncs{
rm.DealStatusNew: ProposeDeal,
rm.DealStatusAccepted: SetupPaymentChannel,
rm.DealStatusPaymentChannelCreated: ProcessNextResponse,
rm.DealStatusOngoing: ProcessNextResponse,
rm.DealStatusBlocksComplete: ProcessNextResponse,
rm.DealStatusFundsNeeded: ProcessPaymentRequested,
rm.DealStatusFundsNeededLastPayment: ProcessPaymentRequested,
rm.DealStatusFinalizing: Finalize,
rm.DealStatusNew: ProposeDeal,
rm.DealStatusAccepted: SetupPaymentChannelStart,
rm.DealStatusPaymentChannelCreating: WaitForPaymentChannelCreate,
rm.DealStatusPaymentChannelAddingFunds: WaitForPaymentChannelAddFunds,
rm.DealStatusPaymentChannelReady: ProcessNextResponse,
rm.DealStatusOngoing: ProcessNextResponse,
rm.DealStatusBlocksComplete: ProcessNextResponse,
rm.DealStatusFundsNeeded: ProcessPaymentRequested,
rm.DealStatusFundsNeededLastPayment: ProcessPaymentRequested,
rm.DealStatusFinalizing: Finalize,
}
37 changes: 32 additions & 5 deletions retrievalmarket/impl/clientstates/client_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package clientstates
import (
"context"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-statemachine/fsm"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
Expand All @@ -18,22 +19,48 @@ type ClientDealEnvironment interface {
ConsumeBlock(context.Context, rm.DealID, rm.Block) (uint64, bool, error)
}

// SetupPaymentChannel sets up a payment channel for a deal
func SetupPaymentChannel(ctx fsm.Context, environment ClientDealEnvironment, deal rm.ClientDealState) error {
// SetupPaymentChannelStart initiates setting up a payment channel for a deal
func SetupPaymentChannelStart(ctx fsm.Context, environment ClientDealEnvironment, deal rm.ClientDealState) error {
tok, _, err := environment.Node().GetChainHead(ctx.Context())
if err != nil {
return ctx.Trigger(rm.ClientEventPaymentChannelErrored, err)
}

paych, err := environment.Node().GetOrCreatePaymentChannel(ctx.Context(), deal.ClientWallet, deal.MinerWallet, deal.TotalFunds, tok)
paych, msgCID, err := environment.Node().GetOrCreatePaymentChannel(ctx.Context(), deal.ClientWallet, deal.MinerWallet, deal.TotalFunds, tok)
if err != nil {
return ctx.Trigger(rm.ClientEventPaymentChannelErrored, err)
}

if paych == address.Undef {
return ctx.Trigger(rm.ClientEventPaymentChannelCreateInitiated, msgCID)
}

return ctx.Trigger(rm.ClientEventPaymentChannelAddingFunds)
}

// WaitForPaymentChannelCreate waits for payment channel creation to be posted on chain,
// allocates a lane for vouchers, then signals that the payment channel is ready
func WaitForPaymentChannelCreate(ctx fsm.Context, environment ClientDealEnvironment, deal rm.ClientDealState) error {
paych, err := environment.Node().WaitForPaymentChannelCreation(*deal.WaitMsgCID)
if err != nil {
return ctx.Trigger(rm.ClientEventPaymentChannelErrored, err)
}

lane, err := environment.Node().AllocateLane(paych)
if err != nil {
return ctx.Trigger(rm.ClientEventAllocateLaneErrored, err)
}
return ctx.Trigger(rm.ClientEventPaymentChannelCreated, paych, lane)
return ctx.Trigger(rm.ClientEventPaymentChannelReady, paych, lane)
}

// WaitForPaymentChannelAddFunds waits for funds to be added to an existing payment channel, then
// signals that payment channel is ready again
func WaitForPaymentChannelAddFunds(ctx fsm.Context, environment ClientDealEnvironment, deal rm.ClientDealState) error {
err := environment.Node().WaitForPaymentChannelAddFunds(*deal.WaitMsgCID)
if err != nil {
return ctx.Trigger(rm.ClientEventPaymentChannelAddFundsErrored, err)
}
return ctx.Trigger(rm.ClientEventPaymentChannelReady, deal.PaymentInfo.PayCh, deal.PaymentInfo.Lane)
}

// ProposeDeal sends the proposal to the other party
Expand Down Expand Up @@ -147,7 +174,7 @@ func ProcessNextResponse(ctx fsm.Context, environment ClientDealEnvironment, dea
return ctx.Trigger(rm.ClientEventEarlyTermination)
case rm.DealStatusFundsNeeded:
return ctx.Trigger(rm.ClientEventPaymentRequested, totalProcessed, response.PaymentOwed)
case rm.DealStatusOngoing:
case rm.DealStatusOngoing, rm.DealStatusPaymentChannelReady:
return ctx.Trigger(rm.ClientEventBlocksReceived, totalProcessed)
default:
return ctx.Trigger(rm.ClientEventUnknownResponseReceived)
Expand Down
138 changes: 121 additions & 17 deletions retrievalmarket/impl/clientstates/client_states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ import (
"github.com/filecoin-project/specs-actors/actors/builtin/paych"
"github.com/ipfs/go-cid"
mh "github.com/multiformats/go-multihash"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-fil-markets/retrievalmarket"
clientstates "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/clientstates"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/clientstates"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/testnodes"
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
testnet "github.com/filecoin-project/go-fil-markets/shared_testutil"
Expand Down Expand Up @@ -66,22 +67,32 @@ func TestSetupPaymentChannel(t *testing.T) {
node := testnodes.NewTestRetrievalClientNode(params)
environment := &fakeEnvironment{node, ds, 0, nil}
fsmCtx := fsmtest.NewTestContext(ctx, eventMachine)
err := clientstates.SetupPaymentChannel(fsmCtx, environment, *dealState)
err := clientstates.SetupPaymentChannelStart(fsmCtx, environment, *dealState)
require.NoError(t, err)
fsmCtx.ReplayEvents(t, dealState)
}

t.Run("it works", func(t *testing.T) {
t.Run("payment channel create initiated", func(t *testing.T) {
envParams := testnodes.TestRetrievalClientNodeParams{
PayCh: address.Undef,
CreatePaychCID: testnet.GenerateCids(1)[0],
}
dealState := makeDealState(retrievalmarket.DealStatusAccepted)
runSetupPaymentChannel(t, envParams, dealState)
require.Empty(t, dealState.Message)
require.Equal(t, dealState.Status, retrievalmarket.DealStatusPaymentChannelCreating)
})

t.Run("payment channel needs funds added", func(t *testing.T) {
envParams := testnodes.TestRetrievalClientNodeParams{
PayCh: expectedPayCh,
Lane: expectedLane,
AddFundsOnly: true,
PayCh: expectedPayCh,
CreatePaychCID: testnet.GenerateCids(1)[0],
}
dealState := makeDealState(retrievalmarket.DealStatusPaymentChannelReady)
runSetupPaymentChannel(t, envParams, dealState)
require.Empty(t, dealState.Message)
require.Equal(t, dealState.Status, retrievalmarket.DealStatusPaymentChannelCreated)
require.Equal(t, dealState.PaymentInfo.PayCh, expectedPayCh)
require.Equal(t, dealState.PaymentInfo.Lane, expectedLane)
require.Equal(t, dealState.Status, retrievalmarket.DealStatusPaymentChannelAddingFunds)
})

t.Run("when create payment channel fails", func(t *testing.T) {
Expand All @@ -96,16 +107,109 @@ func TestSetupPaymentChannel(t *testing.T) {
require.Equal(t, dealState.Status, retrievalmarket.DealStatusFailed)
})

t.Run("when allocate lane fails", func(t *testing.T) {
dealState := makeDealState(retrievalmarket.DealStatusAccepted)
envParams := testnodes.TestRetrievalClientNodeParams{
PayCh: expectedPayCh,
Lane: expectedLane,
LaneError: errors.New("Something went wrong"),
}

func TestWaitForPaymentChannelCreate(t *testing.T) {
ctx := context.Background()
ds := testnet.NewTestRetrievalDealStream(testnet.TestDealStreamParams{})
expectedPayCh := address.TestAddress2
expectedLane := uint64(10)
eventMachine, err := fsm.NewEventProcessor(retrievalmarket.ClientDealState{}, "Status", clientstates.ClientEvents)
require.NoError(t, err)
runWaitForPaychCreate := func(t *testing.T,
params testnodes.TestRetrievalClientNodeParams,
dealState *retrievalmarket.ClientDealState) {
node := testnodes.NewTestRetrievalClientNode(params)
environment := &fakeEnvironment{node, ds, 0, nil}
fsmCtx := fsmtest.NewTestContext(ctx, eventMachine)
err := clientstates.WaitForPaymentChannelCreate(fsmCtx, environment, *dealState)
require.NoError(t, err)
fsmCtx.ReplayEvents(t, dealState)
}
msgCID := testnet.GenerateCids(1)[0]

t.Run("it works", func(t *testing.T) {
dealState := makeDealState(retrievalmarket.DealStatusPaymentChannelCreating)
dealState.WaitMsgCID = &msgCID
params := testnodes.TestRetrievalClientNodeParams{
PayCh: expectedPayCh,
CreatePaychCID: msgCID,
Lane: expectedLane,
}
runSetupPaymentChannel(t, envParams, dealState)
require.NotEmpty(t, dealState.Message)
require.Equal(t, dealState.Status, retrievalmarket.DealStatusFailed)
runWaitForPaychCreate(t, params, dealState)
require.Empty(t, dealState.Message)
assert.Equal(t, dealState.Status, retrievalmarket.DealStatusPaymentChannelReady)
assert.Equal(t, expectedLane, dealState.PaymentInfo.Lane)
})
t.Run("if Wait fails", func(t *testing.T) {
dealState := makeDealState(retrievalmarket.DealStatusPaymentChannelCreating)
dealState.WaitMsgCID = &msgCID
params := testnodes.TestRetrievalClientNodeParams{
PayCh: expectedPayCh,
CreatePaychCID: msgCID,
WaitForChCreateErr: errors.New("boom"),
}
runWaitForPaychCreate(t, params, dealState)
assert.Contains(t, dealState.Message, "boom")
assert.Equal(t, dealState.Status, retrievalmarket.DealStatusFailed)
})

t.Run("if AllocateLane fails", func(t *testing.T) {
dealState := makeDealState(retrievalmarket.DealStatusPaymentChannelCreating)
dealState.WaitMsgCID = &msgCID
params := testnodes.TestRetrievalClientNodeParams{
PayCh: expectedPayCh,
CreatePaychCID: msgCID,
LaneError: errors.New("boom"),
}
runWaitForPaychCreate(t, params, dealState)
assert.Contains(t, dealState.Message, "boom")
assert.Equal(t, dealState.Status, retrievalmarket.DealStatusFailed)
})
}

func TestWaitForPaymentChannelAddFunds(t *testing.T) {
ctx := context.Background()
ds := testnet.NewTestRetrievalDealStream(testnet.TestDealStreamParams{})
expectedPayCh := address.TestAddress2
eventMachine, err := fsm.NewEventProcessor(retrievalmarket.ClientDealState{}, "Status", clientstates.ClientEvents)
require.NoError(t, err)
runWaitForPaychAddFunds := func(t *testing.T,
params testnodes.TestRetrievalClientNodeParams,
dealState *retrievalmarket.ClientDealState) {
node := testnodes.NewTestRetrievalClientNode(params)
environment := &fakeEnvironment{node, ds, 0, nil}
fsmCtx := fsmtest.NewTestContext(ctx, eventMachine)
err := clientstates.WaitForPaymentChannelAddFunds(fsmCtx, environment, *dealState)
require.NoError(t, err)
fsmCtx.ReplayEvents(t, dealState)
}
msgCID := testnet.GenerateCids(1)[0]

t.Run("it works", func(t *testing.T) {
dealState := makeDealState(retrievalmarket.DealStatusPaymentChannelAddingFunds)
dealState.WaitMsgCID = &msgCID
params := testnodes.TestRetrievalClientNodeParams{
AddFundsOnly: true,
PayCh: expectedPayCh,
AddFundsCID: msgCID,
}
runWaitForPaychAddFunds(t, params, dealState)
require.Empty(t, dealState.Message)
assert.Equal(t, dealState.Status, retrievalmarket.DealStatusPaymentChannelReady)
})
t.Run("if Wait fails", func(t *testing.T) {
dealState := makeDealState(retrievalmarket.DealStatusPaymentChannelAddingFunds)
dealState.WaitMsgCID = &msgCID
params := testnodes.TestRetrievalClientNodeParams{
AddFundsOnly: true,
PayCh: expectedPayCh,
AddFundsCID: msgCID,
WaitForAddFundsErr: errors.New("boom"),
}
runWaitForPaychAddFunds(t, params, dealState)
assert.Contains(t, dealState.Message, "boom")
assert.Equal(t, dealState.Status, retrievalmarket.DealStatusFailed)
})
}

Expand Down
10 changes: 9 additions & 1 deletion retrievalmarket/impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,12 @@ func requireSetupTestClientAndProvider(bgCtx context.Context, t *testing.T, payC
retrievalmarket.RetrievalProvider) {
testData := tut.NewLibp2pTestData(bgCtx, t)
nw1 := rmnet.NewFromLibp2pHost(testData.Host1)
rcNode1 := testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{PayCh: payChAddr})
cids := tut.GenerateCids(2)
rcNode1 := testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{
PayCh: payChAddr,
CreatePaychCID: cids[0],
AddFundsCID: cids[1],
})
client, err := retrievalimpl.NewClient(nw1, testData.Bs1, rcNode1, &testPeerResolver{}, testData.Ds1, testData.StoredCounter1)
require.NoError(t, err)
nw2 := rmnet.NewFromLibp2pHost(testData.Host2)
Expand Down Expand Up @@ -389,13 +394,16 @@ func setupClient(
paymentVoucherRecorder := func(v *paych.SignedVoucher) {
createdVoucher = *v
}
cids := tut.GenerateCids(2)
clientNode := testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{
PayCh: clientPaymentChannel,
Lane: expectedVoucher.Lane,
Voucher: expectedVoucher,
PaymentChannelRecorder: paymentChannelRecorder,
AllocateLaneRecorder: laneRecorder,
PaymentVoucherRecorder: paymentVoucherRecorder,
CreatePaychCID: cids[0],
AddFundsCID: cids[1],
})
client, err := retrievalimpl.NewClient(nw1, testData.Bs1, clientNode, &testPeerResolver{}, testData.Ds1, testData.StoredCounter1)
return &createdChan, &newLaneAddr, &createdVoucher, client, err
Expand Down
Loading