Skip to content

Commit

Permalink
Merge pull request #1565 from filecoin-project/feat/retrieval-market-…
Browse files Browse the repository at this point in the history
…#1552

Feat/retrieval market #1552
  • Loading branch information
magik6k authored Apr 22, 2020
2 parents 6b2b2b6 + f7228f9 commit 93b30ab
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 34 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
github.com/filecoin-project/go-data-transfer v0.0.0-20200408061858-82c58b423ca6
github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5
github.com/filecoin-project/go-fil-markets v0.0.0-20200413201123-731e6ca89984
github.com/filecoin-project/go-fil-markets v0.0.0-20200415011556-4378bd41b91f
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663
github.com/filecoin-project/go-statestore v0.1.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ github.com/filecoin-project/go-data-transfer v0.0.0-20200408061858-82c58b423ca6/
github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5 h1:yvQJCW9mmi9zy+51xA01Ea2X7/dL7r8eKDPuGUjRmbo=
github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5/go.mod h1:JbkIgFF/Z9BDlvrJO1FuKkaWsH673/UdFaiVS6uIHlA=
github.com/filecoin-project/go-fil-markets v0.0.0-20200114015428-74d100f305f8/go.mod h1:c8NTjvFVy1Ud02mmGDjOiMeawY2t6ALfrrdvAB01FQc=
github.com/filecoin-project/go-fil-markets v0.0.0-20200413201123-731e6ca89984 h1:QY5jgd5T4txUEC2k9BPqWRlhDUTdFx5f1z/StOlh92g=
github.com/filecoin-project/go-fil-markets v0.0.0-20200413201123-731e6ca89984/go.mod h1:vcX3y5FVyuclIZgogPG1uIvJxHLSBU54B1ANJ88uMNk=
github.com/filecoin-project/go-fil-markets v0.0.0-20200415011556-4378bd41b91f h1:mPmWWrEwc/5zZW2E14m8a7HMrrOWREaflGZL1Iun/Aw=
github.com/filecoin-project/go-fil-markets v0.0.0-20200415011556-4378bd41b91f/go.mod h1:vcX3y5FVyuclIZgogPG1uIvJxHLSBU54B1ANJ88uMNk=
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 h1:92PET+sx1Hb4W/8CgFwGuxaKbttwY+UNspYZTvXY0vs=
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6/go.mod h1:0HgYnrkeSU4lu1p+LEOeDpFsNBssa0OGGriWdA4hvaE=
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go.mod h1:40kI2Gv16mwcRsHptI3OAV4nlOEU7wVDc4RgMylNFjU=
Expand Down
40 changes: 37 additions & 3 deletions markets/retrievaladapter/client.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package retrievaladapter

