Skip to content

Commit

Permalink
WIP: fix payment channel locking
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Aug 4, 2020
1 parent c3ff29c commit 1e86e83
Show file tree
Hide file tree
Showing 21 changed files with 2,741 additions and 414 deletions.
3 changes: 2 additions & 1 deletion api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,8 @@ type FullNode interface {
// MethodGroup: Paych
// The Paych methods are for interacting with and managing payment channels

PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error)
PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt) (*ChannelInfo, error)
PaychGetWaitReady(context.Context, cid.Cid) (address.Address, error)
PaychList(context.Context) ([]address.Address, error)
PaychStatus(context.Context, address.Address) (*PaychStatus, error)
PaychSettle(context.Context, address.Address) (cid.Cid, error)
Expand Down
11 changes: 8 additions & 3 deletions api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ type FullNodeStruct struct {

MarketEnsureAvailable func(context.Context, address.Address, address.Address, types.BigInt) (cid.Cid, error) `perm:"sign"`

PaychGet func(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*api.ChannelInfo, error) `perm:"sign"`
PaychGet func(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) `perm:"sign"`
PaychGetWaitReady func(context.Context, cid.Cid) (address.Address, error) `perm:"sign"` // TODO: is perm:"sign" correct?
PaychList func(context.Context) ([]address.Address, error) `perm:"read"`
PaychStatus func(context.Context, address.Address) (*api.PaychStatus, error) `perm:"read"`
PaychSettle func(context.Context, address.Address) (cid.Cid, error) `perm:"sign"`
Expand Down Expand Up @@ -777,8 +778,12 @@ func (c *FullNodeStruct) MarketEnsureAvailable(ctx context.Context, addr, wallet
return c.Internal.MarketEnsureAvailable(ctx, addr, wallet, amt)
}

func (c *FullNodeStruct) PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*api.ChannelInfo, error) {
return c.Internal.PaychGet(ctx, from, to, ensureFunds)
func (c *FullNodeStruct) PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) {
return c.Internal.PaychGet(ctx, from, to, amt)
}

func (c *FullNodeStruct) PaychGetWaitReady(ctx context.Context, mcid cid.Cid) (address.Address, error) {
return c.Internal.PaychGetWaitReady(ctx, mcid)
}

