diff --git a/chain.json b/chain.json index 20a6c96b20..1a9c2111e5 100644 --- a/chain.json +++ b/chain.json @@ -5,12 +5,20 @@ "tx-regossip-max-size": 32, "priority-regossip-max-txs": 500, "priority-regossip-txs-per-address": 200, - "priority-regossip-addresses": ["0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", "0x70997970C51812dc3A010C7d01b50e0d17dc79C8", "0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC", "0x8db97C7cEcE249c2b98bDC0226Cc4C2A57BF52FC"], + "priority-regossip-addresses": [ + "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", + "0x70997970C51812dc3A010C7d01b50e0d17dc79C8", + "0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC", + "0x8db97C7cEcE249c2b98bDC0226Cc4C2A57BF52FC" + ], "validator-private-key-file": "/tmp/validator.pk", "is-validator": true, "trading-api-enabled": true, "testing-api-enabled": true, "load-from-snapshot-enabled": true, "snapshot-file-path": "/tmp/snapshot", - "makerbook-database-path": "/tmp/makerbook" + "makerbook-database-path": "/tmp/makerbook", + "order-gossip-num-validators": 10, + "order-gossip-num-non-validators": 5, + "order-gossip-num-peers": 15 } diff --git a/network-configs/aylin/chain.json b/network-configs/aylin/chain.json index a01a49286f..cea495e71d 100644 --- a/network-configs/aylin/chain.json +++ b/network-configs/aylin/chain.json @@ -5,10 +5,15 @@ "tx-regossip-max-size": 32, "priority-regossip-max-txs": 32, "priority-regossip-txs-per-address": 20, - "priority-regossip-addresses": ["0x06CCAD927e6B1d36E219Cb582Af3185D0705f78F"], + "priority-regossip-addresses": [ + "0x06CCAD927e6B1d36E219Cb582Af3185D0705f78F" + ], "validator-private-key-file": "/home/ubuntu/validator.pk", "feeRecipient": "", "is-validator": true, "snapshot-file-path": "/tmp/snapshot", - "makerbook-database-path": "/tmp/makerbook" + "makerbook-database-path": "/tmp/makerbook", + "order-gossip-num-validators": 10, + "order-gossip-num-non-validators": 5, + "order-gossip-num-peers": 15 } diff --git a/network-configs/aylin/chain_api_node.json b/network-configs/aylin/chain_api_node.json index 708c0556d3..5eb6dae9e4 100644 --- a/network-configs/aylin/chain_api_node.json +++ b/network-configs/aylin/chain_api_node.json @@ -5,11 +5,28 @@ "tx-regossip-max-size": 32, "priority-regossip-max-txs": 32, "priority-regossip-txs-per-address": 20, - "priority-regossip-addresses": ["0x06CCAD927e6B1d36E219Cb582Af3185D0705f78F"], + "priority-regossip-addresses": [ + "0x06CCAD927e6B1d36E219Cb582Af3185D0705f78F" + ], "coreth-admin-api-enabled": true, - "eth-apis": ["public-eth","public-eth-filter","net","web3","internal-eth","internal-blockchain","internal-transaction","internal-debug","internal-tx-pool","internal-account","debug-tracer"], + "eth-apis": [ + "public-eth", + "public-eth-filter", + "net", + "web3", + "internal-eth", + "internal-blockchain", + "internal-transaction", + "internal-debug", + "internal-tx-pool", + "internal-account", + "debug-tracer" + ], "trading-api-enabled": true, "testing-api-enabled": true, "snapshot-file-path": "/tmp/snapshot", - "makerbook-database-path": "/tmp/makerbook" + "makerbook-database-path": "/tmp/makerbook", + "order-gossip-num-validators": 10, + "order-gossip-num-non-validators": 5, + "order-gossip-num-peers": 15 } diff --git a/network-configs/aylin/chain_archival_node.json b/network-configs/aylin/chain_archival_node.json index 2d43af04de..8583aa3213 100644 --- a/network-configs/aylin/chain_archival_node.json +++ b/network-configs/aylin/chain_archival_node.json @@ -6,11 +6,28 @@ "tx-regossip-max-size": 32, "priority-regossip-max-txs": 32, "priority-regossip-txs-per-address": 20, - "priority-regossip-addresses": ["0x06CCAD927e6B1d36E219Cb582Af3185D0705f78F"], + "priority-regossip-addresses": [ + "0x06CCAD927e6B1d36E219Cb582Af3185D0705f78F" + ], "coreth-admin-api-enabled": true, - "eth-apis": ["public-eth","public-eth-filter","net","web3","internal-public-eth","internal-blockchain","internal-transaction","internal-debug","internal-tx-pool","internal-account","debug-tracer"], + "eth-apis": [ + "public-eth", + "public-eth-filter", + "net", + "web3", + "internal-public-eth", + "internal-blockchain", + "internal-transaction", + "internal-debug", + "internal-tx-pool", + "internal-account", + "debug-tracer" + ], "trading-api-enabled": true, "testing-api-enabled": true, "snapshot-file-path": "/tmp/snapshot", - "makerbook-database-path": "/tmp/makerbook" + "makerbook-database-path": "/tmp/makerbook", + "order-gossip-num-validators": 10, + "order-gossip-num-non-validators": 5, + "order-gossip-num-peers": 15 } diff --git a/network-configs/aylin/t2WSjSsoE3geV9ARu5r7gzTc5UayePy3NxDrSTx7hadLYvqbg.json b/network-configs/aylin/t2WSjSsoE3geV9ARu5r7gzTc5UayePy3NxDrSTx7hadLYvqbg.json index b3cf39918f..22901ef07b 100644 --- a/network-configs/aylin/t2WSjSsoE3geV9ARu5r7gzTc5UayePy3NxDrSTx7hadLYvqbg.json +++ b/network-configs/aylin/t2WSjSsoE3geV9ARu5r7gzTc5UayePy3NxDrSTx7hadLYvqbg.json @@ -1,6 +1,3 @@ -{ - "proposerMinBlockDelay": 0, - "appGossipValidatorSize": 10, - "appGossipNonValidatorSize": 5, - "appGossipPeerSize": 15 -} +{ + "proposerMinBlockDelay": 0 +} diff --git a/network-configs/hubblenet/2mxZY7A2t1tuRMALW4BcBUPVGNR3LH1DXhftdbLAHm1QtDkFp8.json b/network-configs/hubblenet/2mxZY7A2t1tuRMALW4BcBUPVGNR3LH1DXhftdbLAHm1QtDkFp8.json index b3cf39918f..1e0ba71454 100644 --- a/network-configs/hubblenet/2mxZY7A2t1tuRMALW4BcBUPVGNR3LH1DXhftdbLAHm1QtDkFp8.json +++ b/network-configs/hubblenet/2mxZY7A2t1tuRMALW4BcBUPVGNR3LH1DXhftdbLAHm1QtDkFp8.json @@ -1,6 +1,3 @@ { - "proposerMinBlockDelay": 0, - "appGossipValidatorSize": 10, - "appGossipNonValidatorSize": 5, - "appGossipPeerSize": 15 + "proposerMinBlockDelay": 0 } diff --git a/network-configs/hubblenet/chain_api_node.json b/network-configs/hubblenet/chain_api_node.json index 18ec743005..01793057e3 100644 --- a/network-configs/hubblenet/chain_api_node.json +++ b/network-configs/hubblenet/chain_api_node.json @@ -5,14 +5,31 @@ "tx-regossip-max-size": 32, "priority-regossip-max-txs": 32, "priority-regossip-txs-per-address": 20, - "priority-regossip-addresses": ["0x8747adFCE380492ec7e9b78761Ec7C87F5Cd3d4F"], + "priority-regossip-addresses": [ + "0x8747adFCE380492ec7e9b78761Ec7C87F5Cd3d4F" + ], "continuous-profiler-dir": "/var/avalanche/profiles/hubblenet/continuous/", "continuous-profiler-max-files": 200, "continuous-profiler-frequency": "10m", "admin-api-enabled": true, - "eth-apis": ["public-eth","public-eth-filter","net","web3","internal-public-eth","internal-blockchain","internal-transaction","internal-debug","internal-tx-pool","internal-account","debug-tracer"], + "eth-apis": [ + "public-eth", + "public-eth-filter", + "net", + "web3", + "internal-public-eth", + "internal-blockchain", + "internal-transaction", + "internal-debug", + "internal-tx-pool", + "internal-account", + "debug-tracer" + ], "trading-api-enabled": true, "testing-api-enabled": true, "snapshot-file-path": "/tmp/snapshot", - "makerbook-database-path": "/tmp/makerbook" + "makerbook-database-path": "/tmp/makerbook", + "order-gossip-num-validators": 10, + "order-gossip-num-non-validators": 5, + "order-gossip-num-peers": 15 } diff --git a/network-configs/hubblenet/chain_archival_node.json b/network-configs/hubblenet/chain_archival_node.json index 2471a78dc8..2a0f853b85 100644 --- a/network-configs/hubblenet/chain_archival_node.json +++ b/network-configs/hubblenet/chain_archival_node.json @@ -6,14 +6,31 @@ "tx-regossip-max-size": 32, "priority-regossip-max-txs": 32, "priority-regossip-txs-per-address": 20, - "priority-regossip-addresses": ["0x8747adFCE380492ec7e9b78761Ec7C87F5Cd3d4F"], + "priority-regossip-addresses": [ + "0x8747adFCE380492ec7e9b78761Ec7C87F5Cd3d4F" + ], "continuous-profiler-dir": "/var/avalanche/profiles/hubblenet/continuous/", "continuous-profiler-max-files": 200, "continuous-profiler-frequency": "10m", "admin-api-enabled": true, - "eth-apis": ["public-eth","public-eth-filter","net","web3","internal-public-eth","internal-blockchain","internal-transaction","internal-debug","internal-tx-pool","internal-account","debug-tracer"], + "eth-apis": [ + "public-eth", + "public-eth-filter", + "net", + "web3", + "internal-public-eth", + "internal-blockchain", + "internal-transaction", + "internal-debug", + "internal-tx-pool", + "internal-account", + "debug-tracer" + ], "trading-api-enabled": true, "testing-api-enabled": true, "snapshot-file-path": "/tmp/snapshot", - "makerbook-database-path": "/tmp/makerbook" + "makerbook-database-path": "/tmp/makerbook", + "order-gossip-num-validators": 10, + "order-gossip-num-non-validators": 5, + "order-gossip-num-peers": 15 } diff --git a/network-configs/hubblenet/chain_validator_1.json b/network-configs/hubblenet/chain_validator_1.json index b5694f7d3a..8c159486b0 100644 --- a/network-configs/hubblenet/chain_validator_1.json +++ b/network-configs/hubblenet/chain_validator_1.json @@ -5,12 +5,17 @@ "tx-regossip-max-size": 32, "priority-regossip-max-txs": 32, "priority-regossip-txs-per-address": 20, - "priority-regossip-addresses": ["0x8747adFCE380492ec7e9b78761Ec7C87F5Cd3d4F"], + "priority-regossip-addresses": [ + "0x8747adFCE380492ec7e9b78761Ec7C87F5Cd3d4F" + ], "continuous-profiler-dir": "/var/avalanche/profiles/hubblenet/continuous/", "continuous-profiler-max-files": 200, "continuous-profiler-frequency": "10m", "validator-private-key-file": "/var/avalanche/validator.pk", "feeRecipient": "0xa5e31FbE901362Cc93b6fdab99DB9741c673a942", "is-validator": true, - "snapshot-file-path": "/tmp/snapshot" + "snapshot-file-path": "/tmp/snapshot", + "order-gossip-num-validators": 10, + "order-gossip-num-non-validators": 5, + "order-gossip-num-peers": 15 } diff --git a/plugin/evm/config.go b/plugin/evm/config.go index 4443680d08..60e35e5933 100644 --- a/plugin/evm/config.go +++ b/plugin/evm/config.go @@ -50,6 +50,9 @@ const ( defaultPopulateMissingTriesParallelism = 1024 defaultStateSyncServerTrieCache = 64 // MB defaultAcceptedCacheSize = 32 // blocks + defaulOrderGossipNumValidators = 10 + defaultOrderGossipNumNonValidators = 5 + defaultOrderGossipNumPeers = 15 // defaultStateSyncMinBlocks is the minimum number of blocks the blockchain // should be ahead of local last accepted to perform state sync. @@ -173,6 +176,11 @@ type Config struct { RegossipFrequency Duration `json:"regossip-frequency"` PriorityRegossipAddresses []common.Address `json:"priority-regossip-addresses"` + // Order Gossip Settings + OrderGossipNumValidators int `json:"order-gossip-num-validators"` + OrderGossipNumNonValidators int `json:"order-gossip-num-non-validators"` + OrderGossipNumPeers int `json:"order-gossip-num-peers"` + // Log LogLevel string `json:"log-level"` LogJSONFormat bool `json:"log-json-format"` @@ -316,6 +324,9 @@ func (c *Config) SetDefaults() { c.LoadFromSnapshotEnabled = defaultLoadFromSnapshotEnabled c.SnapshotFilePath = defaultSnapshotFilePath c.MakerbookDatabasePath = defaultMakerbookDatabasePath + c.OrderGossipNumValidators = defaulOrderGossipNumValidators + c.OrderGossipNumNonValidators = defaultOrderGossipNumNonValidators + c.OrderGossipNumPeers = defaultOrderGossipNumPeers } func (d *Duration) UnmarshalJSON(data []byte) (err error) { diff --git a/plugin/evm/gossip_stats.go b/plugin/evm/gossip_stats.go index 5116bc3e5e..5840250015 100644 --- a/plugin/evm/gossip_stats.go +++ b/plugin/evm/gossip_stats.go @@ -10,29 +10,15 @@ var _ GossipStats = &gossipStats{} // GossipStats contains methods for updating incoming and outgoing gossip stats. type GossipStats interface { IncEthTxsGossipReceived() - - // new vs. known txs received IncEthTxsGossipReceivedError() IncEthTxsGossipReceivedKnown() IncEthTxsGossipReceivedNew() IncSignedOrdersGossipReceived(count int64) IncSignedOrdersGossipBatchReceived() - - // new vs. known txs received IncSignedOrdersGossipReceivedKnown() IncSignedOrdersGossipReceivedNew() IncSignedOrdersGossipReceiveError() -} - -// GossipSentStats groups functions for outgoing gossip stats. -type GossipSentStats interface { - IncEthTxsGossipSent() - - // regossip - IncEthTxsRegossipQueued() - IncEthTxsRegossipQueuedLocal(count int) - IncEthTxsRegossipQueuedRemote(count int) IncSignedOrdersGossipSent(count int64) IncSignedOrdersGossipBatchSent() @@ -42,27 +28,21 @@ type GossipSentStats interface { // gossipStats implements stats for incoming and outgoing gossip stats. type gossipStats struct { - // messages - ethTxsGossipReceived metrics.Counter - - // new vs. known txs received + ethTxsGossipReceived metrics.Counter ethTxsGossipReceivedError metrics.Counter ethTxsGossipReceivedKnown metrics.Counter ethTxsGossipReceivedNew metrics.Counter - // messages - signedOrdersGossipSent metrics.Counter - signedOrdersGossipBatchSent metrics.Counter - signedOrdersGossipSendError metrics.Counter - signedOrdersGossipOrderExpired metrics.Counter signedOrdersGossipReceived metrics.Counter signedOrdersGossipBatchReceived metrics.Counter - - // regossip - // new vs. known txs received signedOrdersGossipReceivedKnown metrics.Counter signedOrdersGossipReceivedNew metrics.Counter signedOrdersGossipReceiveError metrics.Counter + + signedOrdersGossipSent metrics.Counter + signedOrdersGossipBatchSent metrics.Counter + signedOrdersGossipSendError metrics.Counter + signedOrdersGossipOrderExpired metrics.Counter } func NewGossipStats() GossipStats { diff --git a/plugin/evm/gossiper.go b/plugin/evm/gossiper.go deleted file mode 100644 index 7d0a7116fb..0000000000 --- a/plugin/evm/gossiper.go +++ /dev/null @@ -1,512 +0,0 @@ -// (c) 2019-2021, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package evm - -import ( - "context" - "math/big" - "sync" - "time" - - "github.com/ava-labs/avalanchego/codec" - "github.com/ava-labs/avalanchego/network/p2p/gossip" - - "github.com/ava-labs/subnet-evm/peer" - - "github.com/ava-labs/avalanchego/cache" - "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/snow" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/rlp" - - "github.com/ava-labs/subnet-evm/core" - "github.com/ava-labs/subnet-evm/core/state" - "github.com/ava-labs/subnet-evm/core/txpool" - "github.com/ava-labs/subnet-evm/core/types" - "github.com/ava-labs/subnet-evm/plugin/evm/message" - "github.com/ava-labs/subnet-evm/plugin/evm/orderbook/hubbleutils" -) - -const ( - // We allow [recentCacheSize] to be fairly large because we only store hashes - // in the cache, not entire transactions. - recentCacheSize = 512 - - // [ethTxsGossipInterval] is how often we attempt to gossip newly seen - // transactions to other nodes. - ethTxsGossipInterval = 500 * time.Millisecond - - // [ordersGossipInterval] is how often we attempt to gossip newly seen - // signed orders to other nodes. - ordersGossipInterval = 100 * time.Millisecond - - // [minGossipBatchInterval] is the minimum amount of time that must pass - // before our last gossip to peers. - minGossipBatchInterval = 50 * time.Millisecond - - // [minGossipOrdersBatchInterval] is the minimum amount of time that must pass - // before our last gossip to peers. - minGossipOrdersBatchInterval = 50 * time.Millisecond - - // [maxSignedOrdersGossipBatchSize] is the maximum number of orders we will - // attempt to gossip at once. - maxSignedOrdersGossipBatchSize = 100 -) - -// Gossiper handles outgoing gossip of transactions -type Gossiper interface { - // GossipEthTxs sends AppGossip message containing the given [txs] - GossipEthTxs(txs []*types.Transaction) error - - // GossipSignedOrders sends signed orders to the network - GossipSignedOrders(orders []*hubbleutils.SignedOrder) error -} - -// pushGossiper is used to gossip transactions to the network -type pushGossiper struct { - ctx *snow.Context - config Config - - client peer.NetworkClient - blockchain *core.BlockChain - txPool *txpool.TxPool - ethTxGossiper gossip.Accumulator[*GossipEthTx] - - // We attempt to batch transactions we need to gossip to avoid runaway - // amplification of mempol chatter. - ethTxsToGossipChan chan []*types.Transaction - ethTxsToGossip map[common.Hash]*types.Transaction - lastGossiped time.Time - shutdownChan chan struct{} - shutdownWg *sync.WaitGroup - - ordersToGossipChan chan []*hubbleutils.SignedOrder - ordersToGossip []*hubbleutils.SignedOrder - lastOrdersGossiped time.Time - - // [recentEthTxs] prevent us from over-gossiping the - // same transaction in a short period of time. - recentEthTxs *cache.LRU[common.Hash, interface{}] - - codec codec.Manager - signer types.Signer - stats GossipSentStats -} - -// createGossiper constructs and returns a pushGossiper or noopGossiper -// based on whether vm.chainConfig.SubnetEVMTimestamp is set -func (vm *VM) createGossiper( - stats GossipStats, - ethTxGossiper gossip.Accumulator[*GossipEthTx], -) Gossiper { - net := &pushGossiper{ - ctx: vm.ctx, - config: vm.config, - client: vm.client, - blockchain: vm.blockChain, - txPool: vm.txPool, - ethTxsToGossipChan: make(chan []*types.Transaction), - ethTxsToGossip: make(map[common.Hash]*types.Transaction), - shutdownChan: vm.shutdownChan, - shutdownWg: &vm.shutdownWg, - recentEthTxs: &cache.LRU[common.Hash, interface{}]{Size: recentCacheSize}, - codec: vm.networkCodec, - signer: types.LatestSigner(vm.blockChain.Config()), - stats: stats, - ethTxGossiper: ethTxGossiper, - ordersToGossipChan: make(chan []*hubbleutils.SignedOrder), - ordersToGossip: []*hubbleutils.SignedOrder{}, - } - - net.awaitEthTxGossip() - net.awaitSignedOrderGossip() - return net -} - -// addrStatus used to track the metadata of addresses being queued for -// regossip. -type addrStatus struct { - nonce uint64 - txsAdded int -} - -// queueExecutableTxs attempts to select up to [maxTxs] from the tx pool for -// regossiping (with at most [maxAcctTxs] per account). -// -// We assume that [txs] contains an array of nonce-ordered transactions for a given -// account. This array of transactions can have gaps and start at a nonce lower -// than the current state of an account. -func (n *pushGossiper) queueExecutableTxs( - state *state.StateDB, - baseFee *big.Int, - txs map[common.Address]types.Transactions, - regossipFrequency Duration, - maxTxs int, - maxAcctTxs int, -) types.Transactions { - var ( - stxs = types.NewTransactionsByPriceAndNonce(n.signer, txs, baseFee) - statuses = make(map[common.Address]*addrStatus) - queued = make([]*types.Transaction, 0, maxTxs) - ) - - // Iterate over possible transactions until there are none left or we have - // hit the regossip target. - for len(queued) < maxTxs { - next := stxs.Peek() - if next == nil { - break - } - - sender, _ := types.Sender(n.signer, next) - status, ok := statuses[sender] - if !ok { - status = &addrStatus{ - nonce: state.GetNonce(sender), - } - statuses[sender] = status - } - - // The tx pool may be out of sync with current state, so we iterate - // through the account transactions until we get to one that is - // executable. - switch { - case next.Nonce() < status.nonce: - stxs.Shift() - continue - case next.Nonce() > status.nonce, time.Since(next.FirstSeen()) < regossipFrequency.Duration, - status.txsAdded >= maxAcctTxs: - stxs.Pop() - continue - } - queued = append(queued, next) - status.nonce++ - status.txsAdded++ - stxs.Shift() - } - - return queued -} - -// queueRegossipTxs finds the best non-priority transactions in the mempool and adds up to -// [RegossipMaxTxs] of them to [txsToGossip]. -func (n *pushGossiper) queueRegossipTxs() types.Transactions { - // Fetch all pending transactions - pending := n.txPool.Pending(true) - - // Split the pending transactions into locals and remotes - localTxs := make(map[common.Address]types.Transactions) - remoteTxs := pending - for _, account := range n.txPool.Locals() { - if txs := remoteTxs[account]; len(txs) > 0 { - delete(remoteTxs, account) - localTxs[account] = txs - } - } - - // Add best transactions to be gossiped (preferring local txs) - tip := n.blockchain.CurrentBlock() - state, err := n.blockchain.StateAt(tip.Root) - if err != nil || state == nil { - log.Debug( - "could not get state at tip", - "tip", tip.Hash(), - "err", err, - ) - return nil - } - rgFrequency := n.config.RegossipFrequency - rgMaxTxs := n.config.RegossipMaxTxs - rgTxsPerAddr := n.config.RegossipTxsPerAddress - localQueued := n.queueExecutableTxs(state, tip.BaseFee, localTxs, rgFrequency, rgMaxTxs, rgTxsPerAddr) - localCount := len(localQueued) - n.stats.IncEthTxsRegossipQueuedLocal(localCount) - if localCount >= rgMaxTxs { - n.stats.IncEthTxsRegossipQueued() - return localQueued - } - remoteQueued := n.queueExecutableTxs(state, tip.BaseFee, remoteTxs, rgFrequency, rgMaxTxs-localCount, rgTxsPerAddr) - n.stats.IncEthTxsRegossipQueuedRemote(len(remoteQueued)) - if localCount+len(remoteQueued) > 0 { - // only increment the regossip stat when there are any txs queued - n.stats.IncEthTxsRegossipQueued() - } - return append(localQueued, remoteQueued...) -} - -// queuePriorityRegossipTxs finds the best priority transactions in the mempool and adds up to -// [PriorityRegossipMaxTxs] of them to [txsToGossip]. -func (n *pushGossiper) queuePriorityRegossipTxs() types.Transactions { - // Fetch all pending transactions from the priority addresses - priorityTxs := n.txPool.PendingFrom(n.config.PriorityRegossipAddresses, true) - - // Add best transactions to be gossiped - tip := n.blockchain.CurrentBlock() - state, err := n.blockchain.StateAt(tip.Root) - if err != nil || state == nil { - log.Debug( - "could not get state at tip", - "tip", tip.Hash(), - "err", err, - ) - return nil - } - return n.queueExecutableTxs( - state, tip.BaseFee, priorityTxs, - n.config.PriorityRegossipFrequency, - n.config.PriorityRegossipMaxTxs, - n.config.PriorityRegossipTxsPerAddress, - ) -} - -// awaitEthTxGossip periodically gossips transactions that have been queued for -// gossip at least once every [ethTxsGossipInterval]. -func (n *pushGossiper) awaitEthTxGossip() { - n.shutdownWg.Add(1) - go n.ctx.Log.RecoverAndPanic(func() { - var ( - gossipTicker = time.NewTicker(ethTxsGossipInterval) - regossipTicker = time.NewTicker(n.config.RegossipFrequency.Duration) - priorityRegossipTicker = time.NewTicker(n.config.PriorityRegossipFrequency.Duration) - ) - defer func() { - gossipTicker.Stop() - regossipTicker.Stop() - priorityRegossipTicker.Stop() - n.shutdownWg.Done() - }() - - for { - select { - case <-gossipTicker.C: - if attempted, err := n.gossipEthTxs(false); err != nil { - log.Warn( - "failed to send eth transactions", - "len(txs)", attempted, - "err", err, - ) - } - if err := n.ethTxGossiper.Gossip(context.TODO()); err != nil { - log.Warn( - "failed to send eth transactions", - "err", err, - ) - } - case <-regossipTicker.C: - for _, tx := range n.queueRegossipTxs() { - n.ethTxsToGossip[tx.Hash()] = tx - } - if attempted, err := n.gossipEthTxs(true); err != nil { - log.Warn( - "failed to regossip eth transactions", - "len(txs)", attempted, - "err", err, - ) - } - case <-priorityRegossipTicker.C: - for _, tx := range n.queuePriorityRegossipTxs() { - n.ethTxsToGossip[tx.Hash()] = tx - } - if attempted, err := n.gossipEthTxs(true); err != nil { - log.Warn( - "failed to regossip priority eth transactions", - "len(txs)", attempted, - "err", err, - ) - } - case txs := <-n.ethTxsToGossipChan: - for _, tx := range txs { - n.ethTxsToGossip[tx.Hash()] = tx - } - if attempted, err := n.gossipEthTxs(false); err != nil { - log.Warn( - "failed to send eth transactions", - "len(txs)", attempted, - "err", err, - ) - } - - gossipTxs := make([]*GossipEthTx, 0, len(txs)) - for _, tx := range txs { - gossipTxs = append(gossipTxs, &GossipEthTx{Tx: tx}) - } - - n.ethTxGossiper.Add(gossipTxs...) - if err := n.ethTxGossiper.Gossip(context.TODO()); err != nil { - log.Warn( - "failed to send eth transactions", - "len(txs)", len(txs), - "err", err, - ) - } - - case <-n.shutdownChan: - return - } - } - }) -} - -func (n *pushGossiper) sendEthTxs(txs []*types.Transaction) error { - if len(txs) == 0 { - return nil - } - - txBytes, err := rlp.EncodeToBytes(txs) - if err != nil { - return err - } - msg := message.EthTxsGossip{ - Txs: txBytes, - } - msgBytes, err := message.BuildGossipMessage(n.codec, msg) - if err != nil { - return err - } - - log.Trace( - "gossiping eth txs", - "len(txs)", len(txs), - "size(txs)", len(msg.Txs), - ) - n.stats.IncEthTxsGossipSent() - return n.client.Gossip(msgBytes) -} - -func (n *pushGossiper) gossipEthTxs(force bool) (int, error) { - if (!force && time.Since(n.lastGossiped) < minGossipBatchInterval) || len(n.ethTxsToGossip) == 0 { - return 0, nil - } - n.lastGossiped = time.Now() - txs := make([]*types.Transaction, 0, len(n.ethTxsToGossip)) - for _, tx := range n.ethTxsToGossip { - txs = append(txs, tx) - delete(n.ethTxsToGossip, tx.Hash()) - } - - selectedTxs := make([]*types.Transaction, 0) - for _, tx := range txs { - txHash := tx.Hash() - txStatus := n.txPool.Status([]common.Hash{txHash})[0] - if txStatus != txpool.TxStatusPending { - continue - } - - if n.config.RemoteGossipOnlyEnabled && n.txPool.HasLocal(txHash) { - continue - } - - // We check [force] outside of the if statement to avoid an unnecessary - // cache lookup. - if !force { - if _, has := n.recentEthTxs.Get(txHash); has { - continue - } - } - n.recentEthTxs.Put(txHash, nil) - - selectedTxs = append(selectedTxs, tx) - } - - if len(selectedTxs) == 0 { - return 0, nil - } - - // Attempt to gossip [selectedTxs] - msgTxs := make([]*types.Transaction, 0) - msgTxsSize := uint64(0) - for _, tx := range selectedTxs { - size := tx.Size() - if msgTxsSize+size > message.EthMsgSoftCapSize { - if err := n.sendEthTxs(msgTxs); err != nil { - return len(selectedTxs), err - } - msgTxs = msgTxs[:0] - msgTxsSize = 0 - } - msgTxs = append(msgTxs, tx) - msgTxsSize += size - } - - // Send any remaining [msgTxs] - return len(selectedTxs), n.sendEthTxs(msgTxs) -} - -// GossipEthTxs enqueues the provided [txs] for gossiping. At some point, the -// [pushGossiper] will attempt to gossip the provided txs to other nodes -// (usually right away if not under load). -// -// NOTE: We never return a non-nil error from this function but retain the -// option to do so in case it becomes useful. -func (n *pushGossiper) GossipEthTxs(txs []*types.Transaction) error { - select { - case n.ethTxsToGossipChan <- txs: - case <-n.shutdownChan: - } - return nil -} - -// GossipHandler handles incoming gossip messages -type GossipHandler struct { - mu sync.RWMutex - vm *VM - txPool *txpool.TxPool - stats GossipReceivedStats -} - -func NewGossipHandler(vm *VM, stats GossipReceivedStats) *GossipHandler { - return &GossipHandler{ - vm: vm, - txPool: vm.txPool, - stats: stats, - } -} - -func (h *GossipHandler) HandleEthTxs(nodeID ids.NodeID, msg message.EthTxsGossip) error { - log.Trace( - "AppGossip called with EthTxsGossip", - "peerID", nodeID, - "size(txs)", len(msg.Txs), - ) - - if len(msg.Txs) == 0 { - log.Trace( - "AppGossip received empty EthTxsGossip Message", - "peerID", nodeID, - ) - return nil - } - - // The maximum size of this encoded object is enforced by the codec. - txs := make([]*types.Transaction, 0) - if err := rlp.DecodeBytes(msg.Txs, &txs); err != nil { - log.Trace( - "AppGossip provided invalid txs", - "peerID", nodeID, - "err", err, - ) - return nil - } - h.stats.IncEthTxsGossipReceived() - errs := h.txPool.AddRemotes(txs) - for i, err := range errs { - if err != nil { - log.Trace( - "AppGossip failed to add to mempool", - "err", err, - "tx", txs[i].Hash(), - ) - if err == txpool.ErrAlreadyKnown { - h.stats.IncEthTxsGossipReceivedKnown() - } else { - h.stats.IncEthTxsGossipReceivedError() - } - continue - } - h.stats.IncEthTxsGossipReceivedNew() - } - return nil -} diff --git a/plugin/evm/gossiper_orders.go b/plugin/evm/gossiper_orders.go index b4df9971c1..7f8d2cd3d1 100644 --- a/plugin/evm/gossiper_orders.go +++ b/plugin/evm/gossiper_orders.go @@ -2,17 +2,78 @@ package evm import ( "bytes" + "context" "encoding/gob" + "sync" "time" - "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/codec" + "github.com/ava-labs/avalanchego/snow" + commonEng "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/subnet-evm/plugin/evm/message" "github.com/ava-labs/subnet-evm/plugin/evm/orderbook" + "github.com/ava-labs/subnet-evm/plugin/evm/orderbook/hubbleutils" hu "github.com/ava-labs/subnet-evm/plugin/evm/orderbook/hubbleutils" "github.com/ethereum/go-ethereum/log" ) -func (n *pushGossiper) GossipSignedOrders(orders []*hu.SignedOrder) error { +const ( + // [ordersGossipInterval] is how often we attempt to gossip newly seen + // signed orders to other nodes. + ordersGossipInterval = 100 * time.Millisecond + + // [minGossipOrdersBatchInterval] is the minimum amount of time that must pass + // before our last gossip to peers. + minGossipOrdersBatchInterval = 50 * time.Millisecond + + // [maxSignedOrdersGossipBatchSize] is the maximum number of orders we will + // attempt to gossip at once. + maxSignedOrdersGossipBatchSize = 100 +) + +type OrderGossiper interface { + // GossipSignedOrders sends signed orders to the network + GossipSignedOrders(orders []*hubbleutils.SignedOrder) error +} + +type orderPushGossiper struct { + ctx *snow.Context + config Config + + shutdownChan chan struct{} + shutdownWg *sync.WaitGroup + + ordersToGossipChan chan []*hubbleutils.SignedOrder + ordersToGossip []*hubbleutils.SignedOrder + lastOrdersGossiped time.Time + + codec codec.Manager + stats GossipStats + + appSender commonEng.AppSender +} + +// createOrderGossiper constructs and returns a orderPushGossiper or noopGossiper +func (vm *VM) createOrderGossiper( + stats GossipStats, +) OrderGossiper { + net := &orderPushGossiper{ + ctx: vm.ctx, + config: vm.config, + shutdownChan: vm.shutdownChan, + shutdownWg: &vm.shutdownWg, + codec: vm.networkCodec, + stats: stats, + ordersToGossipChan: make(chan []*hubbleutils.SignedOrder), + ordersToGossip: []*hubbleutils.SignedOrder{}, + appSender: vm.p2pSender, + } + + net.awaitSignedOrderGossip() + return net +} + +func (n *orderPushGossiper) GossipSignedOrders(orders []*hu.SignedOrder) error { select { case n.ordersToGossipChan <- orders: case <-n.shutdownChan: @@ -20,7 +81,7 @@ func (n *pushGossiper) GossipSignedOrders(orders []*hu.SignedOrder) error { return nil } -func (n *pushGossiper) awaitSignedOrderGossip() { +func (n *orderPushGossiper) awaitSignedOrderGossip() { n.shutdownWg.Add(1) go executeFuncAndRecoverPanic(func() { var ( @@ -59,7 +120,7 @@ func (n *pushGossiper) awaitSignedOrderGossip() { }, "panic in awaitSignedOrderGossip", orderbook.AwaitSignedOrdersGossipPanicsCounter) } -func (n *pushGossiper) gossipSignedOrders() (int, error) { +func (n *orderPushGossiper) gossipSignedOrders() (int, error) { if (time.Since(n.lastOrdersGossiped) < minGossipOrdersBatchInterval) || len(n.ordersToGossip) == 0 { return 0, nil } @@ -93,7 +154,7 @@ func (n *pushGossiper) gossipSignedOrders() (int, error) { return len(selectedOrders), err } -func (n *pushGossiper) sendSignedOrders(orders []*hu.SignedOrder) error { +func (n *orderPushGossiper) sendSignedOrders(orders []*hu.SignedOrder) error { if len(orders) == 0 { return nil } @@ -117,66 +178,16 @@ func (n *pushGossiper) sendSignedOrders(orders []*hu.SignedOrder) error { "len(orders)", len(orders), "size(orders)", len(msg.Orders), ) - n.stats.IncSignedOrdersGossipSent(int64(len(orders))) - n.stats.IncSignedOrdersGossipBatchSent() - return n.client.Gossip(msgBytes) -} -// #### HANDLER #### - -func (h *GossipHandler) HandleSignedOrders(nodeID ids.NodeID, msg message.SignedOrdersGossip) error { - h.mu.Lock() - defer h.mu.Unlock() - - log.Trace( - "AppGossip called with SignedOrdersGossip", - "peerID", nodeID, - "bytes(orders)", len(msg.Orders), - ) - - if len(msg.Orders) == 0 { - log.Warn( - "AppGossip received empty SignedOrdersGossip Message", - "peerID", nodeID, - ) - return nil - } - - orders := make([]*hu.SignedOrder, 0) - buf := bytes.NewBuffer(msg.Orders) - err := gob.NewDecoder(buf).Decode(&orders) + validators := n.config.OrderGossipNumValidators + nonValidators := n.config.OrderGossipNumNonValidators + peers := n.config.OrderGossipNumPeers + err = n.appSender.SendAppGossip(context.TODO(), msgBytes, validators, nonValidators, peers) if err != nil { - log.Error("failed to decode signed orders", "err", err) + log.Error("failed to gossip orders") return err } - - h.stats.IncSignedOrdersGossipReceived(int64(len(orders))) - h.stats.IncSignedOrdersGossipBatchReceived() - - tradingAPI := h.vm.limitOrderProcesser.GetTradingAPI() - - // re-gossip orders, but not when we already knew the orders - ordersToGossip := make([]*hu.SignedOrder, 0) - for _, order := range orders { - _, shouldTriggerMatching, err := tradingAPI.PlaceOrder(order) - if err == nil { - h.stats.IncSignedOrdersGossipReceivedNew() - ordersToGossip = append(ordersToGossip, order) - if shouldTriggerMatching { - log.Info("received new match-able signed order, triggering matching pipeline...") - h.vm.limitOrderProcesser.RunMatchingPipeline() - } - } else if err == hu.ErrOrderAlreadyExists { - h.stats.IncSignedOrdersGossipReceivedKnown() - } else { - h.stats.IncSignedOrdersGossipReceiveError() - log.Error("failed to place order", "err", err) - } - } - - if len(ordersToGossip) > 0 { - h.vm.gossiper.GossipSignedOrders(ordersToGossip) - } - + n.stats.IncSignedOrdersGossipSent(int64(len(orders))) + n.stats.IncSignedOrdersGossipBatchSent() return nil } diff --git a/plugin/evm/handler.go b/plugin/evm/handler.go index 2915d422a2..c69b149523 100644 --- a/plugin/evm/handler.go +++ b/plugin/evm/handler.go @@ -4,6 +4,10 @@ package evm import ( + "bytes" + "encoding/gob" + "sync" + "github.com/ava-labs/avalanchego/ids" "github.com/ethereum/go-ethereum/log" @@ -12,10 +16,12 @@ import ( "github.com/ava-labs/subnet-evm/core/txpool" "github.com/ava-labs/subnet-evm/core/types" "github.com/ava-labs/subnet-evm/plugin/evm/message" + hu "github.com/ava-labs/subnet-evm/plugin/evm/orderbook/hubbleutils" ) // GossipHandler handles incoming gossip messages type GossipHandler struct { + mu sync.RWMutex vm *VM txPool *txpool.TxPool stats GossipStats @@ -74,3 +80,60 @@ func (h *GossipHandler) HandleEthTxs(nodeID ids.NodeID, msg message.EthTxsGossip } return nil } + +func (h *GossipHandler) HandleSignedOrders(nodeID ids.NodeID, msg message.SignedOrdersGossip) error { + h.mu.Lock() + defer h.mu.Unlock() + + log.Trace( + "AppGossip called with SignedOrdersGossip", + "peerID", nodeID, + "bytes(orders)", len(msg.Orders), + ) + + if len(msg.Orders) == 0 { + log.Warn( + "AppGossip received empty SignedOrdersGossip Message", + "peerID", nodeID, + ) + return nil + } + + orders := make([]*hu.SignedOrder, 0) + buf := bytes.NewBuffer(msg.Orders) + err := gob.NewDecoder(buf).Decode(&orders) + if err != nil { + log.Error("failed to decode signed orders", "err", err) + return err + } + + h.stats.IncSignedOrdersGossipReceived(int64(len(orders))) + h.stats.IncSignedOrdersGossipBatchReceived() + + tradingAPI := h.vm.limitOrderProcesser.GetTradingAPI() + + // re-gossip orders, but not when we already knew the orders + ordersToGossip := make([]*hu.SignedOrder, 0) + for _, order := range orders { + _, shouldTriggerMatching, err := tradingAPI.PlaceOrder(order) + if err == nil { + h.stats.IncSignedOrdersGossipReceivedNew() + ordersToGossip = append(ordersToGossip, order) + if shouldTriggerMatching { + log.Info("received new match-able signed order, triggering matching pipeline...") + h.vm.limitOrderProcesser.RunMatchingPipeline() + } + } else if err == hu.ErrOrderAlreadyExists { + h.stats.IncSignedOrdersGossipReceivedKnown() + } else { + h.stats.IncSignedOrdersGossipReceiveError() + log.Error("failed to place order", "err", err) + } + } + + if len(ordersToGossip) > 0 { + h.vm.orderGossiper.GossipSignedOrders(ordersToGossip) + } + + return nil +} diff --git a/plugin/evm/order_api.go b/plugin/evm/order_api.go index 839d3bf442..6b16027290 100644 --- a/plugin/evm/order_api.go +++ b/plugin/evm/order_api.go @@ -72,7 +72,7 @@ func (api *OrderAPI) PlaceSignedOrders(ctx context.Context, input string) (Place ordersToGossip = append(ordersToGossip, order) } - api.vm.gossiper.GossipSignedOrders(ordersToGossip) + api.vm.orderGossiper.GossipSignedOrders(ordersToGossip) return PlaceSignedOrdersResponse{Orders: response}, nil } diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go index 951f8feb34..30298ca05f 100644 --- a/plugin/evm/vm.go +++ b/plugin/evm/vm.go @@ -229,7 +229,7 @@ type VM struct { limitOrderProcesser LimitOrderProcesser - // gossiper Gossiper + orderGossiper OrderGossiper clock mockable.Clock @@ -737,6 +737,7 @@ func (vm *VM) initBlockBuilding() error { // NOTE: gossip network must be initialized first otherwise ETH tx gossip will not work. gossipStats := NewGossipStats() + vm.orderGossiper = vm.createOrderGossiper(gossipStats) vm.builder = vm.NewBlockBuilder(vm.toEngine) vm.builder.awaitSubmittedTxs() vm.Network.SetGossipHandler(NewGossipHandler(vm, gossipStats)) diff --git a/plugin/evm/vm_test.go b/plugin/evm/vm_test.go index a45ffc01f7..adc10a7cd8 100644 --- a/plugin/evm/vm_test.go +++ b/plugin/evm/vm_test.go @@ -3275,6 +3275,12 @@ func TestCrossChainMessagestoVM(t *testing.T) { require.True(calledSendCrossChainAppResponseFn, "sendCrossChainAppResponseFn was not called") } +func TestVMOrderGossiperIsSet(t *testing.T) { + _, vm, _, _ := GenesisVM(t, true, "", "", "") + require.NotNil(t, vm.orderGossiper, "legacy gossiper should be initialized") + require.NoError(t, vm.Shutdown(context.Background())) +} + func createValidatorPrivateKeyIfNotExists() { // Create a new validator private key file defaultValidatorPrivateKeyFile = "/tmp/validator.pk" diff --git a/scripts/run_local.sh b/scripts/run_local.sh index d4eb53cbf3..f99f4515dc 100755 --- a/scripts/run_local.sh +++ b/scripts/run_local.sh @@ -28,6 +28,6 @@ avalanche subnet configure localnet --subnet-config subnet.json --config .avalan # use the same avalanchego version as the one used in subnet-evm # use tee to keep showing outut while storing in a var -OUTPUT=$(avalanche subnet deploy localnet -l --avalanchego-version v1.11.1 --config .avalanche-cli.json | tee /dev/fd/2) +OUTPUT=$(avalanche subnet deploy localnet -l --avalanchego-version v1.11.2 --config .avalanche-cli.json | tee /dev/fd/2) setStatus diff --git a/scripts/upgrade_local.sh b/scripts/upgrade_local.sh index e88908a8e1..bfa56aeb7f 100755 --- a/scripts/upgrade_local.sh +++ b/scripts/upgrade_local.sh @@ -9,6 +9,6 @@ avalanche network stop --snapshot-name snap1 avalanche subnet upgrade vm localnet --binary custom_evm.bin --local # utse tee to keep showing outut while storing in a var -OUTPUT=$(avalanche network start --avalanchego-version v1.11.1 --snapshot-name snap1 --config .avalanche-cli.json | tee /dev/fd/2) +OUTPUT=$(avalanche network start --avalanchego-version v1.11.2 --snapshot-name snap1 --config .avalanche-cli.json | tee /dev/fd/2) setStatus diff --git a/subnet.json b/subnet.json index 4aefdfbbf8..c1f28a5a9b 100644 --- a/subnet.json +++ b/subnet.json @@ -1,6 +1,3 @@ { - "proposerMinBlockDelay": 200000000, - "appGossipValidatorSize": 10, - "appGossipNonValidatorSize": 5, - "appGossipPeerSize": 15 -} \ No newline at end of file + "proposerMinBlockDelay": 200000000 +}