Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ETCM-355] Send fork id to peers #1030

Merged
merged 13 commits into from
Jul 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/it/scala/io/iohk/ethereum/sync/util/CommonFakePeer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ import io.iohk.ethereum.network.discovery.PeerDiscoveryManager.DiscoveredNodesIn
import io.iohk.ethereum.network.handshaker.EtcHandshaker
import io.iohk.ethereum.network.handshaker.EtcHandshakerConfiguration
import io.iohk.ethereum.network.handshaker.Handshaker
import io.iohk.ethereum.network.p2p.messages.Capability
import io.iohk.ethereum.network.rlpx.AuthHandshaker
import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration
import io.iohk.ethereum.nodebuilder.PruningConfigBuilder
Expand Down Expand Up @@ -199,7 +198,7 @@ abstract class CommonFakePeer(peerName: String, fakePeerCustomConfig: FakePeerCu
override val blockchain: Blockchain = CommonFakePeer.this.bl
override val blockchainReader: BlockchainReader = CommonFakePeer.this.blockchainReader
override val appStateStorage: AppStateStorage = storagesInstance.storages.appStateStorage
override val capabilities: List[Capability] = blockchainConfig.capabilities
override val blockchainConfig: BlockchainConfig = Config.blockchains.blockchainConfig
}

lazy val handshaker: Handshaker[PeerInfo] = EtcHandshaker(handshakerConfiguration)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ import io.iohk.ethereum.network.discovery.DiscoveryConfig
import io.iohk.ethereum.network.handshaker.EtcHandshaker
import io.iohk.ethereum.network.handshaker.EtcHandshakerConfiguration
import io.iohk.ethereum.network.handshaker.Handshaker
import io.iohk.ethereum.network.p2p.messages.Capability
import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration
import io.iohk.ethereum.nodebuilder.AuthHandshakerBuilder
import io.iohk.ethereum.nodebuilder.NodeKeyBuilder
import io.iohk.ethereum.security.SecureRandomBuilder
import io.iohk.ethereum.utils.BlockchainConfig
import io.iohk.ethereum.utils.Config
import io.iohk.ethereum.utils.NodeStatus
import io.iohk.ethereum.utils.ServerStatus
Expand Down Expand Up @@ -119,7 +119,7 @@ object DumpChainApp
override val blockchain: Blockchain = DumpChainApp.blockchain
override val blockchainReader: BlockchainReader = DumpChainApp.blockchainReader
override val appStateStorage: AppStateStorage = storagesInstance.storages.appStateStorage
override val capabilities: List[Capability] = blockchainConfig.capabilities
override val blockchainConfig: BlockchainConfig = Config.blockchains.blockchainConfig
}

lazy val handshaker: Handshaker[PeerInfo] = EtcHandshaker(handshakerConfiguration)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# 1 - mainnet, 3 - ropsten, 7 - mordor
network-id = 42

capabilities = ["etc/64"]
capabilities = ["eth/64"]

# Possibility to set Proof of Work target time for testing purposes.
# null means that the standard difficulty calculation rules are used
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import io.iohk.ethereum.network.p2p.messages.BaseETH6XMessages
import io.iohk.ethereum.network.p2p.messages.ETC64
import io.iohk.ethereum.network.p2p.messages.ETH62
import io.iohk.ethereum.network.p2p.messages.ETH62.BlockHash
import io.iohk.ethereum.network.p2p.messages.ProtocolVersions
import io.iohk.ethereum.network.p2p.messages.ProtocolFamily._