func (c *FullNodeStruct) PaychList(ctx context.Context) ([]address.Address, error) {
Expand Down
9 changes: 2 additions & 7 deletions api/test/paych.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package test

import (
"bytes"
"context"
"fmt"
"os"
Expand All @@ -20,7 +19,6 @@ import (
"github.com/filecoin-project/lotus/chain/wallet"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
initactor "github.com/filecoin-project/specs-actors/actors/builtin/init"
"github.com/filecoin-project/specs-actors/actors/builtin/paych"
)

Expand Down Expand Up @@ -72,13 +70,10 @@ func TestPaymentChannels(t *testing.T, b APIBuilder, blocktime time.Duration) {
t.Fatal(err)
}

res := waitForMessage(ctx, t, paymentCreator, channelInfo.ChannelMessage, time.Second, "channel create")
var params initactor.ExecReturn
err = params.UnmarshalCBOR(bytes.NewReader(res.Receipt.Return))
channel, err := paymentCreator.PaychGetWaitReady(ctx, channelInfo.ChannelMessage)
if err != nil {
t.Fatal(err)
}
channel := params.RobustAddress

// allocate three lanes
var lanes []uint64
Expand Down Expand Up @@ -124,7 +119,7 @@ func TestPaymentChannels(t *testing.T, b APIBuilder, blocktime time.Duration) {
t.Fatal(err)
}

res = waitForMessage(ctx, t, paymentCreator, settleMsgCid, time.Second*10, "settle")
res := waitForMessage(ctx, t, paymentCreator, settleMsgCid, time.Second*10, "settle")
if res.Receipt.ExitCode != 0 {
t.Fatal("Unable to settle payment channel")
}
Expand Down
2 changes: 1 addition & 1 deletion cli/paych.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var paychCmd = &cli.Command{

var paychGetCmd = &cli.Command{
Name: "get",
Usage: "Create a new payment channel or get existing one",
Usage: "Create a new payment channel or get existing one and add amount to it",
ArgsUsage: "[fromAddress toAddress amount]",
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() != 3 {
Expand Down
1 change: 1 addition & 0 deletions gen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func main() {
err = gen.WriteMapEncodersToFile("./paychmgr/cbor_gen.go", "paychmgr",
paychmgr.VoucherInfo{},
paychmgr.ChannelInfo{},
paychmgr.MsgInfo{},
)
if err != nil {
fmt.Println(err)
Expand Down
2 changes: 2 additions & 0 deletions node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ const (
HandleIncomingMessagesKey

RegisterClientValidatorKey
HandlePaymentChannelManagerKey

// miner
GetParamsKey
Expand Down Expand Up @@ -272,6 +273,7 @@ func Online() Option {

Override(new(*paychmgr.Store), paychmgr.NewStore),
Override(new(*paychmgr.Manager), paychmgr.NewManager),
Override(HandlePaymentChannelManagerKey, paychmgr.HandleManager),
Override(new(*market.FundMgr), market.NewFundMgr),
Override(SettlePaymentChannelsKey, settler.SettlePaymentChannels),
),
Expand Down
27 changes: 7 additions & 20 deletions node/impl/paych/paych.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ type PaychAPI struct {
PaychMgr *paychmgr.Manager
}

func (a *PaychAPI) PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*api.ChannelInfo, error) {
ch, mcid, err := a.PaychMgr.GetPaych(ctx, from, to, ensureFunds)
func (a *PaychAPI) PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) {
ch, mcid, err := a.PaychMgr.GetPaych(ctx, from, to, amt)
if err != nil {
return nil, err
}
Expand All @@ -40,6 +40,10 @@ func (a *PaychAPI) PaychGet(ctx context.Context, from, to address.Address, ensur
}, nil
}

func (a *PaychAPI) PaychGetWaitReady(ctx context.Context, mcid cid.Cid) (address.Address, error) {
return a.PaychMgr.GetPaychWaitReady(ctx, mcid)
}

func (a *PaychAPI) PaychAllocateLane(ctx context.Context, ch address.Address) (uint64, error) {
return a.PaychMgr.AllocateLane(ch)
}
Expand Down Expand Up @@ -108,24 +112,7 @@ func (a *PaychAPI) PaychStatus(ctx context.Context, pch address.Address) (*api.P
}

func (a *PaychAPI) PaychSettle(ctx context.Context, addr address.Address) (cid.Cid, error) {

ci, err := a.PaychMgr.GetChannelInfo(addr)
if err != nil {
return cid.Undef, err
}

msg := &types.Message{
To: addr,
From: ci.Control,
Value: types.NewInt(0),
Method: builtin.MethodsPaych.Settle,
}
smgs, err := a.MpoolPushMessage(ctx, msg)

if err != nil {
return cid.Undef, err
}
return smgs.Cid(), nil
return a.PaychMgr.Settle(ctx, addr)
}

func (a *PaychAPI) PaychCollect(ctx context.Context, addr address.Address) (cid.Cid, error) {
Expand Down
67 changes: 67 additions & 0 deletions paychmgr/accessorcache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package paychmgr

import "github.com/filecoin-project/go-address"

// accessorByFromTo gets a channel accessor for a given from / to pair.
// The channel accessor facilitates locking a channel so that operations
// must be performed sequentially on a channel (but can be performed at
// the same time on different channels).
func (pm *Manager) accessorByFromTo(from address.Address, to address.Address) (*channelAccessor, error) {
key := pm.accessorCacheKey(from, to)

// First take a read lock and check the cache
pm.lk.RLock()
ca, ok := pm.channels[key]
pm.lk.RUnlock()
if ok {
return ca, nil
}

// Not in cache, so take a write lock
pm.lk.Lock()
defer pm.lk.Unlock()

// Need to check cache again in case it was updated between releasing read
// lock and taking write lock
ca, ok = pm.channels[key]
if !ok {
// Not in cache, so create a new one and store in cache
ca = pm.addAccessorToCache(from, to)
}

return ca, nil
}

// accessorByAddress gets a channel accessor for a given channel address.
// The channel accessor facilitates locking a channel so that operations
// must be performed sequentially on a channel (but can be performed at
// the same time on different channels).
func (pm *Manager) accessorByAddress(ch address.Address) (*channelAccessor, error) {
// Get the channel from / to
pm.lk.RLock()
channelInfo, err := pm.store.ByAddress(ch)
pm.lk.RUnlock()
if err != nil {
return nil, err
}

// TODO: cache by channel address so we can get by address instead of using from / to
return pm.accessorByFromTo(channelInfo.Control, channelInfo.Target)
}

// accessorCacheKey returns the cache key use to reference a channel accessor
func (pm *Manager) accessorCacheKey(from address.Address, to address.Address) string {
return from.String() + "->" + to.String()
}

// addAccessorToCache adds a channel accessor to a cache. Note that channelInfo
// may be nil if the channel hasn't been created yet, but we still want to
// reference the same channel accessor for a given from/to, so that all
// attempts to access a channel use the same lock (the lock on the accessor)
func (pm *Manager) addAccessorToCache(from address.Address, to address.Address) *channelAccessor {
key := pm.accessorCacheKey(from, to)
ca := newChannelAccessor(pm)
// TODO: Use LRU
pm.channels[key] = ca
return ca
}
Loading

0 comments on commit 1e86e83

Please sign in to comment.