Skip to content

Commit

Permalink
[ETCM-178] Disallow repeated connections and connections to self
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicolas Tallar committed Oct 29, 2020
1 parent 663f0b8 commit ef84bec
Show file tree
Hide file tree
Showing 10 changed files with 282 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,13 @@ class DumpChainActor(
val account = n.value.toArray[Byte].toAccount

if (account.codeHash != DumpChainActor.emptyEvm) {
peers.headOption.foreach { case Peer(_, _, _) =>
peers.headOption.foreach { _ =>
evmTorequest = evmTorequest :+ account.codeHash
evmCodeHashes = evmCodeHashes + account.codeHash
}
}
if (account.storageRoot != DumpChainActor.emptyStorage) {
peers.headOption.foreach { case Peer(_, _, _) =>
peers.headOption.foreach { _ =>
contractChildren = contractChildren :+ account.storageRoot
contractNodesHashes = contractNodesHashes + account.storageRoot
}
Expand Down
72 changes: 72 additions & 0 deletions src/main/scala/io/iohk/ethereum/network/ConnectedPeers.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package io.iohk.ethereum.network

import java.net.InetSocketAddress

import akka.actor.ActorRef
import akka.util.ByteString

case class ConnectedPeers(
private val incomingPendingPeers: Map[PeerId, Peer],
private val outgoingPendingPeers: Map[PeerId, Peer],
private val handshakedPeers: Map[PeerId, Peer]
) {

// FIXME: Kept only for compatibility purposes, should eventually be removed
lazy val peers: Map[PeerId, Peer] = outgoingPendingPeers ++ handshakedPeers

private lazy val allPeers: Map[PeerId, Peer] = outgoingPendingPeers ++ handshakedPeers ++ incomingPendingPeers

def isConnectionHandled(remoteAddress: InetSocketAddress): Boolean =
allPeers.values.map(_.remoteAddress).toSet.contains(remoteAddress)

/*
We have the node id of our outgoing pending peers so we could use that in our checks, by rejecting a peer that
handshaked to us with the same node id.
However, with checking the node id of only handshaked peers we prioritize handshaked peers over pending ones,
in the above mentioned case the repeated pending peer connection will eventually die out
*/
def hasHandshakedWith(nodeId: ByteString): Boolean =
handshakedPeers.values.flatMap(_.nodeId).toSet.contains(nodeId)

lazy val incomingPendingPeersCount: Int = incomingPendingPeers.size
lazy val incomingHandshakedPeersCount: Int = handshakedPeers.count { case (_, p) => p.incomingConnection }
lazy val outgoingPeersCount: Int = peers.count { case (_, p) => !p.incomingConnection }

lazy val handshakedPeersCount: Int = handshakedPeers.size
lazy val pendingPeersCount: Int = incomingPendingPeersCount + outgoingPendingPeers.size

def getPeer(peerId: PeerId): Option[Peer] = peers.get(peerId)

def addNewPendingPeer(pendingPeer: Peer): ConnectedPeers = {
if (pendingPeer.incomingConnection)
copy(incomingPendingPeers = incomingPendingPeers + (pendingPeer.id -> pendingPeer))
else
copy(outgoingPendingPeers = outgoingPendingPeers + (pendingPeer.id -> pendingPeer))
}

def promotePeerToHandshaked(peerAfterHandshake: Peer): ConnectedPeers = {
if (peerAfterHandshake.incomingConnection)
copy(
incomingPendingPeers = incomingPendingPeers - peerAfterHandshake.id,
handshakedPeers = handshakedPeers + (peerAfterHandshake.id -> peerAfterHandshake)
)
else
copy(
outgoingPendingPeers = outgoingPendingPeers - peerAfterHandshake.id,
handshakedPeers = handshakedPeers + (peerAfterHandshake.id -> peerAfterHandshake)
)
}

def removeTerminatedPeer(peerRef: ActorRef): (Iterable[PeerId], ConnectedPeers) = {
val peersId = allPeers.collect { case (id, peer) if peer.ref == peerRef => id }

(
peersId,
ConnectedPeers(incomingPendingPeers -- peersId, outgoingPendingPeers -- peersId, handshakedPeers -- peersId)
)
}
}

object ConnectedPeers {
def empty: ConnectedPeers = ConnectedPeers(Map.empty, Map.empty, Map.empty)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,21 @@ package io.iohk.ethereum.network
import java.net.InetSocketAddress

import akka.actor.ActorRef
import akka.util.ByteString
import io.iohk.ethereum.blockchain.sync.BlacklistSupport.BlackListId

case class PeerId(value: String) extends BlackListId

case class Peer(remoteAddress: InetSocketAddress, ref: ActorRef, incomingConnection: Boolean) {
object PeerId {
def fromRef(ref: ActorRef): PeerId = PeerId(ref.path.name)
}

case class Peer(
remoteAddress: InetSocketAddress,
ref: ActorRef,
incomingConnection: Boolean,
nodeId: Option[ByteString] = None
) {
// FIXME PeerId should be actual peerId i.e id derived form node public key
def id: PeerId = PeerId(ref.path.name)
def id: PeerId = PeerId.fromRef(ref)
}
19 changes: 10 additions & 9 deletions src/main/scala/io/iohk/ethereum/network/PeerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ class PeerActor[R <: HandshakeResult](

def scheduler: Scheduler = externalSchedulerOpt getOrElse system.scheduler

val peerId: PeerId = PeerId(self.path.name)

val peer: Peer = Peer(peerAddress, self, incomingConnection)
val peerId: PeerId = PeerId.fromRef(self)

override def receive: Receive = waitingForInitialCommand

Expand Down Expand Up @@ -87,7 +85,7 @@ class PeerActor[R <: HandshakeResult](
case RLPxConnectionHandler.ConnectionEstablished(remoteNodeId) =>
val newUri =
rlpxConnection.uriOpt.map(outGoingUri => modifyOutGoingUri(remoteNodeId, rlpxConnection, outGoingUri))
processHandshakerNextMessage(initHandshaker, rlpxConnection.copy(uriOpt = newUri), numRetries)
processHandshakerNextMessage(initHandshaker, remoteNodeId, rlpxConnection.copy(uriOpt = newUri), numRetries)

case RLPxConnectionHandler.ConnectionFailed =>
log.debug("Failed to establish RLPx connection")
Expand All @@ -109,6 +107,7 @@ class PeerActor[R <: HandshakeResult](

def processingHandshaking(
handshaker: Handshaker[R],
remoteNodeId: ByteString,
rlpxConnection: RLPxConnection,
timeout: Cancellable,
numRetries: Int
Expand All @@ -122,14 +121,14 @@ class PeerActor[R <: HandshakeResult](
// handles the received message
handshaker.applyMessage(msg).foreach { newHandshaker =>
timeout.cancel()
processHandshakerNextMessage(newHandshaker, rlpxConnection, numRetries)
processHandshakerNextMessage(newHandshaker, remoteNodeId, rlpxConnection, numRetries)
}
handshaker.respondToRequest(msg).foreach(msgToSend => rlpxConnection.sendMessage(msgToSend))

case ResponseTimeout =>
timeout.cancel()
val newHandshaker = handshaker.processTimeout
processHandshakerNextMessage(newHandshaker, rlpxConnection, numRetries)
processHandshakerNextMessage(newHandshaker, remoteNodeId, rlpxConnection, numRetries)

case GetStatus => sender() ! StatusResponse(Handshaking(numRetries))

Expand All @@ -145,18 +144,19 @@ class PeerActor[R <: HandshakeResult](
*/
private def processHandshakerNextMessage(
handshaker: Handshaker[R],
remoteNodeId: ByteString,
rlpxConnection: RLPxConnection,
numRetries: Int
): Unit =
handshaker.nextMessage match {
case Right(NextMessage(msgToSend, timeoutTime)) =>
rlpxConnection.sendMessage(msgToSend)
val newTimeout = scheduler.scheduleOnce(timeoutTime, self, ResponseTimeout)
context become processingHandshaking(handshaker, rlpxConnection, newTimeout, numRetries)
context become processingHandshaking(handshaker, remoteNodeId, rlpxConnection, newTimeout, numRetries)

case Left(HandshakeSuccess(handshakeResult)) =>
rlpxConnection.uriOpt.foreach { uri => knownNodesManager ! KnownNodesManager.AddKnownNode(uri) }
context become new HandshakedPeer(rlpxConnection, handshakeResult).receive
context become new HandshakedPeer(remoteNodeId, rlpxConnection, handshakeResult).receive
unstashAll()

case Left(HandshakeFailure(reason)) =>
Expand Down Expand Up @@ -244,8 +244,9 @@ class PeerActor[R <: HandshakeResult](
stash()
}

class HandshakedPeer(rlpxConnection: RLPxConnection, handshakeResult: R) {
class HandshakedPeer(remoteNodeId: ByteString, rlpxConnection: RLPxConnection, handshakeResult: R) {

val peer: Peer = Peer(peerAddress, self, incomingConnection, Some(remoteNodeId))
peerEventBus ! Publish(PeerHandshakeSuccessful(peer, handshakeResult))

/**
Expand Down
Loading

0 comments on commit ef84bec

Please sign in to comment.