import (
"bytes"
"context"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/specs-actors/actors/abi"
initactor "github.com/filecoin-project/specs-actors/actors/builtin/init"
"github.com/filecoin-project/specs-actors/actors/builtin/paych"
"github.com/filecoin-project/specs-actors/actors/runtime/exitcode"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/node/impl/full"
payapi "github.com/filecoin-project/lotus/node/impl/paych"
Expand All @@ -29,11 +34,10 @@ func NewRetrievalClientNode(pmgr *paychmgr.Manager, payapi payapi.PaychAPI, chai
// GetOrCreatePaymentChannel sets up a new payment channel if one does not exist
// between a client and a miner and ensures the client has the given amount of
// funds available in the channel.
func (rcn *retrievalClientNode) GetOrCreatePaymentChannel(ctx context.Context, clientAddress address.Address, minerAddress address.Address, clientFundsAvailable abi.TokenAmount, tok shared.TipSetToken) (address.Address, error) {
func (rcn *retrievalClientNode) GetOrCreatePaymentChannel(ctx context.Context, clientAddress address.Address, minerAddress address.Address, clientFundsAvailable abi.TokenAmount, tok shared.TipSetToken) (address.Address, cid.Cid, error) {
// TODO: respect the provided TipSetToken (a serialized TipSetKey) when
// querying the chain
paych, _, err := rcn.pmgr.GetPaych(ctx, clientAddress, minerAddress, clientFundsAvailable)
return paych, err
return rcn.pmgr.GetPaych(ctx, clientAddress, minerAddress, clientFundsAvailable)
}

// Allocate late creates a lane within a payment channel so that calls to
Expand Down Expand Up @@ -64,3 +68,33 @@ func (rcn *retrievalClientNode) GetChainHead(ctx context.Context) (shared.TipSet

return head.Key().Bytes(), head.Height(), nil
}

// WaitForPaymentChannelAddFunds waits messageCID to appear on chain. If it doesn't appear within
// defaultMsgWaitTimeout it returns error
func (rcn *retrievalClientNode) WaitForPaymentChannelAddFunds(messageCID cid.Cid) error {
_, mr, err := rcn.chainapi.StateManager.WaitForMessage(context.TODO(), messageCID)

if err != nil {
return err
}
if mr.ExitCode != exitcode.Ok {
return xerrors.Errorf("wait for payment channel to add funds failed. exit code: %d", mr.ExitCode)
}
return nil
}

func (rcn *retrievalClientNode) WaitForPaymentChannelCreation(messageCID cid.Cid) (address.Address, error) {
_, mr, err := rcn.chainapi.StateManager.WaitForMessage(context.TODO(), messageCID)

if err != nil {
return address.Undef, err
}
if mr.ExitCode != exitcode.Ok {
return address.Undef, xerrors.Errorf("payment channel creation failed. exit code: %d", mr.ExitCode)
}
var retval initactor.ExecReturn
if err := retval.UnmarshalCBOR(bytes.NewReader(mr.Return)); err != nil {
return address.Undef, err
}
return retval.RobustAddress, nil
}
70 changes: 42 additions & 28 deletions paychmgr/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package paychmgr
import (
"bytes"
"context"
"fmt"

"github.com/filecoin-project/specs-actors/actors/builtin"
init_ "github.com/filecoin-project/specs-actors/actors/builtin/init"
Expand All @@ -17,18 +16,18 @@ import (
"github.com/filecoin-project/lotus/chain/types"
)

func (pm *Manager) createPaych(ctx context.Context, from, to address.Address, amt types.BigInt) (address.Address, cid.Cid, error) {
func (pm *Manager) createPaych(ctx context.Context, from, to address.Address, amt types.BigInt) (cid.Cid, error) {
params, aerr := actors.SerializeParams(&paych.ConstructorParams{From: from, To: to})
if aerr != nil {
return address.Undef, cid.Undef, aerr
return cid.Undef, aerr
}

enc, aerr := actors.SerializeParams(&init_.ExecParams{
CodeCID: builtin.PaymentChannelActorCodeID,
ConstructorParams: params,
})
if aerr != nil {
return address.Undef, cid.Undef, aerr
return cid.Undef, aerr
}

msg := &types.Message{
Expand All @@ -43,42 +42,46 @@ func (pm *Manager) createPaych(ctx context.Context, from, to address.Address, am

smsg, err := pm.mpool.MpoolPushMessage(ctx, msg)
if err != nil {
return address.Undef, cid.Undef, xerrors.Errorf("initializing paych actor: %w", err)
return cid.Undef, xerrors.Errorf("initializing paych actor: %w", err)
}

mcid := smsg.Cid()
go pm.waitForPaychCreateMsg(ctx, mcid)
return mcid, nil
}

// TODO: wait outside the store lock!
// (tricky because we need to setup channel tracking before we know it's address)
// WaitForPaychCreateMsg waits for mcid to appear on chain and returns the robust address of the
// created payment channel
// TODO: wait outside the store lock!
// (tricky because we need to setup channel tracking before we know its address)
func (pm *Manager) waitForPaychCreateMsg(ctx context.Context, mcid cid.Cid) {
defer pm.store.lk.Unlock()
mwait, err := pm.state.StateWaitMsg(ctx, mcid)
if err != nil {
return address.Undef, cid.Undef, xerrors.Errorf("wait msg: %w", err)
log.Errorf("wait msg: %w", err)
}

if mwait.Receipt.ExitCode != 0 {
return address.Undef, cid.Undef, fmt.Errorf("payment channel creation failed (exit code %d)", mwait.Receipt.ExitCode)
log.Errorf("payment channel creation failed (exit code %d)", mwait.Receipt.ExitCode)
}

var decodedReturn init_.ExecReturn
err = decodedReturn.UnmarshalCBOR(bytes.NewReader(mwait.Receipt.Return))
if err != nil {
return address.Undef, cid.Undef, err
log.Error(err)
}
paychaddr := decodedReturn.RobustAddress

ci, err := pm.loadOutboundChannelInfo(ctx, paychaddr)
if err != nil {
return address.Undef, cid.Undef, xerrors.Errorf("loading channel info: %w", err)
log.Errorf("loading channel info: %w", err)
}

if err := pm.store.trackChannel(ci); err != nil {
return address.Undef, cid.Undef, xerrors.Errorf("tracking channel: %w", err)
log.Errorf("tracking channel: %w", err)
}

return paychaddr, mcid, nil
}

func (pm *Manager) addFunds(ctx context.Context, ch address.Address, from address.Address, amt types.BigInt) error {
func (pm *Manager) addFunds(ctx context.Context, ch address.Address, from address.Address, amt types.BigInt) (cid.Cid, error) {
msg := &types.Message{
To: ch,
From: from,
Expand All @@ -90,38 +93,49 @@ func (pm *Manager) addFunds(ctx context.Context, ch address.Address, from addres

smsg, err := pm.mpool.MpoolPushMessage(ctx, msg)
if err != nil {
return err
return cid.Undef, err
}
mcid := smsg.Cid()
go pm.waitForAddFundsMsg(ctx, mcid)
return mcid, nil
}

mwait, err := pm.state.StateWaitMsg(ctx, smsg.Cid()) // TODO: wait outside the store lock!
// WaitForAddFundsMsg waits for mcid to appear on chain and returns error, if any
// TODO: wait outside the store lock!
// (tricky because we need to setup channel tracking before we know it's address)
func (pm *Manager) waitForAddFundsMsg(ctx context.Context, mcid cid.Cid) {
defer pm.store.lk.Unlock()
mwait, err := pm.state.StateWaitMsg(ctx, mcid)
if err != nil {
return err
log.Error(err)
}

if mwait.Receipt.ExitCode != 0 {
return fmt.Errorf("voucher channel creation failed: adding funds (exit code %d)", mwait.Receipt.ExitCode)
log.Errorf("voucher channel creation failed: adding funds (exit code %d)", mwait.Receipt.ExitCode)
}

return nil
}

func (pm *Manager) GetPaych(ctx context.Context, from, to address.Address, ensureFree types.BigInt) (address.Address, cid.Cid, error) {
pm.store.lk.Lock()
defer pm.store.lk.Unlock()

pm.store.lk.Lock() // unlock only on err; wait funcs will defer unlock
var mcid cid.Cid
ch, err := pm.store.findChan(func(ci *ChannelInfo) bool {
if ci.Direction != DirOutbound {
return false
}
return ci.Control == from && ci.Target == to
})
if err != nil {
pm.store.lk.Unlock()
return address.Undef, cid.Undef, xerrors.Errorf("findChan: %w", err)
}
if ch != address.Undef {
// TODO: Track available funds
return ch, cid.Undef, pm.addFunds(ctx, ch, from, ensureFree)
mcid, err = pm.addFunds(ctx, ch, from, ensureFree)
} else {
mcid, err = pm.createPaych(ctx, from, to, ensureFree)
}

return pm.createPaych(ctx, from, to, ensureFree)
if err != nil {
pm.store.lk.Unlock()
}
return ch, mcid, err
}

0 comments on commit 93b30ab

Please sign in to comment.