Skip to content

Commit

Permalink
Merge pull request #2838 from filecoin-project/feat/message-pool-sele…
Browse files Browse the repository at this point in the history
…ction

New message pool selection logic
  • Loading branch information
magik6k authored Aug 6, 2020
2 parents 85f4bc7 + 2166e9a commit e54a87f
Show file tree
Hide file tree
Showing 14 changed files with 964 additions and 726 deletions.
3 changes: 3 additions & 0 deletions api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ type FullNode interface {
// MpoolPending returns pending mempool messages.
MpoolPending(context.Context, types.TipSetKey) ([]*types.SignedMessage, error)

// MpoolSelect returns a list of pending messages for inclusion in the next block
MpoolSelect(context.Context, types.TipSetKey) ([]*types.SignedMessage, error)

// MpoolPush pushes a signed message to mempool.
MpoolPush(context.Context, *types.SignedMessage) (cid.Cid, error)

Expand Down
5 changes: 5 additions & 0 deletions api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ type FullNodeStruct struct {
SyncMarkBad func(ctx context.Context, bcid cid.Cid) error `perm:"admin"`
SyncCheckBad func(ctx context.Context, bcid cid.Cid) (string, error) `perm:"read"`

MpoolSelect func(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) `perm:"read"`
MpoolPending func(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) `perm:"read"`
MpoolPush func(context.Context, *types.SignedMessage) (cid.Cid, error) `perm:"write"`
MpoolPushMessage func(context.Context, *types.Message) (*types.SignedMessage, error) `perm:"sign"`
Expand Down Expand Up @@ -436,6 +437,10 @@ func (c *FullNodeStruct) GasEstimateGasLimit(ctx context.Context, msg *types.Mes
return c.Internal.GasEstimateGasLimit(ctx, msg, tsk)
}

func (c *FullNodeStruct) MpoolSelect(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) {
return c.Internal.MpoolSelect(ctx, tsk)
}

func (c *FullNodeStruct) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) {
return c.Internal.MpoolPending(ctx, tsk)
}
Expand Down
3 changes: 3 additions & 0 deletions chain/messagepool/gasguess/guessgas.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ type ActorLookup func(context.Context, address.Address, types.TipSetKey) (*types
const failedGasGuessRatio = 0.5
const failedGasGuessMax = 25_000_000

const MinGas = 1298450
const MaxGas = 1600271356

type CostKey struct {
Code cid.Cid
M abi.MethodNum
Expand Down
19 changes: 18 additions & 1 deletion chain/messagepool/messagepool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/filecoin-project/lotus/chain/wallet"
_ "github.com/filecoin-project/lotus/lib/sigs/bls"
_ "github.com/filecoin-project/lotus/lib/sigs/secp"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
Expand All @@ -26,6 +27,7 @@ type testMpoolAPI struct {

bmsgs map[cid.Cid][]*types.SignedMessage
statenonce map[address.Address]uint64
balance map[address.Address]types.BigInt

tipsets []*types.TipSet
}
Expand All @@ -34,6 +36,7 @@ func newTestMpoolAPI() *testMpoolAPI {
return &testMpoolAPI{
bmsgs: make(map[cid.Cid][]*types.SignedMessage),
statenonce: make(map[address.Address]uint64),
balance: make(map[address.Address]types.BigInt),
}
}

Expand All @@ -55,6 +58,14 @@ func (tma *testMpoolAPI) setStateNonce(addr address.Address, v uint64) {
tma.statenonce[addr] = v
}

func (tma *testMpoolAPI) setBalance(addr address.Address, v uint64) {
tma.balance[addr] = types.FromFil(v)
}

func (tma *testMpoolAPI) setBalanceRaw(addr address.Address, v types.BigInt) {
tma.balance[addr] = v
}

func (tma *testMpoolAPI) setBlockMessages(h *types.BlockHeader, msgs ...*types.SignedMessage) {
tma.bmsgs[h.Cid()] = msgs
tma.tipsets = append(tma.tipsets, mock.TipSet(h))
Expand All @@ -74,9 +85,15 @@ func (tma *testMpoolAPI) PubSubPublish(string, []byte) error {
}

func (tma *testMpoolAPI) StateGetActor(addr address.Address, ts *types.TipSet) (*types.Actor, error) {
balance, ok := tma.balance[addr]
if !ok {
balance = types.NewInt(90000000)
tma.balance[addr] = balance
}
return &types.Actor{
Code: builtin.StorageMarketActorCodeID,
Nonce: tma.statenonce[addr],
Balance: types.NewInt(90000000),
Balance: balance,
}, nil
}

Expand Down
237 changes: 30 additions & 207 deletions chain/messagepool/pruning.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,15 @@
package messagepool

import (
"bytes"
"context"
big2 "math/big"
"sort"
"time"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/messagepool/gasguess"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/ipfs/go-cid"
)

func (mp *MessagePool) pruneExcessMessages() error {

start := time.Now()
defer func() {
log.Infow("message pruning complete", "took", time.Since(start))
}()

mp.curTsLk.Lock()
ts := mp.curTs
mp.curTsLk.Unlock()
Expand All @@ -38,211 +24,48 @@ func (mp *MessagePool) pruneExcessMessages() error {
return mp.pruneMessages(context.TODO(), ts)
}

// just copied from miner/ SelectMessages
func (mp *MessagePool) pruneMessages(ctx context.Context, ts *types.TipSet) error {
al := func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*types.Actor, error) {
return mp.api.StateGetActor(addr, ts)
}

msgs := make([]*types.SignedMessage, 0, mp.currentSize)
for a := range mp.pending {
msgs = append(msgs, mp.pendingFor(a)...)
}

type senderMeta struct {
lastReward abi.TokenAmount
lastGasLimit int64

gasReward []abi.TokenAmount
gasLimit []int64

msgs []*types.SignedMessage
}

inclNonces := make(map[address.Address]uint64)
inclBalances := make(map[address.Address]big.Int)
outBySender := make(map[address.Address]*senderMeta)

tooLowFundMsgs := 0
tooHighNonceMsgs := 0

start := build.Clock.Now()
vmValid := time.Duration(0)
getbal := time.Duration(0)
guessGasDur := time.Duration(0)

sort.Slice(msgs, func(i, j int) bool {
return msgs[i].Message.Nonce < msgs[j].Message.Nonce
})

for _, msg := range msgs {
vmstart := build.Clock.Now()

minGas := vm.PricelistByEpoch(ts.Height()).OnChainMessage(msg.ChainLength()) // TODO: really should be doing just msg.ChainLength() but the sync side of this code doesnt seem to have access to that
if err := msg.VMMessage().ValidForBlockInclusion(minGas.Total()); err != nil {
log.Warnf("invalid message in message pool: %s", err)
continue
}

vmValid += build.Clock.Since(vmstart)

// TODO: this should be in some more general 'validate message' call
if msg.Message.GasLimit > build.BlockGasLimit {
log.Warnf("message in mempool had too high of a gas limit (%d)", msg.Message.GasLimit)
continue
}

if msg.Message.To == address.Undef {
log.Warnf("message in mempool had bad 'To' address")
continue
}

from := msg.Message.From

getBalStart := build.Clock.Now()
if _, ok := inclNonces[from]; !ok {
act, err := mp.api.StateGetActor(from, nil)
if err != nil {
log.Warnf("failed to check message sender balance, skipping message: %+v", err)
continue
}

inclNonces[from] = act.Nonce
inclBalances[from] = act.Balance
}
getbal += build.Clock.Since(getBalStart)

if inclBalances[from].LessThan(msg.Message.RequiredFunds()) {
tooLowFundMsgs++
// todo: drop from mpool
continue
}

if msg.Message.Nonce > inclNonces[from] {
tooHighNonceMsgs++
continue
}

if msg.Message.Nonce < inclNonces[from] {
continue
}

inclNonces[from] = msg.Message.Nonce + 1
inclBalances[from] = types.BigSub(inclBalances[from], msg.Message.RequiredFunds())
sm := outBySender[from]
if sm == nil {
sm = &senderMeta{
lastReward: big.Zero(),
}
}
start := time.Now()
defer func() {
log.Infof("message pruning took %s", time.Since(start))
}()

sm.gasLimit = append(sm.gasLimit, sm.lastGasLimit+msg.Message.GasLimit)
sm.lastGasLimit = sm.gasLimit[len(sm.gasLimit)-1]
pending, _ := mp.getPendingMessages(ts, ts)

guessGasStart := build.Clock.Now()
guessedGas, err := gasguess.GuessGasUsed(ctx, types.EmptyTSK, msg, al)
guessGasDur += build.Clock.Since(guessGasStart)
if err != nil {
log.Infow("failed to guess gas", "to", msg.Message.To, "method", msg.Message.Method, "err", err)
// Collect all messages to track which ones to remove and create chains for block inclusion
pruneMsgs := make(map[cid.Cid]*types.SignedMessage, mp.currentSize)
var chains []*msgChain
for actor, mset := range pending {
for _, m := range mset {
pruneMsgs[m.Message.Cid()] = m
}

estimatedReward := big.Mul(types.NewInt(uint64(guessedGas)), msg.Message.GasPrice)

sm.gasReward = append(sm.gasReward, big.Add(sm.lastReward, estimatedReward))
sm.lastReward = sm.gasReward[len(sm.gasReward)-1]

sm.msgs = append(sm.msgs, msg)

outBySender[from] = sm
actorChains := mp.createMessageChains(actor, mset, ts)
chains = append(chains, actorChains...)
}

orderedSenders := make([]address.Address, 0, len(outBySender))
for k := range outBySender {
orderedSenders = append(orderedSenders, k)
}
sort.Slice(orderedSenders, func(i, j int) bool {
return bytes.Compare(orderedSenders[i].Bytes(), orderedSenders[j].Bytes()) == -1
// Sort the chains
sort.Slice(chains, func(i, j int) bool {
return chains[i].Before(chains[j])
})

out := make([]*types.SignedMessage, 0, mp.maxTxPoolSizeLo)
{
for {
var bestSender address.Address
var nBest int
var bestGasToReward float64

// TODO: This is O(n^2)-ish, could use something like container/heap to cache this math
for _, sender := range orderedSenders {
meta, ok := outBySender[sender]
if !ok {
continue
}
for n := range meta.msgs {

if n+len(out) >= mp.maxTxPoolSizeLo {
break
}

gasToReward, _ := new(big2.Float).SetInt(meta.gasReward[n].Int).Float64()
gasToReward /= float64(meta.gasLimit[n])

if gasToReward >= bestGasToReward {
bestSender = sender
nBest = n + 1
bestGasToReward = gasToReward
}
}
}

if nBest == 0 {
break // block gas limit reached
}

{
out = append(out, outBySender[bestSender].msgs[:nBest]...)

outBySender[bestSender].msgs = outBySender[bestSender].msgs[nBest:]
outBySender[bestSender].gasLimit = outBySender[bestSender].gasLimit[nBest:]
outBySender[bestSender].gasReward = outBySender[bestSender].gasReward[nBest:]

if len(outBySender[bestSender].msgs) == 0 {
delete(outBySender, bestSender)
}
}

if len(out) >= mp.maxTxPoolSizeLo {
break
// Keep messages (remove them from pruneMsgs) from chains while we are under the low water mark
keepCount := 0
keepLoop:
for _, chain := range chains {
for _, m := range chain.msgs {
if keepCount < MemPoolSizeLimitLoDefault {
delete(pruneMsgs, m.Message.Cid())
keepCount++
} else {
break keepLoop
}
}
}

if tooLowFundMsgs > 0 {
log.Warnf("%d messages in mempool does not have enough funds", tooLowFundMsgs)
}

if tooHighNonceMsgs > 0 {
log.Warnf("%d messages in mempool had too high nonce", tooHighNonceMsgs)
}

sm := build.Clock.Now()
if sm.Sub(start) > time.Second {
log.Warnw("SelectMessages took a long time",
"duration", sm.Sub(start),
"vmvalidate", vmValid,
"getbalance", getbal,
"guessgas", guessGasDur,
"msgs", len(msgs))
}

good := make(map[cid.Cid]bool)
for _, m := range out {
good[m.Cid()] = true
}

for _, m := range msgs {
if !good[m.Cid()] {
mp.remove(m.Message.From, m.Message.Nonce)
}
// and remove all messages that are still in pruneMsgs after processing the chains
log.Infof("Pruning %d messages", len(pruneMsgs))
for _, m := range pruneMsgs {
mp.remove(m.Message.From, m.Message.Nonce)
}

return nil
Expand Down
Loading

0 comments on commit e54a87f

Please sign in to comment.