Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/fetcher: don't spend too much time on transaction inclusion #25524

Merged
merged 10 commits into from
Aug 19, 2022
6 changes: 5 additions & 1 deletion cmd/devp2p/internal/ethtest/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,9 +357,13 @@ func (s *Suite) waitAnnounce(conn *Conn, blockAnnouncement *NewBlock) error {
return fmt.Errorf("wrong block hash in announcement: expected %v, got %v", blockAnnouncement.Block.Hash(), hashes[0].Hash)
}
return nil

// ignore tx announcements from previous tests
case *NewPooledTransactionHashes:
// ignore tx announcements from previous tests
continue
case *Transactions:
continue

default:
return fmt.Errorf("unexpected: %s", pretty.Sdump(msg))
}
Expand Down
4 changes: 4 additions & 0 deletions cmd/devp2p/internal/ethtest/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,9 +544,13 @@ func (s *Suite) TestNewPooledTxs(t *utesting.T) {
t.Fatalf("unexpected number of txs requested: wanted %d, got %d", len(hashes), len(msg.GetPooledTransactionsPacket))
}
return

// ignore propagated txs from previous tests
case *NewPooledTransactionHashes:
continue
case *Transactions:
continue

// ignore block announcements from previous tests
case *NewBlockHashes:
continue
Expand Down
6 changes: 3 additions & 3 deletions cmd/devp2p/internal/ethtest/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"github.com/ethereum/go-ethereum/params"
)

//var faucetAddr = common.HexToAddress("0x71562b71999873DB5b286dF957af199Ec94617F7")
// var faucetAddr = common.HexToAddress("0x71562b71999873DB5b286dF957af199Ec94617F7")
var faucetKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")

func (s *Suite) sendSuccessfulTxs(t *utesting.T) error {
Expand Down Expand Up @@ -192,10 +192,10 @@ func sendMultipleSuccessfulTxs(t *utesting.T, s *Suite, txs []*types.Transaction
nonce = txs[len(txs)-1].Nonce()

// Wait for the transaction announcement(s) and make sure all sent txs are being propagated.
// all txs should be announced within 3 announcements.
// all txs should be announced within a couple announcements.
recvHashes := make([]common.Hash, 0)

for i := 0; i < 3; i++ {
for i := 0; i < 20; i++ {
switch msg := recvConn.readAndServe(s.chain, timeout).(type) {
case *Transactions:
for _, tx := range *msg {
Expand Down
98 changes: 60 additions & 38 deletions eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,57 +262,79 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
// direct request replies. The differentiation is important so the fetcher can
// re-schedule missing transactions as soon as possible.
func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) error {
// Keep track of all the propagated transactions
if direct {
txReplyInMeter.Mark(int64(len(txs)))
} else {
txBroadcastInMeter.Mark(int64(len(txs)))
var (
inMeter = txReplyInMeter
knownMeter = txReplyKnownMeter
underpricedMeter = txReplyUnderpricedMeter
otherRejectMeter = txReplyOtherRejectMeter
)
if !direct {
inMeter = txBroadcastInMeter
knownMeter = txBroadcastKnownMeter
underpricedMeter = txBroadcastUnderpricedMeter
otherRejectMeter = txBroadcastOtherRejectMeter
}
// Keep track of all the propagated transactions
inMeter.Mark(int64(len(txs)))

// Push all the transactions into the pool, tracking underpriced ones to avoid
// re-requesting them and dropping the peer in case of malicious transfers.
var (
added = make([]common.Hash, 0, len(txs))
duplicate int64
underpriced int64
otherreject int64
added = make([]common.Hash, 0, len(txs))
delay time.Duration
)
errs := f.addTxs(txs)
for i, err := range errs {
// Track the transaction hash if the price is too low for us.
// Avoid re-request this transaction when we receive another
// announcement.
if errors.Is(err, core.ErrUnderpriced) || errors.Is(err, core.ErrReplaceUnderpriced) {
for f.underpriced.Cardinality() >= maxTxUnderpricedSetSize {
f.underpriced.Pop()
}
f.underpriced.Add(txs[i].Hash())
// proceed in batches
for i := 0; i < len(txs); i += 128 {
end := i + 128
if end > len(txs) {
end = len(txs)
}
// Track a few interesting failure types
switch {
case err == nil: // Noop, but need to handle to not count these
var (
duplicate int64
underpriced int64
otherreject int64
)
batch := txs[i:end]
for j, err := range f.addTxs(batch) {
// Track the transaction hash if the price is too low for us.
// Avoid re-request this transaction when we receive another
// announcement.
if errors.Is(err, core.ErrUnderpriced) || errors.Is(err, core.ErrReplaceUnderpriced) {
for f.underpriced.Cardinality() >= maxTxUnderpricedSetSize {
f.underpriced.Pop()
}
f.underpriced.Add(batch[j].Hash())
}
// Track a few interesting failure types
switch {
case err == nil: // Noop, but need to handle to not count these

case errors.Is(err, core.ErrAlreadyKnown):
duplicate++
case errors.Is(err, core.ErrAlreadyKnown):
duplicate++

case errors.Is(err, core.ErrUnderpriced) || errors.Is(err, core.ErrReplaceUnderpriced):
underpriced++
case errors.Is(err, core.ErrUnderpriced) || errors.Is(err, core.ErrReplaceUnderpriced):
underpriced++

default:
otherreject++
default:
otherreject++
}
added = append(added, batch[j].Hash())
}
knownMeter.Mark(duplicate)
underpricedMeter.Mark(underpriced)
otherRejectMeter.Mark(otherreject)

// If 'other reject' is >25% of the deliveries in any batch, abort. Either we are
holiman marked this conversation as resolved.
Show resolved Hide resolved
// out of sync with the chain or the peer is griefing us.
if otherreject > 128/4 {
delay = 200 * time.Millisecond
holiman marked this conversation as resolved.
Show resolved Hide resolved
log.Warn("Peer delivering useless transactions", "peer", peer, "ignored", len(txs)-end)
break
}
added = append(added, txs[i].Hash())
}
if direct {
txReplyKnownMeter.Mark(duplicate)
txReplyUnderpricedMeter.Mark(underpriced)
txReplyOtherRejectMeter.Mark(otherreject)
} else {
txBroadcastKnownMeter.Mark(duplicate)
txBroadcastUnderpricedMeter.Mark(underpriced)
txBroadcastOtherRejectMeter.Mark(otherreject)
}
select {
case f.cleanup <- &txDelivery{origin: peer, hashes: added, direct: direct}:
time.Sleep(delay)
holiman marked this conversation as resolved.
Show resolved Hide resolved
return nil
case <-f.quit:
return errTerminated
Expand Down