diff --git a/chain/consensus/filcns/mine.go b/chain/consensus/filcns/mine.go index bbda35fcfd3..757363a76c2 100644 --- a/chain/consensus/filcns/mine.go +++ b/chain/consensus/filcns/mine.go @@ -65,7 +65,7 @@ func (filec *FilecoinEC) CreateBlock(ctx context.Context, w api.Wallet, bt *api. } blsMsgCids = append(blsMsgCids, c) - } else { + } else if msg.Signature.Type == crypto.SigTypeSecp256k1 { c, err := filec.sm.ChainStore().PutMessage(msg) if err != nil { return nil, err @@ -74,6 +74,8 @@ func (filec *FilecoinEC) CreateBlock(ctx context.Context, w api.Wallet, bt *api. secpkMsgCids = append(secpkMsgCids, c) secpkMessages = append(secpkMessages, msg) + } else { + return nil, xerrors.Errorf("unknown sig type: %d", msg.Signature.Type) } } diff --git a/chain/messagepool/repub.go b/chain/messagepool/repub.go index 4323bdee197..d92b5bd5855 100644 --- a/chain/messagepool/repub.go +++ b/chain/messagepool/repub.go @@ -121,7 +121,7 @@ loop: // we can't fit the current chain but there is gas to spare // trim it and push it down - chain.Trim(gasLimit, mp, baseFee) + chain.Trim(gasLimit, repubMsgLimit, mp, baseFee) for j := i; j < len(chains)-1; j++ { if chains[j].Before(chains[j+1]) { break @@ -131,6 +131,10 @@ loop: } count := 0 + if len(msgs) > repubMsgLimit { + msgs = msgs[:repubMsgLimit] + } + log.Infof("republishing %d messages", len(msgs)) for _, m := range msgs { mb, err := m.Serialize() diff --git a/chain/messagepool/selection.go b/chain/messagepool/selection.go index acff7c4cff9..3ccf2e40619 100644 --- a/chain/messagepool/selection.go +++ b/chain/messagepool/selection.go @@ -7,6 +7,10 @@ import ( "sort" "time" + cbg "github.com/whyrusleeping/cbor-gen" + + "github.com/filecoin-project/go-state-types/crypto" + "golang.org/x/xerrors" "github.com/filecoin-project/go-address" @@ -34,9 +38,10 @@ type msgChain struct { merged bool next *msgChain prev *msgChain + sigType crypto.SigType } -func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq float64) (msgs []*types.SignedMessage, err error) { +func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) { mp.curTsLk.Lock() defer mp.curTsLk.Unlock() @@ -46,24 +51,156 @@ func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq // if the ticket quality is high enough that the first block has higher probability // than any other block, then we don't bother with optimal selection because the // first block will always have higher effective performance + var sm *selectedMessages + var err error if tq > 0.84 { - msgs, err = mp.selectMessagesGreedy(ctx, mp.curTs, ts) + sm, err = mp.selectMessagesGreedy(ctx, mp.curTs, ts) } else { - msgs, err = mp.selectMessagesOptimal(ctx, mp.curTs, ts, tq) + sm, err = mp.selectMessagesOptimal(ctx, mp.curTs, ts, tq) } if err != nil { return nil, err } - if len(msgs) > build.BlockMessageLimit { - msgs = msgs[:build.BlockMessageLimit] + if sm == nil { + return nil, nil + } + + // one last sanity check + if len(sm.msgs) > build.BlockMessageLimit { + log.Errorf("message selection chose too many messages %d > %d", len(sm.msgs), build.BlockMessageLimit) + sm.msgs = sm.msgs[:build.BlockMessageLimit] + } + + return sm.msgs, nil +} + +type selectedMessages struct { + msgs []*types.SignedMessage + gasLimit int64 + secpLimit int + blsLimit int +} + +// returns false if chain can't be added due to block constraints +func (sm *selectedMessages) tryToAdd(mc *msgChain) bool { + l := len(mc.msgs) + + if build.BlockMessageLimit < l+len(sm.msgs) || sm.gasLimit < mc.gasLimit { + return false } - return msgs, nil + if mc.sigType == crypto.SigTypeBLS { + if sm.blsLimit < l { + return false + } + + sm.msgs = append(sm.msgs, mc.msgs...) + sm.blsLimit -= l + sm.gasLimit -= mc.gasLimit + } else if mc.sigType == crypto.SigTypeSecp256k1 { + if sm.secpLimit < l { + return false + } + + sm.msgs = append(sm.msgs, mc.msgs...) + sm.secpLimit -= l + sm.gasLimit -= mc.gasLimit + } + + // don't add the weird sigType msg, but otherwise proceed + return true } -func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) { +// returns false if messages can't be added due to block constraints +// will trim / invalidate chain as appropriate +func (sm *selectedMessages) tryToAddWithDeps(mc *msgChain, mp *MessagePool, baseFee types.BigInt) bool { + // compute the dependencies that must be merged and the gas limit including deps + chainGasLimit := mc.gasLimit + chainMsgLimit := len(mc.msgs) + depGasLimit := int64(0) + depMsgLimit := 0 + smMsgLimit := 0 + + if mc.sigType == crypto.SigTypeBLS { + smMsgLimit = sm.blsLimit + } else if mc.sigType == crypto.SigTypeSecp256k1 { + smMsgLimit = sm.secpLimit + } else { + return false + } + + if smMsgLimit > build.BlockMessageLimit-len(sm.msgs) { + smMsgLimit = build.BlockMessageLimit - len(sm.msgs) + } + + var chainDeps []*msgChain + for curChain := mc.prev; curChain != nil && !curChain.merged; curChain = curChain.prev { + chainDeps = append(chainDeps, curChain) + chainGasLimit += curChain.gasLimit + chainMsgLimit += len(curChain.msgs) + depGasLimit += curChain.gasLimit + depMsgLimit += len(curChain.msgs) + } + + // the chain doesn't fit as-is, so trim / invalidate it and return false + if chainGasLimit > sm.gasLimit || chainMsgLimit > smMsgLimit { + + // it doesn't all fit; now we have to take into account the dependent chains before + // making a decision about trimming or invalidating. + // if the dependencies exceed the block limits, then we must invalidate the chain + // as it can never be included. + // Otherwise we can just trim and continue + if depGasLimit > sm.gasLimit || depMsgLimit >= smMsgLimit { + mc.Invalidate() + } else { + // dependencies fit, just trim it + mc.Trim(sm.gasLimit-depGasLimit, smMsgLimit-depMsgLimit, mp, baseFee) + } + + return false + } + + // the chain fits! include it together with all dependencies + for i := len(chainDeps) - 1; i >= 0; i-- { + curChain := chainDeps[i] + curChain.merged = true + sm.msgs = append(sm.msgs, curChain.msgs...) + } + + mc.merged = true + + sm.msgs = append(sm.msgs, mc.msgs...) + sm.gasLimit -= chainGasLimit + + if mc.sigType == crypto.SigTypeBLS { + sm.blsLimit -= chainMsgLimit + } else if mc.sigType == crypto.SigTypeSecp256k1 { + sm.secpLimit -= chainMsgLimit + } + + return true +} + +func (sm *selectedMessages) trimChain(mc *msgChain, mp *MessagePool, baseFee types.BigInt) { + msgLimit := build.BlockMessageLimit - len(sm.msgs) + if mc.sigType == crypto.SigTypeBLS { + if msgLimit > sm.blsLimit { + msgLimit = sm.blsLimit + } + } else if mc.sigType == crypto.SigTypeSecp256k1 { + if msgLimit > sm.secpLimit { + msgLimit = sm.secpLimit + } + } + + if mc.gasLimit > sm.gasLimit || len(mc.msgs) > msgLimit { + mc.Trim(sm.gasLimit, msgLimit, mp, baseFee) + } +} + +func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *types.TipSet, tq float64) (*selectedMessages, error) { start := time.Now() baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts) @@ -89,10 +226,10 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ // 0b. Select all priority messages that fit in the block minGas := int64(gasguess.MinGas) - result, gasLimit := mp.selectPriorityMessages(ctx, pending, baseFee, ts) + result := mp.selectPriorityMessages(ctx, pending, baseFee, ts) // have we filled the block? - if gasLimit < minGas { + if result.gasLimit < minGas || len(result.msgs) >= build.BlockMessageLimit { return result, nil } @@ -117,19 +254,21 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ return result, nil } - // 3. Parition chains into blocks (without trimming) + // 3. Partition chains into blocks (without trimming) // we use the full blockGasLimit (as opposed to the residual gas limit from the - // priority message selection) as we have to account for what other miners are doing + // priority message selection) as we have to account for what other block providers are doing nextChain := 0 partitions := make([][]*msgChain, MaxBlocks) for i := 0; i < MaxBlocks && nextChain < len(chains); i++ { gasLimit := int64(build.BlockGasLimit) + msgLimit := build.BlockMessageLimit for nextChain < len(chains) { chain := chains[nextChain] nextChain++ partitions[i] = append(partitions[i], chain) gasLimit -= chain.gasLimit - if gasLimit < minGas { + msgLimit -= len(chain.msgs) + if gasLimit < minGas || msgLimit <= 0 { break } } @@ -158,7 +297,7 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ }) // 6. Merge the head chains to produce the list of messages selected for inclusion - // subject to the residual gas limit + // subject to the residual block limits // When a chain is merged in, all its previous dependent chains *must* also be // merged in or we'll have a broken block startMerge := time.Now() @@ -174,35 +313,16 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ continue } - // compute the dependencies that must be merged and the gas limit including deps - chainGasLimit := chain.gasLimit - var chainDeps []*msgChain - for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev { - chainDeps = append(chainDeps, curChain) - chainGasLimit += curChain.gasLimit - } - - // does it all fit in the block? - if chainGasLimit <= gasLimit { - // include it together with all dependencies - for i := len(chainDeps) - 1; i >= 0; i-- { - curChain := chainDeps[i] - curChain.merged = true - result = append(result, curChain.msgs...) - } - - chain.merged = true - // adjust the effective pefromance for all subsequent chains + if result.tryToAddWithDeps(chain, mp, baseFee) { + // adjust the effective performance for all subsequent chains if next := chain.next; next != nil && next.effPerf > 0 { next.effPerf += next.parentOffset for next = next.next; next != nil && next.effPerf > 0; next = next.next { next.setEffPerf() } } - result = append(result, chain.msgs...) - gasLimit -= chainGasLimit - // resort to account for already merged chains and effective performance adjustments + // re-sort to account for already merged chains and effective performance adjustments // the sort *must* be stable or we end up getting negative gasPerfs pushed up. sort.SliceStable(chains[i+1:], func(i, j int) bool { return chains[i].BeforeEffective(chains[j]) @@ -211,7 +331,7 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ continue } - // we can't fit this chain and its dependencies because of block gasLimit -- we are + // we can't fit this chain and its dependencies because of block limits -- we are // at the edge last = i break @@ -222,18 +342,22 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ // 7. We have reached the edge of what can fit wholesale; if we still hae available // gasLimit to pack some more chains, then trim the last chain and push it down. - // Trimming invalidaates subsequent dependent chains so that they can't be selected + // Trimming invalidates subsequent dependent chains so that they can't be selected // as their dependency cannot be (fully) included. // We do this in a loop because the blocker might have been inordinately large and // we might have to do it multiple times to satisfy tail packing startTail := time.Now() tailLoop: - for gasLimit >= minGas && last < len(chains) { - // trim if necessary - if chains[last].gasLimit > gasLimit { - chains[last].Trim(gasLimit, mp, baseFee) + for result.gasLimit >= minGas && last < len(chains) { + + if !chains[last].valid { + last++ + continue tailLoop } + // trim if necessary + result.trimChain(chains[last], mp, baseFee) + // push down if it hasn't been invalidated if chains[last].valid { for i := last; i < len(chains)-1; i++ { @@ -245,7 +369,7 @@ tailLoop: } // select the next (valid and fitting) chain and its dependencies for inclusion - for i, chain := range chains[last:] { + for _, chain := range chains[last:] { // has the chain been invalidated? if !chain.valid { continue @@ -261,45 +385,10 @@ tailLoop: break tailLoop } - // compute the dependencies that must be merged and the gas limit including deps - chainGasLimit := chain.gasLimit - depGasLimit := int64(0) - var chainDeps []*msgChain - for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev { - chainDeps = append(chainDeps, curChain) - chainGasLimit += curChain.gasLimit - depGasLimit += curChain.gasLimit - } - - // does it all fit in the bock - if chainGasLimit <= gasLimit { - // include it together with all dependencies - for i := len(chainDeps) - 1; i >= 0; i-- { - curChain := chainDeps[i] - curChain.merged = true - result = append(result, curChain.msgs...) - } - - chain.merged = true - result = append(result, chain.msgs...) - gasLimit -= chainGasLimit + if result.tryToAddWithDeps(chain, mp, baseFee) { continue } - // it doesn't all fit; now we have to take into account the dependent chains before - // making a decision about trimming or invalidating. - // if the dependencies exceed the gas limit, then we must invalidate the chain - // as it can never be included. - // Otherwise we can just trim and continue - if depGasLimit > gasLimit { - chain.Invalidate() - last += i + 1 - continue tailLoop - } - - // dependencies fit, just trim it - chain.Trim(gasLimit-depGasLimit, mp, baseFee) - last += i continue tailLoop } @@ -311,17 +400,17 @@ tailLoop: log.Infow("pack tail chains done", "took", dt) } - // if we have gasLimit to spare, pick some random (non-negative) chains to fill the block - // we pick randomly so that we minimize the probability of duplication among all miners - if gasLimit >= minGas { - randomCount := 0 + // if we have room to spare, pick some random (non-negative) chains to fill the block + // we pick randomly so that we minimize the probability of duplication among all block producers + if result.gasLimit >= minGas && len(result.msgs) <= build.BlockMessageLimit { + preRandomLength := len(result.msgs) startRandom := time.Now() shuffleChains(chains) for _, chain := range chains { // have we filled the block - if gasLimit < minGas { + if result.gasLimit < minGas || len(result.msgs) >= build.BlockMessageLimit { break } @@ -335,59 +424,31 @@ tailLoop: continue } - // compute the dependencies that must be merged and the gas limit including deps - chainGasLimit := chain.gasLimit - depGasLimit := int64(0) - var chainDeps []*msgChain - for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev { - chainDeps = append(chainDeps, curChain) - chainGasLimit += curChain.gasLimit - depGasLimit += curChain.gasLimit - } - - // do the deps fit? if the deps won't fit, invalidate the chain - if depGasLimit > gasLimit { - chain.Invalidate() + if result.tryToAddWithDeps(chain, mp, baseFee) { continue } - // do they fit as is? if it doesn't, trim to make it fit if possible - if chainGasLimit > gasLimit { - chain.Trim(gasLimit-depGasLimit, mp, baseFee) - - if !chain.valid { - continue - } - } - - // include it together with all dependencies - for i := len(chainDeps) - 1; i >= 0; i-- { - curChain := chainDeps[i] - curChain.merged = true - result = append(result, curChain.msgs...) - randomCount += len(curChain.msgs) + if chain.valid { + // chain got trimmed on the previous call to tryToAddWithDeps, can now be included + result.tryToAddWithDeps(chain, mp, baseFee) + continue } - - chain.merged = true - result = append(result, chain.msgs...) - randomCount += len(chain.msgs) - gasLimit -= chainGasLimit } if dt := time.Since(startRandom); dt > time.Millisecond { log.Infow("pack random tail chains done", "took", dt) } - if randomCount > 0 { + if len(result.msgs) != preRandomLength { log.Warnf("optimal selection failed to pack a block; picked %d messages with random selection", - randomCount) + len(result.msgs)-preRandomLength) } } return result, nil } -func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *types.TipSet) ([]*types.SignedMessage, error) { +func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *types.TipSet) (*selectedMessages, error) { start := time.Now() baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts) @@ -413,10 +474,10 @@ func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *type // 0b. Select all priority messages that fit in the block minGas := int64(gasguess.MinGas) - result, gasLimit := mp.selectPriorityMessages(ctx, pending, baseFee, ts) + result := mp.selectPriorityMessages(ctx, pending, baseFee, ts) // have we filled the block? - if gasLimit < minGas { + if result.gasLimit < minGas || len(result.msgs) > build.BlockMessageLimit { return result, nil } @@ -442,7 +503,7 @@ func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *type } // 3. Merge the head chains to produce the list of messages selected for inclusion, subject to - // the block gas limit. + // the block gas and message limits. startMerge := time.Now() last := len(chains) for i, chain := range chains { @@ -452,13 +513,12 @@ func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *type } // does it fit in the block? - if chain.gasLimit <= gasLimit { - gasLimit -= chain.gasLimit - result = append(result, chain.msgs...) + if result.tryToAdd(chain) { + // there was room, we added the chain, keep going continue } - // we can't fit this chain because of block gasLimit -- we are at the edge + // we can't fit this chain because of block limits -- we are at the edge last = i break } @@ -474,9 +534,9 @@ func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *type // have to do it multiple times to satisfy tail packing. startTail := time.Now() tailLoop: - for gasLimit >= minGas && last < len(chains) { + for result.gasLimit >= minGas && last < len(chains) { // trim - chains[last].Trim(gasLimit, mp, baseFee) + result.trimChain(chains[last], mp, baseFee) // push down if it hasn't been invalidated if chains[last].valid { @@ -501,9 +561,8 @@ tailLoop: } // does it fit in the bock? - if chain.gasLimit <= gasLimit { - gasLimit -= chain.gasLimit - result = append(result, chain.msgs...) + if result.tryToAdd(chain) { + // there was room, we added the chain, keep going continue } @@ -523,7 +582,7 @@ tailLoop: return result, nil } -func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[address.Address]map[uint64]*types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) ([]*types.SignedMessage, int64) { +func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[address.Address]map[uint64]*types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) *selectedMessages { start := time.Now() defer func() { if dt := time.Since(start); dt > time.Millisecond { @@ -531,8 +590,12 @@ func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[a } }() mpCfg := mp.getConfig() - result := make([]*types.SignedMessage, 0, mpCfg.SizeLimitLow) - gasLimit := int64(build.BlockGasLimit) + result := &selectedMessages{ + msgs: make([]*types.SignedMessage, 0, mpCfg.SizeLimitLow), + gasLimit: int64(build.BlockGasLimit), + blsLimit: cbg.MaxLength, + secpLimit: cbg.MaxLength, + } minGas := int64(gasguess.MinGas) // 1. Get priority actor chains @@ -542,7 +605,7 @@ func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[a pk, err := mp.resolveToKey(ctx, actor) if err != nil { log.Debugf("mpooladdlocal failed to resolve sender: %s", err) - return nil, gasLimit + return result } mset, ok := pending[pk] @@ -554,9 +617,8 @@ func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[a chains = append(chains, next...) } } - if len(chains) == 0 { - return nil, gasLimit + return result } // 2. Sort the chains @@ -566,7 +628,7 @@ func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[a if len(chains) != 0 && chains[0].gasPerf < 0 { log.Warnw("all priority messages in mpool have negative gas performance", "bestGasPerf", chains[0].gasPerf) - return nil, gasLimit + return result } // 3. Merge chains until the block limit, as long as they have non-negative gas performance @@ -576,9 +638,8 @@ func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[a break } - if chain.gasLimit <= gasLimit { - gasLimit -= chain.gasLimit - result = append(result, chain.msgs...) + if result.tryToAdd(chain) { + // there was room, we added the chain, keep going continue } @@ -588,9 +649,10 @@ func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[a } tailLoop: - for gasLimit >= minGas && last < len(chains) { + for result.gasLimit >= minGas && last < len(chains) { // trim, discarding negative performing messages - chains[last].Trim(gasLimit, mp, baseFee) + + result.trimChain(chains[last], mp, baseFee) // push down if it hasn't been invalidated if chains[last].valid { @@ -615,9 +677,8 @@ tailLoop: } // does it fit in the bock? - if chain.gasLimit <= gasLimit { - gasLimit -= chain.gasLimit - result = append(result, chain.msgs...) + if result.tryToAdd(chain) { + // there was room, we added the chain, keep going continue } @@ -631,7 +692,7 @@ tailLoop: break } - return result, gasLimit + return result } func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address.Address]map[uint64]*types.SignedMessage, error) { @@ -778,11 +839,16 @@ func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint6 return nil } + // if we have more messages from this sender than can fit in a block, drop the extra ones + if len(msgs) > build.BlockMessageLimit { + msgs = msgs[:build.BlockMessageLimit] + } + // ok, now we can construct the chains using the messages we have // invariant: each chain has a bigger gasPerf than the next -- otherwise they can be merged // and increase the gasPerf of the first chain // We do this in two passes: - // - in the first pass we create chains that aggreagate messages with non-decreasing gasPerf + // - in the first pass we create chains that aggregate messages with non-decreasing gasPerf // - in the second pass we merge chains to maintain the invariant. var chains []*msgChain var curChain *msgChain @@ -794,6 +860,7 @@ func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint6 chain.gasLimit = m.Message.GasLimit chain.gasPerf = mp.getGasPerf(chain.gasReward, chain.gasLimit) chain.valid = true + chain.sigType = m.Signature.Type return chain } @@ -808,7 +875,7 @@ func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint6 gasLimit := curChain.gasLimit + m.Message.GasLimit gasPerf := mp.getGasPerf(gasReward, gasLimit) - // try to add the message to the current chain -- if it decreases the gasPerf, then make a + // try to add the message to the current chain -- if it decreases the gasPerf, or then make a // new chain if gasPerf < curChain.gasPerf { chains = append(chains, curChain) @@ -868,9 +935,9 @@ func (mc *msgChain) Before(other *msgChain) bool { (mc.gasPerf == other.gasPerf && mc.gasReward.Cmp(other.gasReward) > 0) } -func (mc *msgChain) Trim(gasLimit int64, mp *MessagePool, baseFee types.BigInt) { +func (mc *msgChain) Trim(gasLimit int64, msgLimit int, mp *MessagePool, baseFee types.BigInt) { i := len(mc.msgs) - 1 - for i >= 0 && (mc.gasLimit > gasLimit || mc.gasPerf < 0) { + for i >= 0 && (mc.gasLimit > gasLimit || mc.gasPerf < 0 || i >= msgLimit) { gasReward := mp.getGasReward(mc.msgs[i], baseFee) mc.gasReward = new(big.Int).Sub(mc.gasReward, gasReward) mc.gasLimit -= mc.msgs[i].Message.GasLimit @@ -893,6 +960,7 @@ func (mc *msgChain) Trim(gasLimit int64, mp *MessagePool, baseFee types.BigInt) mc.msgs = mc.msgs[:i+1] } + // TODO: if the trim above is a no-op, this (may) needlessly invalidates the next chain if mc.next != nil { mc.next.Invalidate() mc.next = nil diff --git a/chain/messagepool/selection_test.go b/chain/messagepool/selection_test.go index 0f8fd8ee6f2..f3389896f6d 100644 --- a/chain/messagepool/selection_test.go +++ b/chain/messagepool/selection_test.go @@ -13,6 +13,10 @@ import ( "sort" "testing" + "github.com/filecoin-project/go-state-types/crypto" + + cbg "github.com/whyrusleeping/cbor-gen" + "github.com/filecoin-project/go-address" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" @@ -527,7 +531,7 @@ func TestBasicMessageSelection(t *testing.T) { } } -func TestMessageSelectionTrimming(t *testing.T) { +func TestMessageSelectionTrimmingGas(t *testing.T) { mp, tma := makeTestMpool() // the actors @@ -577,17 +581,210 @@ func TestMessageSelectionTrimming(t *testing.T) { expected := int(build.BlockGasLimit / gasLimit) if len(msgs) != expected { - t.Fatalf("expected %d messages, bug got %d", expected, len(msgs)) + t.Fatalf("expected %d messages, but got %d", expected, len(msgs)) + } + + mGasLimit := int64(0) + for _, m := range msgs { + mGasLimit += m.Message.GasLimit + } + if mGasLimit > build.BlockGasLimit { + t.Fatal("selected messages gas limit exceeds block gas limit!") + } + +} + +func TestMessageSelectionTrimmingMsgsBasic(t *testing.T) { + mp, tma := makeTestMpool() + + // the actors + w1, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a1, err := w1.WalletNew(context.Background(), types.KTSecp256k1) + if err != nil { + t.Fatal(err) + } + + block := tma.nextBlock() + ts := mock.TipSet(block) + tma.applyBlock(t, block) + + tma.setBalance(a1, 1) // in FIL + + // create a larger than selectable chain + for i := 0; i < build.BlockMessageLimit; i++ { + m := makeTestMessage(w1, a1, a1, uint64(i), 300000, 100) + mustAdd(t, mp, m) + } + + msgs, err := mp.SelectMessages(context.Background(), ts, 1.0) + if err != nil { + t.Fatal(err) + } + + expected := cbg.MaxLength + if len(msgs) != expected { + t.Fatalf("expected %d messages, but got %d", expected, len(msgs)) + } + + mGasLimit := int64(0) + for _, m := range msgs { + mGasLimit += m.Message.GasLimit + } + if mGasLimit > build.BlockGasLimit { + t.Fatal("selected messages gas limit exceeds block gas limit!") + } + +} + +func TestMessageSelectionTrimmingMsgsTwoSendersBasic(t *testing.T) { + mp, tma := makeTestMpool() + + // the actors + w1, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a1, err := w1.WalletNew(context.Background(), types.KTSecp256k1) + if err != nil { + t.Fatal(err) + } + + w2, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a2, err := w2.WalletNew(context.Background(), types.KTBLS) + if err != nil { + t.Fatal(err) + } + + block := tma.nextBlock() + ts := mock.TipSet(block) + tma.applyBlock(t, block) + + tma.setBalance(a1, 1) // in FIL + tma.setBalance(a2, 1) // in FIL + + // create 2 larger than selectable chains + for i := 0; i < build.BlockMessageLimit; i++ { + m := makeTestMessage(w1, a1, a2, uint64(i), 300000, 100) + mustAdd(t, mp, m) + // a2's messages are preferred + m = makeTestMessage(w2, a2, a1, uint64(i), 300000, 1000) + mustAdd(t, mp, m) + } + + msgs, err := mp.SelectMessages(context.Background(), ts, 1.0) + if err != nil { + t.Fatal(err) + } + + mGasLimit := int64(0) + counts := make(map[crypto.SigType]uint) + for _, m := range msgs { + mGasLimit += m.Message.GasLimit + counts[m.Signature.Type]++ + } + + if mGasLimit > build.BlockGasLimit { + t.Fatal("selected messages gas limit exceeds block gas limit!") + } + + expected := build.BlockMessageLimit + if len(msgs) != expected { + t.Fatalf("expected %d messages, but got %d", expected, len(msgs)) + } + + if counts[crypto.SigTypeBLS] != cbg.MaxLength { + t.Fatalf("expected %d bls messages, but got %d", cbg.MaxLength, len(msgs)) + } +} + +func TestMessageSelectionTrimmingMsgsTwoSendersAdvanced(t *testing.T) { + mp, tma := makeTestMpool() + + // the actors + w1, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a1, err := w1.WalletNew(context.Background(), types.KTSecp256k1) + if err != nil { + t.Fatal(err) + } + + w2, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a2, err := w2.WalletNew(context.Background(), types.KTBLS) + if err != nil { + t.Fatal(err) + } + + block := tma.nextBlock() + ts := mock.TipSet(block) + tma.applyBlock(t, block) + + tma.setBalance(a1, 1) // in FIL + tma.setBalance(a2, 1) // in FIL + + // create 2 almost max-length chains of equal value + i := 0 + for i = 0; i < cbg.MaxLength-1; i++ { + m := makeTestMessage(w1, a1, a2, uint64(i), 300000, 100) + mustAdd(t, mp, m) + // a2's messages are preferred + m = makeTestMessage(w2, a2, a1, uint64(i), 300000, 100) + mustAdd(t, mp, m) + } + + // a1's 8192th message is worth more than a2's + m := makeTestMessage(w1, a1, a2, uint64(i), 300000, 1000) + mustAdd(t, mp, m) + + m = makeTestMessage(w2, a2, a1, uint64(i), 300000, 100) + mustAdd(t, mp, m) + + i++ + + // a2's (unselectable) 8193rd message is worth SO MUCH + m = makeTestMessage(w2, a2, a1, uint64(i), 300000, 1000000) + mustAdd(t, mp, m) + + msgs, err := mp.SelectMessages(context.Background(), ts, 1.0) + if err != nil { + t.Fatal(err) } mGasLimit := int64(0) + counts := make(map[crypto.SigType]uint) for _, m := range msgs { mGasLimit += m.Message.GasLimit + counts[m.Signature.Type]++ } + if mGasLimit > build.BlockGasLimit { t.Fatal("selected messages gas limit exceeds block gas limit!") } + expected := build.BlockMessageLimit + if len(msgs) != expected { + t.Fatalf("expected %d messages, but got %d", expected, len(msgs)) + } + + // we should have taken the secp chain + if counts[crypto.SigTypeSecp256k1] != cbg.MaxLength { + t.Fatalf("expected %d bls messages, but got %d", cbg.MaxLength, len(msgs)) + } } func TestPriorityMessageSelection(t *testing.T) { @@ -978,7 +1175,7 @@ func TestOptimalMessageSelection2(t *testing.T) { func TestOptimalMessageSelection3(t *testing.T) { // this test uses 10 actors sending a block of messages to each other, with the the first // actors paying higher gas premium than the subsequent actors. - // We select with a low ticket quality; the chain depenent merging algorithm should pick + // We select with a low ticket quality; the chain dependent merging algorithm should pick // messages from the median actor from the start mp, tma := makeTestMpool() @@ -1109,11 +1306,13 @@ func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand, getPremium fu logging.SetLogLevel("messagepool", "error") // 1. greedy selection - greedyMsgs, err := mp.selectMessagesGreedy(context.Background(), ts, ts) + gm, err := mp.selectMessagesGreedy(context.Background(), ts, ts) if err != nil { t.Fatal(err) } + greedyMsgs := gm.msgs + totalGreedyCapacity := 0.0 totalGreedyReward := 0.0 totalOptimalCapacity := 0.0