-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Message Pool Rudimentary Spam Protection Measures #3313
Changes from all commits
da9630c
835dbfe
77f6e0d
4ac9828
4adb83e
b59f584
7887694
87e6c09
bedbdca
c473d3c
a2c0c10
7a70668
4919a00
8848c54
793eda9
8db262c
d3e95d6
ad889a7
6abfbbd
82ef052
038e83b
708a8b6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,7 @@ import ( | |
"errors" | ||
"fmt" | ||
"math" | ||
stdbig "math/big" | ||
"sort" | ||
"sync" | ||
"time" | ||
|
@@ -47,19 +48,26 @@ const RbfDenom = 256 | |
|
||
var RepublishInterval = pubsub.TimeCacheDuration + time.Duration(5*build.BlockDelaySecs+build.PropagationDelaySecs)*time.Second | ||
|
||
var minimumBaseFee = types.NewInt(uint64(build.MinimumBaseFee)) | ||
|
||
var MaxActorPendingMessages = 1000 | ||
|
||
var ( | ||
ErrMessageTooBig = errors.New("message too big") | ||
|
||
ErrMessageValueTooHigh = errors.New("cannot send more filecoin than will ever exist") | ||
|
||
ErrNonceTooLow = errors.New("message nonce too low") | ||
|
||
ErrGasFeeCapTooLow = errors.New("gas fee cap too low") | ||
|
||
ErrNotEnoughFunds = errors.New("not enough funds to execute transaction") | ||
|
||
ErrInvalidToAddr = errors.New("message had invalid to address") | ||
|
||
ErrBroadcastAnyway = errors.New("broadcasting message despite validation fail") | ||
ErrRBFTooLowPremium = errors.New("replace by fee has too low GasPremium") | ||
ErrSoftValidationFailure = errors.New("validation failure") | ||
ErrRBFTooLowPremium = errors.New("replace by fee has too low GasPremium") | ||
ErrTooManyPendingMessages = errors.New("too many pending messages for actor") | ||
|
||
ErrTryAgain = errors.New("state inconsistency while pushing message; please try again") | ||
) | ||
|
@@ -118,17 +126,19 @@ type MessagePool struct { | |
} | ||
|
||
type msgSet struct { | ||
msgs map[uint64]*types.SignedMessage | ||
nextNonce uint64 | ||
msgs map[uint64]*types.SignedMessage | ||
nextNonce uint64 | ||
requiredFunds *stdbig.Int | ||
} | ||
|
||
func newMsgSet() *msgSet { | ||
return &msgSet{ | ||
msgs: make(map[uint64]*types.SignedMessage), | ||
msgs: make(map[uint64]*types.SignedMessage), | ||
requiredFunds: stdbig.NewInt(0), | ||
} | ||
} | ||
|
||
func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool) (bool, error) { | ||
func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, limit bool) (bool, error) { | ||
if len(ms.msgs) == 0 || m.Message.Nonce >= ms.nextNonce { | ||
ms.nextNonce = m.Message.Nonce + 1 | ||
} | ||
|
@@ -150,12 +160,44 @@ func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool) (bool, error) { | |
ErrRBFTooLowPremium) | ||
} | ||
} | ||
|
||
ms.requiredFunds.Sub(ms.requiredFunds, exms.Message.RequiredFunds().Int) | ||
//ms.requiredFunds.Sub(ms.requiredFunds, exms.Message.Value.Int) | ||
} | ||
|
||
if !has && limit && len(ms.msgs) > MaxActorPendingMessages { | ||
log.Errorf("too many pending messages from actor %s", m.Message.From) | ||
return false, ErrTooManyPendingMessages | ||
} | ||
|
||
ms.msgs[m.Message.Nonce] = m | ||
ms.requiredFunds.Add(ms.requiredFunds, m.Message.RequiredFunds().Int) | ||
//ms.requiredFunds.Add(ms.requiredFunds, m.Message.Value.Int) | ||
|
||
return !has, nil | ||
} | ||
|
||
func (ms *msgSet) rm(nonce uint64) { | ||
m, has := ms.msgs[nonce] | ||
if has { | ||
ms.requiredFunds.Sub(ms.requiredFunds, m.Message.RequiredFunds().Int) | ||
//ms.requiredFunds.Sub(ms.requiredFunds, m.Message.Value.Int) | ||
delete(ms.msgs, nonce) | ||
} | ||
} | ||
|
||
func (ms *msgSet) getRequiredFunds(nonce uint64) types.BigInt { | ||
requiredFunds := new(stdbig.Int).Set(ms.requiredFunds) | ||
|
||
m, has := ms.msgs[nonce] | ||
if has { | ||
requiredFunds.Sub(requiredFunds, m.Message.RequiredFunds().Int) | ||
//requiredFunds.Sub(requiredFunds, m.Message.Value.Int) | ||
} | ||
|
||
return types.BigInt{Int: requiredFunds} | ||
} | ||
|
||
func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*MessagePool, error) { | ||
cache, _ := lru.New2Q(build.BlsSignatureCacheSize) | ||
verifcache, _ := lru.New2Q(build.VerifSigCacheSize) | ||
|
@@ -257,7 +299,7 @@ func (mp *MessagePool) addLocal(m *types.SignedMessage, msgb []byte) error { | |
return nil | ||
} | ||
|
||
func (mp *MessagePool) verifyMsgBeforePush(m *types.SignedMessage, epoch abi.ChainEpoch) error { | ||
func (mp *MessagePool) verifyMsgBeforeAdd(m *types.SignedMessage, epoch abi.ChainEpoch) error { | ||
minGas := vm.PricelistByEpoch(epoch).OnChainMessage(m.ChainLength()) | ||
|
||
if err := m.VMMessage().ValidForBlockInclusion(minGas.Total()); err != nil { | ||
|
@@ -278,25 +320,12 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) { | |
<-mp.addSema | ||
}() | ||
|
||
mp.curTsLk.Lock() | ||
curTs := mp.curTs | ||
epoch := curTs.Height() | ||
mp.curTsLk.Unlock() | ||
if err := mp.verifyMsgBeforePush(m, epoch); err != nil { | ||
return cid.Undef, err | ||
} | ||
|
||
msgb, err := m.Serialize() | ||
if err != nil { | ||
return cid.Undef, err | ||
} | ||
|
||
mp.curTsLk.Lock() | ||
if mp.curTs != curTs { | ||
mp.curTsLk.Unlock() | ||
return cid.Undef, ErrTryAgain | ||
} | ||
|
||
if err := mp.addTs(m, mp.curTs); err != nil { | ||
mp.curTsLk.Unlock() | ||
return cid.Undef, err | ||
|
@@ -319,7 +348,7 @@ func (mp *MessagePool) checkMessage(m *types.SignedMessage) error { | |
return xerrors.Errorf("mpool message too large (%dB): %w", m.Size(), ErrMessageTooBig) | ||
} | ||
|
||
// Perform syntaxtic validation, minGas=0 as we check if correctly in select messages | ||
// Perform syntactic validation, minGas=0 as we check the actual mingas before we add it | ||
if err := m.Message.ValidForBlockInclusion(0); err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. were do we do that check now? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we do it in both add and push. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. and we check with the actual gas in the add logic, so it happens in all paths. |
||
return xerrors.Errorf("message not valid for block inclusion: %w", err) | ||
} | ||
|
@@ -332,8 +361,12 @@ func (mp *MessagePool) checkMessage(m *types.SignedMessage) error { | |
return ErrMessageValueTooHigh | ||
} | ||
|
||
if m.Message.GasFeeCap.LessThan(minimumBaseFee) { | ||
return ErrGasFeeCapTooLow | ||
} | ||
|
||
if err := mp.VerifyMsgSig(m); err != nil { | ||
log.Warnf("mpooladd signature verification failed: %s", err) | ||
log.Warnf("signature verification failed: %s", err) | ||
return err | ||
} | ||
|
||
|
@@ -393,48 +426,71 @@ func (mp *MessagePool) VerifyMsgSig(m *types.SignedMessage) error { | |
return nil | ||
} | ||
|
||
func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet) error { | ||
snonce, err := mp.getStateNonce(m.Message.From, curTs) | ||
func (mp *MessagePool) checkBalance(m *types.SignedMessage, curTs *types.TipSet) error { | ||
balance, err := mp.getStateBalance(m.Message.From, curTs) | ||
if err != nil { | ||
return xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrBroadcastAnyway) | ||
return xerrors.Errorf("failed to check sender balance: %s: %w", err, ErrSoftValidationFailure) | ||
} | ||
|
||
if snonce > m.Message.Nonce { | ||
return xerrors.Errorf("minimum expected nonce is %d: %w", snonce, ErrNonceTooLow) | ||
requiredFunds := m.Message.RequiredFunds() | ||
if balance.LessThan(requiredFunds) { | ||
return xerrors.Errorf("not enough funds (required: %s, balance: %s): %w", types.FIL(requiredFunds), types.FIL(balance), ErrNotEnoughFunds) | ||
} | ||
|
||
balance, err := mp.getStateBalance(m.Message.From, curTs) | ||
// add Value for soft failure check | ||
//requiredFunds = types.BigAdd(requiredFunds, m.Message.Value) | ||
|
||
mset, ok := mp.pending[m.Message.From] | ||
if ok { | ||
requiredFunds = types.BigAdd(requiredFunds, mset.getRequiredFunds(m.Message.Nonce)) | ||
} | ||
|
||
if balance.LessThan(requiredFunds) { | ||
// Note: we fail here for ErrSoftValidationFailure to signal a soft failure because we might | ||
// be out of sync. | ||
return xerrors.Errorf("not enough funds including pending messages (required: %s, balance: %s): %w", types.FIL(requiredFunds), types.FIL(balance), ErrSoftValidationFailure) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet) error { | ||
snonce, err := mp.getStateNonce(m.Message.From, curTs) | ||
if err != nil { | ||
return xerrors.Errorf("failed to check sender balance: %s: %w", err, ErrBroadcastAnyway) | ||
return xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure) | ||
} | ||
|
||
if balance.LessThan(m.Message.RequiredFunds()) { | ||
return xerrors.Errorf("not enough funds (required: %s, balance: %s): %w", types.FIL(m.Message.RequiredFunds()), types.FIL(balance), ErrNotEnoughFunds) | ||
if snonce > m.Message.Nonce { | ||
return xerrors.Errorf("minimum expected nonce is %d: %w", snonce, ErrNonceTooLow) | ||
} | ||
|
||
mp.lk.Lock() | ||
defer mp.lk.Unlock() | ||
|
||
return mp.addLocked(m) | ||
if err := mp.verifyMsgBeforeAdd(m, curTs.Height()); err != nil { | ||
return err | ||
} | ||
|
||
if err := mp.checkBalance(m, curTs); err != nil { | ||
return err | ||
} | ||
|
||
return mp.addLocked(m, true) | ||
} | ||
|
||
func (mp *MessagePool) addSkipChecks(m *types.SignedMessage) error { | ||
mp.lk.Lock() | ||
defer mp.lk.Unlock() | ||
|
||
return mp.addLocked(m) | ||
return mp.addLocked(m, false) | ||
} | ||
|
||
func (mp *MessagePool) addLocked(m *types.SignedMessage) error { | ||
func (mp *MessagePool) addLocked(m *types.SignedMessage, limit bool) error { | ||
log.Debugf("mpooladd: %s %d", m.Message.From, m.Message.Nonce) | ||
if m.Signature.Type == crypto.SigTypeBLS { | ||
mp.blsSigCache.Add(m.Cid(), m.Signature) | ||
} | ||
|
||
if m.Message.GasLimit > build.BlockGasLimit { | ||
return xerrors.Errorf("given message has too high of a gas limit") | ||
} | ||
|
||
Comment on lines
-434
to
-437
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is checked by |
||
if _, err := mp.api.PutMessage(m); err != nil { | ||
log.Warnf("mpooladd cs.PutMessage failed: %s", err) | ||
return err | ||
|
@@ -451,7 +507,7 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error { | |
mp.pending[m.Message.From] = mset | ||
} | ||
|
||
incr, err := mset.add(m, mp) | ||
incr, err := mset.add(m, mp, limit) | ||
if err != nil { | ||
log.Info(err) | ||
return err | ||
|
@@ -562,6 +618,16 @@ func (mp *MessagePool) PushWithNonce(ctx context.Context, addr address.Address, | |
return nil, err | ||
} | ||
|
||
err = mp.checkMessage(msg) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
msgb, err := msg.Serialize() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
// reacquire the locks and check state for consistency | ||
mp.curTsLk.Lock() | ||
defer mp.curTsLk.Unlock() | ||
|
@@ -582,16 +648,15 @@ func (mp *MessagePool) PushWithNonce(ctx context.Context, addr address.Address, | |
return nil, ErrTryAgain | ||
} | ||
|
||
if err := mp.verifyMsgBeforePush(msg, mp.curTs.Height()); err != nil { | ||
if err := mp.verifyMsgBeforeAdd(msg, curTs.Height()); err != nil { | ||
return nil, err | ||
} | ||
|
||
msgb, err := msg.Serialize() | ||
if err != nil { | ||
if err := mp.checkBalance(msg, curTs); err != nil { | ||
return nil, err | ||
} | ||
|
||
if err := mp.addLocked(msg); err != nil { | ||
if err := mp.addLocked(msg, true); err != nil { | ||
return nil, xerrors.Errorf("add locked failed: %w", err) | ||
} | ||
if err := mp.addLocal(msg, msgb); err != nil { | ||
|
@@ -625,7 +690,7 @@ func (mp *MessagePool) remove(from address.Address, nonce uint64) { | |
|
||
// NB: This deletes any message with the given nonce. This makes sense | ||
// as two messages with the same sender cannot have the same nonce | ||
delete(mset.msgs, nonce) | ||
mset.rm(nonce) | ||
|
||
if len(mset.msgs) == 0 { | ||
delete(mp.pending, from) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be
Add
no?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh wait, we're tracking funds required for this message chain here, so this means we're removing the message from the chain, which lowers the required funds for that chain. SGTM