class BlockBroadcast(val etcPeerManager: ActorRef) {

Expand Down Expand Up @@ -45,10 +45,12 @@ class BlockBroadcast(val etcPeerManager: ActorRef) {

private def broadcastNewBlock(blockToBroadcast: BlockToBroadcast, peers: Map[PeerId, PeerWithInfo]): Unit =
obtainRandomPeerSubset(peers.values.map(_.peer).toSet).foreach { peer =>
val message: MessageSerializable =
if (peers(peer.id).peerInfo.remoteStatus.protocolVersion.toByte == ProtocolVersions.ETC64.version)
blockToBroadcast.as64
else blockToBroadcast.as63
val remoteStatus = peers(peer.id).peerInfo.remoteStatus

val message: MessageSerializable = remoteStatus.protocolFamily match {
case ETH => blockToBroadcast.as63
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be so cool to have this fail if a new block type is introduced. just a thought, no great ideas.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what you mean here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thinking out loud:

  private def broadcastNewBlock(blockToBroadcast: BlockToBroadcast, peers: Map[PeerId, PeerWithInfo]): Unit =
    obtainRandomPeerSubset(peers.values.map(_.peer).toSet).foreach { peer =>
      val remoteStatus = peers(peer.id).peerInfo.remoteStatus

      val message: MessageSerializable = {
        val capOpt = Capabilities.of(remoteStatus.protocolFamily, remoteStatus.protocolVersion)
        capOpt match {
          case Capabilities.Eth63Capability => blockToBroadcast.as63
          case Capabilities.Eth64Capability => blockToBroadcast.as63
          case Capabilities.Etc64Capability => blockToBroadcast.as64
          case None                         => throw new RuntimeException("Unknown capability, cannot broadcast block")
        }
      }

...

    def of(pf: ProtocolFamily, v: Int): Option[Capability] =
      (pf, v) match {
        case (ProtocolFamily.ETH, 63) => Some(Eth63Capability)
        case (ProtocolFamily.ETH, 64) => Some(Eth64Capability)
        case (ProtocolFamily.ETC, 64) => Some(Etc64Capability)
        case _                        => None
      }

and an ADT out of the capabilities?

case ETC => blockToBroadcast.as64
}
etcPeerManager ! EtcPeerManagerActor.SendMessage(message, peer.id)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import io.iohk.ethereum.network.p2p.messages.ETC64.NewBlock
import io.iohk.ethereum.network.p2p.messages.ETH62.BlockHeaders
import io.iohk.ethereum.network.p2p.messages.ETH62.GetBlockHeaders
import io.iohk.ethereum.network.p2p.messages.ETH62.NewBlockHashes
import io.iohk.ethereum.network.p2p.messages.ETH64
import io.iohk.ethereum.network.p2p.messages.ProtocolFamily
import io.iohk.ethereum.network.p2p.messages.ProtocolFamily.ETC
import io.iohk.ethereum.network.p2p.messages.ProtocolFamily.ETH
import io.iohk.ethereum.network.p2p.messages.WireProtocol.Disconnect
import io.iohk.ethereum.utils.ByteStringUtils

Expand Down Expand Up @@ -239,6 +243,7 @@ object EtcPeerManagerActor {
* (they are different versions of Status msg)
*/
case class RemoteStatus(
protocolFamily: ProtocolFamily,
protocolVersion: Int,
networkId: Int,
chainWeight: ChainWeight,
Expand All @@ -247,6 +252,7 @@ object EtcPeerManagerActor {
) {
override def toString: String =
s"RemoteStatus { " +
s"protocolFamily: $protocolFamily, " +
s"protocolVersion: $protocolVersion, " +
s"networkId: $networkId, " +
s"chainWeight: $chainWeight, " +
Expand All @@ -256,11 +262,29 @@ object EtcPeerManagerActor {
}

object RemoteStatus {
def apply(status: ETH64.Status): RemoteStatus =
RemoteStatus(
ETH,
status.protocolVersion,
status.networkId,
ChainWeight.totalDifficultyOnly(status.totalDifficulty),
status.bestHash,
status.genesisHash
)

def apply(status: ETC64.Status): RemoteStatus =
RemoteStatus(status.protocolVersion, status.networkId, status.chainWeight, status.bestHash, status.genesisHash)
RemoteStatus(
ETC,
status.protocolVersion,
status.networkId,
status.chainWeight,
status.bestHash,
status.genesisHash
)

def apply(status: BaseETH6XMessages.Status): RemoteStatus =
RemoteStatus(
ETH,
status.protocolVersion,
status.networkId,
ChainWeight.totalDifficultyOnly(status.totalDifficulty),
Expand Down
1 change: 1 addition & 0 deletions src/main/scala/io/iohk/ethereum/network/PeerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ class PeerActor[R <: HandshakeResult](
case RLPxConnectionHandler.MessageReceived(msg) =>
// Processes the received message, cancels the timeout and processes a new message but only if the handshaker
// handles the received message
log.debug("Message received: {} from peer {}", msg, peerAddress)
handshaker.applyMessage(msg).foreach { newHandshaker =>
timeout.cancel()
processHandshakerNextMessage(newHandshaker, remoteNodeId, rlpxConnection, numRetries)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import io.iohk.ethereum.domain.BlockchainReader
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
import io.iohk.ethereum.network.ForkResolver
import io.iohk.ethereum.network.PeerManagerActor.PeerConfiguration
import io.iohk.ethereum.network.p2p.messages.Capability
import io.iohk.ethereum.utils.BlockchainConfig
import io.iohk.ethereum.utils.NodeStatus

case class EtcHandshaker private (
Expand Down Expand Up @@ -37,5 +37,5 @@ trait EtcHandshakerConfiguration {
val appStateStorage: AppStateStorage
val peerConfiguration: PeerConfiguration
val forkResolverOpt: Option[ForkResolver]
val capabilities: List[Capability]
val blockchainConfig: BlockchainConfig
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,21 @@ case class EtcHelloExchangeState(handshakerConfiguration: EtcHandshakerConfigura
override def applyResponseMessage: PartialFunction[Message, HandshakerState[PeerInfo]] = { case hello: Hello =>
log.debug("Protocol handshake finished with peer ({})", hello)
// FIXME in principle this should be already negotiated
Capability.negotiate(hello.capabilities.toList, handshakerConfiguration.capabilities) match {
case Some(ProtocolVersions.ETC64) => EtcNodeStatus64ExchangeState(handshakerConfiguration)
case Some(ProtocolVersions.ETH63) => EtcNodeStatus63ExchangeState(handshakerConfiguration)
Capability.negotiate(hello.capabilities.toList, handshakerConfiguration.blockchainConfig.capabilities) match {
case Some(ProtocolVersions.ETC64) =>
log.debug("Negotiated protocol version with client {} is etc/64", hello.clientId)
EtcNodeStatus64ExchangeState(handshakerConfiguration)
case Some(ProtocolVersions.ETH63) =>
log.debug("Negotiated protocol version with client {} is eth/63", hello.clientId)
EthNodeStatus63ExchangeState(handshakerConfiguration)
case Some(ProtocolVersions.ETH64) =>
log.debug("Negotiated protocol version with client {} is eth/64", hello.clientId)
EthNodeStatus64ExchangeState(handshakerConfiguration)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you also fix the 'orElse' clause that I introduced (orElse(WireDecoder))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to do this in a follow up PR, this one has become quite old and heavy

case _ =>
log.debug(
s"Connected peer does not support {} / {} protocol. Disconnecting.",
s"Connected peer does not support {} / {} / {} protocol. Disconnecting.",
ProtocolVersions.ETH63,
ProtocolVersions.ETH64,
ProtocolVersions.ETC64
)
DisconnectedState(Disconnect.Reasons.IncompatibleP2pProtocolVersion)
Expand All @@ -57,7 +65,7 @@ case class EtcHelloExchangeState(handshakerConfiguration: EtcHandshakerConfigura
Hello(
p2pVersion = EtcHelloExchangeState.P2pVersion,
clientId = Config.clientId,
capabilities = handshakerConfiguration.capabilities,
capabilities = handshakerConfiguration.blockchainConfig.capabilities,
listenPort = listenPort,
nodeId = ByteString(nodeStatus.nodeId)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import io.iohk.ethereum.network.p2p.MessageSerializable
import io.iohk.ethereum.network.p2p.messages.BaseETH6XMessages
import io.iohk.ethereum.network.p2p.messages.ProtocolVersions

case class EtcNodeStatus63ExchangeState(
case class EthNodeStatus63ExchangeState(
handshakerConfiguration: EtcHandshakerConfiguration
) extends EtcNodeStatusExchangeState[BaseETH6XMessages.Status] {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.iohk.ethereum.network.handshaker

import io.iohk.ethereum.forkid.ForkId
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
import io.iohk.ethereum.network.EtcPeerManagerActor.RemoteStatus
import io.iohk.ethereum.network.p2p.Message
import io.iohk.ethereum.network.p2p.MessageSerializable
import io.iohk.ethereum.network.p2p.messages.ETH64
import io.iohk.ethereum.network.p2p.messages.ProtocolVersions

case class EthNodeStatus64ExchangeState(
handshakerConfiguration: EtcHandshakerConfiguration
) extends EtcNodeStatusExchangeState[ETH64.Status] {

import handshakerConfiguration._

def applyResponseMessage: PartialFunction[Message, HandshakerState[PeerInfo]] = { case status: ETH64.Status =>
// TODO: validate fork id of the remote peer
applyRemoteStatusMessage(RemoteStatus(status))
}

override protected def createStatusMsg(): MessageSerializable = {
val bestBlockHeader = getBestBlockHeader()
val chainWeight = blockchain.getChainWeightByHash(bestBlockHeader.hash).get
val genesisHash = blockchainReader.genesisHeader.hash

val status = ETH64.Status(
protocolVersion = ProtocolVersions.ETH64.version,
networkId = peerConfiguration.networkId,
totalDifficulty = chainWeight.totalDifficulty,
bestHash = bestBlockHeader.hash,
genesisHash = genesisHash,
forkId = ForkId.create(genesisHash, blockchainConfig)(blockchainReader.getBestBlockNumber())
)

log.debug(s"Sending status $status")
status
}

}
30 changes: 27 additions & 3 deletions src/main/scala/io/iohk/ethereum/network/p2p/MessageDecoders.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ object NetworkMessageDecoder extends MessageDecoder {
case Ping.code => payload.toPing
case Pong.code => payload.toPong
case Hello.code => payload.toHello
case _ => throw new RuntimeException(s"Unknown message type: $msgCode")
case _ => throw new RuntimeException(s"Unknown network message type: $msgCode")
}

}
Expand All @@ -51,7 +51,30 @@ object ETC64MessageDecoder extends MessageDecoder {
case Codes.BlockBodiesCode => payload.toBlockBodies
case Codes.BlockHashesFromNumberCode => payload.toBlockHashesFromNumber
case Codes.SignedTransactionsCode => payload.toSignedTransactions
case _ => throw new RuntimeException(s"Unknown message type: $msgCode")
case _ => throw new RuntimeException(s"Unknown etc/64 message type: $msgCode")
}
}

object ETH64MessageDecoder extends MessageDecoder {
import io.iohk.ethereum.network.p2p.messages.ETH64.Status._
import io.iohk.ethereum.network.p2p.messages.BaseETH6XMessages.NewBlock._

def fromBytes(msgCode: Int, payload: Array[Byte]): Message =
msgCode match {
case Codes.GetNodeDataCode => payload.toGetNodeData
case Codes.NodeDataCode => payload.toNodeData
case Codes.GetReceiptsCode => payload.toGetReceipts
case Codes.ReceiptsCode => payload.toReceipts
case Codes.NewBlockHashesCode => payload.toNewBlockHashes
case Codes.GetBlockHeadersCode => payload.toGetBlockHeaders
case Codes.BlockHeadersCode => payload.toBlockHeaders
case Codes.GetBlockBodiesCode => payload.toGetBlockBodies
case Codes.BlockBodiesCode => payload.toBlockBodies
case Codes.BlockHashesFromNumberCode => payload.toBlockHashesFromNumber
case Codes.StatusCode => payload.toStatus
case Codes.NewBlockCode => payload.toNewBlock
case Codes.SignedTransactionsCode => payload.toSignedTransactions
case _ => throw new RuntimeException(s"Unknown eth/64 message type: $msgCode")
}
}

Expand All @@ -74,7 +97,7 @@ object ETH63MessageDecoder extends MessageDecoder {
case Codes.StatusCode => payload.toStatus
case Codes.NewBlockCode => payload.toNewBlock
case Codes.SignedTransactionsCode => payload.toSignedTransactions
case _ => throw new RuntimeException(s"Unknown message type: $msgCode")
case _ => throw new RuntimeException(s"Unknown eth/63 message type: $msgCode")
}
}

Expand All @@ -85,6 +108,7 @@ object EthereumMessageDecoder {
protocolVersion match {
case Capability.Capabilities.Etc64Capability => ETC64MessageDecoder.fromBytes
case Capability.Capabilities.Eth63Capability => ETH63MessageDecoder.fromBytes
case Capability.Capabilities.Eth64Capability => ETH64MessageDecoder.fromBytes
case _ => throw new RuntimeException(s"Unsupported Protocol Version $protocolVersion")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ object Capability {

object Capabilities {
val Eth63Capability: Capability = ProtocolVersions.ETH63
val Eth64Capability: Capability = ProtocolVersions.ETH64
val Etc64Capability: Capability = ProtocolVersions.ETC64

val All: Seq[Capability] = Seq(ProtocolVersions.ETC64, ProtocolVersions.ETH63)
val All: Seq[Capability] = Seq(ProtocolVersions.ETC64, ProtocolVersions.ETH63, ProtocolVersions.ETH64)
}
}
76 changes: 76 additions & 0 deletions src/main/scala/io/iohk/ethereum/network/p2p/messages/ETH64.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.iohk.ethereum.network.p2p.messages

import akka.util.ByteString

import org.bouncycastle.util.encoders.Hex

import io.iohk.ethereum.forkid.ForkId
import io.iohk.ethereum.forkid.ForkId._
import io.iohk.ethereum.network.p2p.Message
import io.iohk.ethereum.network.p2p.MessageSerializableImplicit
import io.iohk.ethereum.rlp.RLPImplicitConversions._
import io.iohk.ethereum.rlp.RLPImplicits._
import io.iohk.ethereum.rlp._

object ETH64 {

case class Status(
protocolVersion: Int,
networkId: Int,
totalDifficulty: BigInt,
bestHash: ByteString,
genesisHash: ByteString,
forkId: ForkId
) extends Message {

override def toString: String =
s"Status { " +
s"code: $code, " +
s"protocolVersion: $protocolVersion, " +
s"networkId: $networkId, " +
s"totalDifficulty: $totalDifficulty, " +
s"bestHash: ${Hex.toHexString(bestHash.toArray[Byte])}, " +
s"genesisHash: ${Hex.toHexString(genesisHash.toArray[Byte])}," +
s"forkId: $forkId," +
s"}"

override def toShortString: String = toString
override def code: Int = Codes.StatusCode
}

object Status {
implicit class StatusEnc(val underlyingMsg: Status)
extends MessageSerializableImplicit[Status](underlyingMsg)
with RLPSerializable {
override def code: Int = Codes.StatusCode

override def toRLPEncodable: RLPEncodeable = {
import msg._
RLPList(protocolVersion, networkId, totalDifficulty, bestHash, genesisHash, forkId.toRLPEncodable)
}
}

implicit class StatusDec(val bytes: Array[Byte]) extends AnyVal {
def toStatus: Status = rawDecode(bytes) match {
case RLPList(
protocolVersion,
networkId,
totalDifficulty,
bestHash,
genesisHash,
forkId
) =>
Status(
protocolVersion,
networkId,
totalDifficulty,
bestHash,
genesisHash,
decode[ForkId](forkId)
)

case _ => throw new RuntimeException("Cannot decode Status")
}
}
}
}
Loading