Skip to content

Commit

Permalink
[ETCM-280] use peer capabilities (protocol version) when sending chec…
Browse files Browse the repository at this point in the history
…kpoint-related messages
  • Loading branch information
rtkaczyk authored and pslaski committed Nov 26, 2020
1 parent 0bcd288 commit 298934b
Show file tree
Hide file tree
Showing 68 changed files with 1,409 additions and 970 deletions.
11 changes: 6 additions & 5 deletions src/it/scala/io/iohk/ethereum/sync/util/CommonFakePeer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import java.util.concurrent.atomic.AtomicReference
import akka.actor.{ActorRef, ActorSystem}
import akka.testkit.TestProbe
import akka.util.{ByteString, Timeout}
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcast.BlockToBroadcast
import io.iohk.ethereum.blockchain.sync.regular.{BlockBroadcast, BlockBroadcasterActor}
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor.BroadcastBlock
import io.iohk.ethereum.blockchain.sync.{BlockchainHostActor, TestSyncConfig}
Expand All @@ -18,11 +19,10 @@ import io.iohk.ethereum.ledger.InMemoryWorldStateProxy
import io.iohk.ethereum.mpt.MerklePatriciaTrie
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
import io.iohk.ethereum.network.PeerManagerActor.{FastSyncHostConfiguration, PeerConfiguration}
import io.iohk.ethereum.network.discovery.{DiscoveryConfig, Node}
import io.iohk.ethereum.network.discovery.PeerDiscoveryManager.DiscoveredNodesInfo
import io.iohk.ethereum.network.discovery.{DiscoveryConfig, Node}
import io.iohk.ethereum.network.handshaker.{EtcHandshaker, EtcHandshakerConfiguration, Handshaker}
import io.iohk.ethereum.network.p2p.EthereumMessageDecoder
import io.iohk.ethereum.network.p2p.messages.CommonMessages.NewBlock
import io.iohk.ethereum.network.rlpx.AuthHandshaker
import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration
import io.iohk.ethereum.network.{
Expand Down Expand Up @@ -159,7 +159,7 @@ abstract class CommonFakePeer(peerName: String, fakePeerCustomConfig: FakePeerCu
override val peerConfiguration: PeerConfiguration = peerConf
override val blockchain: Blockchain = bl
override val appStateStorage: AppStateStorage = storagesInstance.storages.appStateStorage
override val blockchainConfig = CommonFakePeer.this.blockchainConfig // FIXME: remove in ETCM-280
override val protocolVersion: Int = Config.Network.protocolVersion
}

lazy val handshaker: Handshaker[PeerInfo] = EtcHandshaker(handshakerConfiguration)
Expand All @@ -175,7 +175,8 @@ abstract class CommonFakePeer(peerName: String, fakePeerCustomConfig: FakePeerCu
handshaker,
authHandshaker,
EthereumMessageDecoder,
discoveryConfig
discoveryConfig,
Config.Network.protocolVersion
),
"peer-manager"
)
Expand Down Expand Up @@ -227,7 +228,7 @@ abstract class CommonFakePeer(peerName: String, fakePeerCustomConfig: FakePeerCu
}

private def broadcastBlock(block: Block, weight: ChainWeight) = {
broadcasterActor ! BroadcastBlock(NewBlock(block, weight))
broadcasterActor ! BroadcastBlock(BlockToBroadcast(block, weight))
}

