diff --git a/go.mod b/go.mod index 08bcf52e55d..6e1d3a3a42d 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 github.com/filecoin-project/go-data-transfer v0.0.0-20200408061858-82c58b423ca6 github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5 - github.com/filecoin-project/go-fil-markets v0.0.0-20200413201123-731e6ca89984 + github.com/filecoin-project/go-fil-markets v0.0.0-20200415011556-4378bd41b91f github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663 github.com/filecoin-project/go-statestore v0.1.0 diff --git a/go.sum b/go.sum index 2cb366089fe..d1d32ff9dee 100644 --- a/go.sum +++ b/go.sum @@ -147,8 +147,8 @@ github.com/filecoin-project/go-data-transfer v0.0.0-20200408061858-82c58b423ca6/ github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5 h1:yvQJCW9mmi9zy+51xA01Ea2X7/dL7r8eKDPuGUjRmbo= github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5/go.mod h1:JbkIgFF/Z9BDlvrJO1FuKkaWsH673/UdFaiVS6uIHlA= github.com/filecoin-project/go-fil-markets v0.0.0-20200114015428-74d100f305f8/go.mod h1:c8NTjvFVy1Ud02mmGDjOiMeawY2t6ALfrrdvAB01FQc= -github.com/filecoin-project/go-fil-markets v0.0.0-20200413201123-731e6ca89984 h1:QY5jgd5T4txUEC2k9BPqWRlhDUTdFx5f1z/StOlh92g= -github.com/filecoin-project/go-fil-markets v0.0.0-20200413201123-731e6ca89984/go.mod h1:vcX3y5FVyuclIZgogPG1uIvJxHLSBU54B1ANJ88uMNk= +github.com/filecoin-project/go-fil-markets v0.0.0-20200415011556-4378bd41b91f h1:mPmWWrEwc/5zZW2E14m8a7HMrrOWREaflGZL1Iun/Aw= +github.com/filecoin-project/go-fil-markets v0.0.0-20200415011556-4378bd41b91f/go.mod h1:vcX3y5FVyuclIZgogPG1uIvJxHLSBU54B1ANJ88uMNk= github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 h1:92PET+sx1Hb4W/8CgFwGuxaKbttwY+UNspYZTvXY0vs= github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6/go.mod h1:0HgYnrkeSU4lu1p+LEOeDpFsNBssa0OGGriWdA4hvaE= github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go.mod h1:40kI2Gv16mwcRsHptI3OAV4nlOEU7wVDc4RgMylNFjU= diff --git a/markets/retrievaladapter/client.go b/markets/retrievaladapter/client.go index 39fae18b6f3..dcd42d4144d 100644 --- a/markets/retrievaladapter/client.go +++ b/markets/retrievaladapter/client.go @@ -1,13 +1,18 @@ package retrievaladapter import ( + "bytes" "context" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/specs-actors/actors/abi" + initactor "github.com/filecoin-project/specs-actors/actors/builtin/init" "github.com/filecoin-project/specs-actors/actors/builtin/paych" + "github.com/filecoin-project/specs-actors/actors/runtime/exitcode" + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" "github.com/filecoin-project/lotus/node/impl/full" payapi "github.com/filecoin-project/lotus/node/impl/paych" @@ -29,11 +34,10 @@ func NewRetrievalClientNode(pmgr *paychmgr.Manager, payapi payapi.PaychAPI, chai // GetOrCreatePaymentChannel sets up a new payment channel if one does not exist // between a client and a miner and ensures the client has the given amount of // funds available in the channel. -func (rcn *retrievalClientNode) GetOrCreatePaymentChannel(ctx context.Context, clientAddress address.Address, minerAddress address.Address, clientFundsAvailable abi.TokenAmount, tok shared.TipSetToken) (address.Address, error) { +func (rcn *retrievalClientNode) GetOrCreatePaymentChannel(ctx context.Context, clientAddress address.Address, minerAddress address.Address, clientFundsAvailable abi.TokenAmount, tok shared.TipSetToken) (address.Address, cid.Cid, error) { // TODO: respect the provided TipSetToken (a serialized TipSetKey) when // querying the chain - paych, _, err := rcn.pmgr.GetPaych(ctx, clientAddress, minerAddress, clientFundsAvailable) - return paych, err + return rcn.pmgr.GetPaych(ctx, clientAddress, minerAddress, clientFundsAvailable) } // Allocate late creates a lane within a payment channel so that calls to @@ -64,3 +68,33 @@ func (rcn *retrievalClientNode) GetChainHead(ctx context.Context) (shared.TipSet return head.Key().Bytes(), head.Height(), nil } + +// WaitForPaymentChannelAddFunds waits messageCID to appear on chain. If it doesn't appear within +// defaultMsgWaitTimeout it returns error +func (rcn *retrievalClientNode) WaitForPaymentChannelAddFunds(messageCID cid.Cid) error { + _, mr, err := rcn.chainapi.StateManager.WaitForMessage(context.TODO(), messageCID) + + if err != nil { + return err + } + if mr.ExitCode != exitcode.Ok { + return xerrors.Errorf("wait for payment channel to add funds failed. exit code: %d", mr.ExitCode) + } + return nil +} + +func (rcn *retrievalClientNode) WaitForPaymentChannelCreation(messageCID cid.Cid) (address.Address, error) { + _, mr, err := rcn.chainapi.StateManager.WaitForMessage(context.TODO(), messageCID) + + if err != nil { + return address.Undef, err + } + if mr.ExitCode != exitcode.Ok { + return address.Undef, xerrors.Errorf("payment channel creation failed. exit code: %d", mr.ExitCode) + } + var retval initactor.ExecReturn + if err := retval.UnmarshalCBOR(bytes.NewReader(mr.Return)); err != nil { + return address.Undef, err + } + return retval.RobustAddress, nil +} diff --git a/paychmgr/simple.go b/paychmgr/simple.go index 653f5849122..ff537d425b3 100644 --- a/paychmgr/simple.go +++ b/paychmgr/simple.go @@ -3,7 +3,6 @@ package paychmgr import ( "bytes" "context" - "fmt" "github.com/filecoin-project/specs-actors/actors/builtin" init_ "github.com/filecoin-project/specs-actors/actors/builtin/init" @@ -17,10 +16,10 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) -func (pm *Manager) createPaych(ctx context.Context, from, to address.Address, amt types.BigInt) (address.Address, cid.Cid, error) { +func (pm *Manager) createPaych(ctx context.Context, from, to address.Address, amt types.BigInt) (cid.Cid, error) { params, aerr := actors.SerializeParams(&paych.ConstructorParams{From: from, To: to}) if aerr != nil { - return address.Undef, cid.Undef, aerr + return cid.Undef, aerr } enc, aerr := actors.SerializeParams(&init_.ExecParams{ @@ -28,7 +27,7 @@ func (pm *Manager) createPaych(ctx context.Context, from, to address.Address, am ConstructorParams: params, }) if aerr != nil { - return address.Undef, cid.Undef, aerr + return cid.Undef, aerr } msg := &types.Message{ @@ -43,42 +42,46 @@ func (pm *Manager) createPaych(ctx context.Context, from, to address.Address, am smsg, err := pm.mpool.MpoolPushMessage(ctx, msg) if err != nil { - return address.Undef, cid.Undef, xerrors.Errorf("initializing paych actor: %w", err) + return cid.Undef, xerrors.Errorf("initializing paych actor: %w", err) } - mcid := smsg.Cid() + go pm.waitForPaychCreateMsg(ctx, mcid) + return mcid, nil +} - // TODO: wait outside the store lock! - // (tricky because we need to setup channel tracking before we know it's address) +// WaitForPaychCreateMsg waits for mcid to appear on chain and returns the robust address of the +// created payment channel +// TODO: wait outside the store lock! +// (tricky because we need to setup channel tracking before we know its address) +func (pm *Manager) waitForPaychCreateMsg(ctx context.Context, mcid cid.Cid) { + defer pm.store.lk.Unlock() mwait, err := pm.state.StateWaitMsg(ctx, mcid) if err != nil { - return address.Undef, cid.Undef, xerrors.Errorf("wait msg: %w", err) + log.Errorf("wait msg: %w", err) } if mwait.Receipt.ExitCode != 0 { - return address.Undef, cid.Undef, fmt.Errorf("payment channel creation failed (exit code %d)", mwait.Receipt.ExitCode) + log.Errorf("payment channel creation failed (exit code %d)", mwait.Receipt.ExitCode) } var decodedReturn init_.ExecReturn err = decodedReturn.UnmarshalCBOR(bytes.NewReader(mwait.Receipt.Return)) if err != nil { - return address.Undef, cid.Undef, err + log.Error(err) } paychaddr := decodedReturn.RobustAddress ci, err := pm.loadOutboundChannelInfo(ctx, paychaddr) if err != nil { - return address.Undef, cid.Undef, xerrors.Errorf("loading channel info: %w", err) + log.Errorf("loading channel info: %w", err) } if err := pm.store.trackChannel(ci); err != nil { - return address.Undef, cid.Undef, xerrors.Errorf("tracking channel: %w", err) + log.Errorf("tracking channel: %w", err) } - - return paychaddr, mcid, nil } -func (pm *Manager) addFunds(ctx context.Context, ch address.Address, from address.Address, amt types.BigInt) error { +func (pm *Manager) addFunds(ctx context.Context, ch address.Address, from address.Address, amt types.BigInt) (cid.Cid, error) { msg := &types.Message{ To: ch, From: from, @@ -90,25 +93,31 @@ func (pm *Manager) addFunds(ctx context.Context, ch address.Address, from addres smsg, err := pm.mpool.MpoolPushMessage(ctx, msg) if err != nil { - return err + return cid.Undef, err } + mcid := smsg.Cid() + go pm.waitForAddFundsMsg(ctx, mcid) + return mcid, nil +} - mwait, err := pm.state.StateWaitMsg(ctx, smsg.Cid()) // TODO: wait outside the store lock! +// WaitForAddFundsMsg waits for mcid to appear on chain and returns error, if any +// TODO: wait outside the store lock! +// (tricky because we need to setup channel tracking before we know it's address) +func (pm *Manager) waitForAddFundsMsg(ctx context.Context, mcid cid.Cid) { + defer pm.store.lk.Unlock() + mwait, err := pm.state.StateWaitMsg(ctx, mcid) if err != nil { - return err + log.Error(err) } if mwait.Receipt.ExitCode != 0 { - return fmt.Errorf("voucher channel creation failed: adding funds (exit code %d)", mwait.Receipt.ExitCode) + log.Errorf("voucher channel creation failed: adding funds (exit code %d)", mwait.Receipt.ExitCode) } - - return nil } func (pm *Manager) GetPaych(ctx context.Context, from, to address.Address, ensureFree types.BigInt) (address.Address, cid.Cid, error) { - pm.store.lk.Lock() - defer pm.store.lk.Unlock() - + pm.store.lk.Lock() // unlock only on err; wait funcs will defer unlock + var mcid cid.Cid ch, err := pm.store.findChan(func(ci *ChannelInfo) bool { if ci.Direction != DirOutbound { return false @@ -116,12 +125,17 @@ func (pm *Manager) GetPaych(ctx context.Context, from, to address.Address, ensur return ci.Control == from && ci.Target == to }) if err != nil { + pm.store.lk.Unlock() return address.Undef, cid.Undef, xerrors.Errorf("findChan: %w", err) } if ch != address.Undef { // TODO: Track available funds - return ch, cid.Undef, pm.addFunds(ctx, ch, from, ensureFree) + mcid, err = pm.addFunds(ctx, ch, from, ensureFree) + } else { + mcid, err = pm.createPaych(ctx, from, to, ensureFree) } - - return pm.createPaych(ctx, from, to, ensureFree) + if err != nil { + pm.store.lk.Unlock() + } + return ch, mcid, err }