def getCurrentState(): BlockchainState = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ import akka.actor.ActorRef
import akka.util.ByteString
import cats.effect.Resource
import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed
import io.iohk.ethereum.blockchain.sync.{PeersClient, SyncProtocol}
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcast.BlockToBroadcast
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor.BroadcastBlock
import io.iohk.ethereum.blockchain.sync.regular.RegularSync
import io.iohk.ethereum.blockchain.sync.{PeersClient, SyncProtocol}
import io.iohk.ethereum.consensus.Protocol.NoAdditionalEthashData
import io.iohk.ethereum.consensus.blocks.CheckpointBlockGenerator
import io.iohk.ethereum.consensus.ethash.{EthashConfig, EthashConsensus}
import io.iohk.ethereum.consensus.{ConsensusConfig, FullConsensusConfig, ethash}
import io.iohk.ethereum.domain._
import io.iohk.ethereum.ledger._
import io.iohk.ethereum.network.p2p.messages.CommonMessages.NewBlock
import io.iohk.ethereum.nodebuilder.VmSetup
import io.iohk.ethereum.ommers.OmmersPool
import io.iohk.ethereum.sync.util.SyncCommonItSpecUtils.FakePeerCustomConfig.defaultConfig
Expand Down Expand Up @@ -74,7 +74,6 @@ object RegularSyncItSpecUtils {
peerEventBus,
ledger,
bl,
blockchainConfig, // FIXME: remove in ETCM-280
validators.blockValidator,
testSyncConfig,
ommersPool,
Expand Down Expand Up @@ -145,7 +144,7 @@ object RegularSyncItSpecUtils {
}

private def broadcastBlock(block: Block, weight: ChainWeight) = {
broadcasterActor ! BroadcastBlock(NewBlock(block, weight))
broadcasterActor ! BroadcastBlock(BlockToBroadcast(block, weight))
}

private def createChildBlock(
Expand Down
21 changes: 11 additions & 10 deletions src/it/scala/io/iohk/ethereum/txExecTest/util/DumpChainActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,27 @@ import java.net.URI
import akka.actor.{Actor, ActorRef, _}
import akka.util.ByteString
import io.iohk.ethereum.crypto.kec256
import io.iohk.ethereum.domain.{BlockBody, BlockHeader, Receipt}
import io.iohk.ethereum.domain.BlockHeaderImplicits._
import io.iohk.ethereum.network.{Peer, PeerManagerActor}
import io.iohk.ethereum.domain.{BlockBody, BlockHeader, Receipt}
import io.iohk.ethereum.mpt.{BranchNode, ExtensionNode, HashNode, LeafNode, MptNode}
import io.iohk.ethereum.network.PeerActor.SendMessage
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer
import io.iohk.ethereum.network.PeerEventBusActor.SubscriptionClassifier.MessageClassifier
import io.iohk.ethereum.network.PeerEventBusActor.{PeerSelector, Subscribe}
import io.iohk.ethereum.network.PeerManagerActor.{GetPeers, Peers}
import io.iohk.ethereum.network.p2p.messages.Codes
import io.iohk.ethereum.network.p2p.messages.PV62._
import io.iohk.ethereum.network.p2p.messages.PV63._
import io.iohk.ethereum.network.p2p.messages.PV63.MptNodeEncoders._
import io.iohk.ethereum.network.p2p.messages.PV63.ReceiptImplicits._
import io.iohk.ethereum.network.p2p.messages.PV63._
import io.iohk.ethereum.network.{Peer, PeerManagerActor}
import io.iohk.ethereum.txExecTest.util.DumpChainActor._
import org.bouncycastle.util.encoders.Hex
import ReceiptImplicits._
import io.iohk.ethereum.mpt.{BranchNode, ExtensionNode, HashNode, LeafNode, MptNode}
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer
import io.iohk.ethereum.network.PeerEventBusActor.{PeerSelector, Subscribe}
import io.iohk.ethereum.network.PeerEventBusActor.SubscriptionClassifier.MessageClassifier

import scala.collection.immutable.HashMap
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.language.postfixOps
import DumpChainActor._

/**
* Actor used for obtaining all the blockchain data (blocks, receipts, nodes) from the blocks [startBlock, maxBlocks]
Expand Down Expand Up @@ -84,7 +85,7 @@ class DumpChainActor(
peers.headOption.foreach { peer =>
peerMessageBus ! Subscribe(
MessageClassifier(
Set(BlockHeaders.code, BlockBodies.code, Receipts.code, NodeData.code),
Set(Codes.BlockHeadersCode, Codes.BlockBodiesCode, Codes.ReceiptsCode, Codes.NodeDataCode),
PeerSelector.WithId(peer.id)
)
)
Expand Down
21 changes: 11 additions & 10 deletions src/it/scala/io/iohk/ethereum/txExecTest/util/DumpChainApp.scala
Original file line number Diff line number Diff line change
@@ -1,34 +1,34 @@
package io.iohk.ethereum.txExecTest.util

import java.util.concurrent.atomic.AtomicReference

import akka.actor.ActorSystem
import akka.util.ByteString
import com.typesafe.config.ConfigFactory
import io.iohk.ethereum.db.components.Storages.PruningModeComponent
import io.iohk.ethereum.db.components.{RocksDbDataSourceComponent, Storages}
import io.iohk.ethereum.db.storage.{AppStateStorage, StateStorage}
import io.iohk.ethereum.db.dataSource.{DataSourceBatchUpdate, RocksDbDataSource}
import io.iohk.ethereum.db.storage.NodeStorage.{NodeEncoded, NodeHash}
import io.iohk.ethereum.db.storage.TransactionMappingStorage.TransactionLocation
import io.iohk.ethereum.db.storage.pruning.{ArchivePruning, PruningMode}
import io.iohk.ethereum.db.storage.{AppStateStorage, StateStorage}
import io.iohk.ethereum.domain.BlockHeader.HeaderExtraFields.HefEmpty
import io.iohk.ethereum.domain.{Blockchain, UInt256, _}
import io.iohk.ethereum.ledger.{InMemoryWorldStateProxy, InMemoryWorldStateProxyStorage}
import io.iohk.ethereum.mpt.MptNode
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
import io.iohk.ethereum.network.PeerManagerActor.PeerConfiguration
import io.iohk.ethereum.network.discovery.DiscoveryConfig
import io.iohk.ethereum.network.handshaker.{EtcHandshaker, EtcHandshakerConfiguration, Handshaker}
import io.iohk.ethereum.network.p2p.EthereumMessageDecoder
import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration
import io.iohk.ethereum.network.{ForkResolver, PeerEventBusActor, PeerManagerActor}
import io.iohk.ethereum.nodebuilder.{AuthHandshakerBuilder, NodeKeyBuilder, SecureRandomBuilder}
import io.iohk.ethereum.utils.{BlockchainConfig, Config, NodeStatus, ServerStatus}
import java.util.concurrent.atomic.AtomicReference

import io.iohk.ethereum.db.dataSource.{DataSourceBatchUpdate, RocksDbDataSource}
import io.iohk.ethereum.utils.{Config, NodeStatus, ServerStatus}
import monix.reactive.Observable
import org.bouncycastle.util.encoders.Hex

import scala.concurrent.duration._
import io.iohk.ethereum.domain.BlockHeader.HeaderExtraFields.HefEmpty
import io.iohk.ethereum.network.discovery.DiscoveryConfig
import monix.reactive.Observable

object DumpChainApp extends App with NodeKeyBuilder with SecureRandomBuilder with AuthHandshakerBuilder {
val conf = ConfigFactory.load("txExecTest/chainDump.conf")
Expand Down Expand Up @@ -82,8 +82,8 @@ object DumpChainApp extends App with NodeKeyBuilder with SecureRandomBuilder wit
override val nodeStatusHolder: AtomicReference[NodeStatus] = DumpChainApp.nodeStatusHolder
override val peerConfiguration: PeerConfiguration = peerConfig
override val blockchain: Blockchain = DumpChainApp.blockchain
override val blockchainConfig: BlockchainConfig = DumpChainApp.blockchainConfig
override val appStateStorage: AppStateStorage = storagesInstance.storages.appStateStorage
override val protocolVersion: Int = Config.Network.protocolVersion
}

lazy val handshaker: Handshaker[PeerInfo] = EtcHandshaker(handshakerConfiguration)
Expand All @@ -99,7 +99,8 @@ object DumpChainApp extends App with NodeKeyBuilder with SecureRandomBuilder wit
handshaker = handshaker,
authHandshaker = authHandshaker,
messageDecoder = EthereumMessageDecoder,
discoveryConfig
discoveryConfig,
Config.Network.protocolVersion
),
"peer-manager"
)
Expand Down
2 changes: 2 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ mantis {

network {
# Ethereum protocol version
# Supported versions:
# 63, 64 (experimental version which enables usage of messages with checkpointing information. In the future after ETCM-355, ETCM-356, it will be 66 probably)
protocol-version = 63

server-address {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import io.iohk.ethereum.network.p2p.messages.PV62.{BlockBodies, BlockHeaders, Ge
import io.iohk.ethereum.network.p2p.messages.PV63.{GetNodeData, GetReceipts, NodeData, Receipts}
import io.iohk.ethereum.network.p2p.messages.PV63.MptNodeEncoders._
import io.iohk.ethereum.network.EtcPeerManagerActor
import io.iohk.ethereum.network.p2p.messages.Codes

/**
* BlockchainHost actor is in charge of replying to the peer's requests for blockchain data, which includes both
Expand All @@ -25,7 +26,8 @@ class BlockchainHostActor(
) extends Actor
with ActorLogging {

private val requestMsgsCodes = Set(GetNodeData.code, GetReceipts.code, GetBlockBodies.code, GetBlockHeaders.code)
private val requestMsgsCodes =
Set(Codes.GetNodeDataCode, Codes.GetReceiptsCode, Codes.GetBlockBodiesCode, Codes.GetBlockHeadersCode)
peerEventBusActor ! Subscribe(MessageClassifier(requestMsgsCodes, PeerSelector.AllPeers))

override def receive: Receive = { case MessageFromPeer(message, peerId) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.iohk.ethereum.blockchain.sync

import akka.actor.{Actor, ActorLogging, ActorRef, Cancellable, Props, Scheduler}
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
import io.iohk.ethereum.network.p2p.messages.Codes
import io.iohk.ethereum.network.{Peer, PeerId}
import io.iohk.ethereum.network.p2p.messages.PV62._
import io.iohk.ethereum.network.p2p.messages.PV63.{GetNodeData, NodeData}
Expand Down Expand Up @@ -94,9 +95,9 @@ class PeersClient(

private def responseMsgCode[RequestMsg <: Message](requestMsg: RequestMsg): Int =
requestMsg match {
case _: GetBlockHeaders => BlockHeaders.code
case _: GetBlockBodies => BlockBodies.code
case _: GetNodeData => NodeData.code
case _: GetBlockHeaders => Codes.BlockHeadersCode
case _: GetBlockBodies => Codes.BlockBodiesCode
case _: GetNodeData => Codes.NodeDataCode
}

private def printStatus(requesters: Requesters): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@ import io.iohk.ethereum.consensus.validators.Validators
import io.iohk.ethereum.db.storage.{AppStateStorage, FastSyncStateStorage}
import io.iohk.ethereum.domain.Blockchain
import io.iohk.ethereum.ledger.Ledger
import io.iohk.ethereum.utils.BlockchainConfig
import io.iohk.ethereum.utils.Config.SyncConfig

class SyncController(
appStateStorage: AppStateStorage,
blockchain: Blockchain,
blockchainConfig: BlockchainConfig,
fastSyncStateStorage: FastSyncStateStorage,
ledger: Ledger,
validators: Validators,
Expand Down Expand Up @@ -102,7 +100,6 @@ class SyncController(
peerEventBus,
ledger,
blockchain,
blockchainConfig,
validators.blockValidator,
syncConfig,
ommersPool,
Expand All @@ -123,7 +120,6 @@ object SyncController {
def props(
appStateStorage: AppStateStorage,
blockchain: Blockchain,
blockchainConfig: BlockchainConfig,
syncStateStorage: FastSyncStateStorage,
ledger: Ledger,
validators: Validators,
Expand All @@ -138,7 +134,6 @@ object SyncController {
new SyncController(
appStateStorage,
blockchain,
blockchainConfig,
syncStateStorage,
ledger,
validators,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import io.iohk.ethereum.domain._
import io.iohk.ethereum.mpt.MerklePatriciaTrie
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
import io.iohk.ethereum.network.Peer
import io.iohk.ethereum.network.p2p.messages.Codes
import io.iohk.ethereum.network.p2p.messages.PV62._
import io.iohk.ethereum.network.p2p.messages.PV63._
import io.iohk.ethereum.utils.ByteStringUtils
Expand Down Expand Up @@ -710,7 +711,7 @@ class FastSync(
etcPeerManager,
peerEventBus,
requestMsg = GetReceipts(receiptsToGet),
responseMsgCode = Receipts.code
responseMsgCode = Codes.ReceiptsCode
)
)

Expand All @@ -731,7 +732,7 @@ class FastSync(
etcPeerManager,
peerEventBus,
requestMsg = GetBlockBodies(blockBodiesToGet),
responseMsgCode = BlockBodies.code
responseMsgCode = Codes.BlockBodiesCode
)
)

Expand All @@ -756,7 +757,7 @@ class FastSync(
etcPeerManager,
peerEventBus,
requestMsg = GetBlockHeaders(Left(syncState.bestBlockHeaderNumber + 1), limit, skip = 0, reverse = false),
responseMsgCode = BlockHeaders.code
responseMsgCode = Codes.BlockHeadersCode
),
BlockHeadersHandlerName
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer
import io.iohk.ethereum.network.PeerEventBusActor.SubscriptionClassifier.MessageClassifier
import io.iohk.ethereum.network.PeerEventBusActor.{PeerSelector, Subscribe, Unsubscribe}
import io.iohk.ethereum.network.p2p.messages.Codes
import io.iohk.ethereum.network.p2p.messages.PV62.{BlockHeaders, GetBlockHeaders}
import io.iohk.ethereum.network.{EtcPeerManagerActor, Peer, PeerId}
import io.iohk.ethereum.utils.Config.SyncConfig
Expand Down Expand Up @@ -79,7 +80,7 @@ class PivotBlockSelector(
): Receive =
handleCommonMessages orElse {
case MessageFromPeer(blockHeaders: BlockHeaders, peerId) =>
peerEventBus ! Unsubscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peerId)))
peerEventBus ! Unsubscribe(MessageClassifier(Set(Codes.BlockHeadersCode), PeerSelector.WithId(peerId)))
val updatedPeersToAsk = peersToAsk - peerId
val targetBlockHeaderOpt =
if (blockHeaders.headers.size != 1) None
Expand Down Expand Up @@ -167,7 +168,7 @@ class PivotBlockSelector(
}

private def obtainBlockHeaderFromPeer(peer: PeerId, blockNumber: BigInt): Unit = {
peerEventBus ! Subscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer)))
peerEventBus ! Subscribe(MessageClassifier(Set(Codes.BlockHeadersCode), PeerSelector.WithId(peer)))
etcPeerManager ! EtcPeerManagerActor.SendMessage(
GetBlockHeaders(Left(blockNumber), 1, 0, reverse = false),
peer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import io.iohk.ethereum.blockchain.sync.fast.SyncStateScheduler.{
import io.iohk.ethereum.blockchain.sync.fast.SyncStateSchedulerActor._
import io.iohk.ethereum.blockchain.sync.{BlacklistSupport, PeerListSupport, PeerRequestHandler}
import io.iohk.ethereum.network.Peer
import io.iohk.ethereum.network.p2p.messages.Codes
import io.iohk.ethereum.network.p2p.messages.PV63.{GetNodeData, NodeData}
import io.iohk.ethereum.utils.ByteStringUtils
import io.iohk.ethereum.utils.Config.SyncConfig
Expand Down Expand Up @@ -55,7 +56,7 @@ class SyncStateSchedulerActor(
etcPeerManager,
peerEventBus,
requestMsg = GetNodeData(request.nodes.toList),
responseMsgCode = NodeData.code
responseMsgCode = Codes.NodeDataCode
)
)
context.watchWith(handler, RequestTerminated(request.peer))
Expand Down
Loading

0 comments on commit 298934b

Please sign in to comment.