Skip to content

Commit

Permalink
ETCM-370: Refine responses validation on block fetcher side and base …
Browse files Browse the repository at this point in the history
…blacklisting on that

Fix tests
  • Loading branch information
AnastasiiaL committed May 28, 2021
1 parent f144be7 commit 7fc7faf
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import io.iohk.ethereum.consensus.{ConsensusConfig, FullConsensusConfig, pow}
import io.iohk.ethereum.crypto
import io.iohk.ethereum.domain._
import io.iohk.ethereum.ledger._
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer
import io.iohk.ethereum.network.PeerId
import io.iohk.ethereum.network.p2p.messages.CommonMessages.NewBlock
import io.iohk.ethereum.nodebuilder.VmSetup
Expand Down
12 changes: 10 additions & 2 deletions src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,16 @@ object Blacklist {
val code: Int = 27
val name: String = "UnrequestedBodies"
}
case object RegularSyncRequestFailedType extends BlacklistReasonType with RegularSyncBlacklistGroup {
case object UnrequestedHeadersType extends BlacklistReasonType with RegularSyncBlacklistGroup {
val code: Int = 28
val name: String = "UnrequestedHeaders"
}
case object RegularSyncRequestFailedType extends BlacklistReasonType with RegularSyncBlacklistGroup {
val code: Int = 29
val name: String = "RegularSyncRequestFailed"
}
case object BlockImportErrorType extends BlacklistReasonType with RegularSyncBlacklistGroup {
val code: Int = 29
val code: Int = 30
val name: String = "BlockImportError"
}
}
Expand Down Expand Up @@ -281,6 +285,10 @@ object Blacklist {
val reasonType: BlacklistReasonType = UnrequestedBodiesType
val description: String = "Received unrequested bodies"
}
case object UnrequestedHeaders extends BlacklistReason {
val reasonType: BlacklistReasonType = UnrequestedHeadersType
val description: String = "Received unrequested headers"
}
final case class RegularSyncRequestFailed(error: String) extends BlacklistReason {
val reasonType: BlacklistReasonType = RegularSyncRequestFailedType
val description: String = s"Request failed with error: $error"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ import io.iohk.ethereum.consensus.validators.BlockValidator
import io.iohk.ethereum.blockchain.sync.PeersClient._
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState.{
AwaitingBodiesToBeIgnored,
AwaitingHeadersToBeIgnored
AwaitingHeadersToBeIgnored,
HeadersNotFormingSeq,
HeadersNotMatchingReadyBlocks,
HeadersNotMatchingWaitingHeaders
}
import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.{ImportNewBlock, NotOnTop, OnTop}
import io.iohk.ethereum.blockchain.sync.regular.RegularSync.ProgressProtocol
Expand Down Expand Up @@ -130,7 +133,7 @@ class BlockFetcher(
blockProvider.foreach(peersClient ! BlacklistPeer(_, BlacklistReason.BlockImportError(reason)))
fetchBlocks(newState)

case ReceivedHeaders(headers) if state.isFetchingHeaders =>
case ReceivedHeaders(peer, headers) if state.isFetchingHeaders =>
//First successful fetch
if (state.waitingHeaders.isEmpty) {
supervisor ! ProgressProtocol.StartedFetching
Expand All @@ -146,6 +149,14 @@ class BlockFetcher(
} else {
log.debug("Fetched {} headers starting from block {}", headers.size, headers.headOption.map(_.number))
state.appendHeaders(headers) match {
case Left(HeadersNotFormingSeq) =>
log.info("Dismissed received headers due to: {}", HeadersNotFormingSeq.description)
peersClient ! BlacklistPeer(peer.id, BlacklistReason.UnrequestedHeaders)
state.withHeaderFetchReceived
case Left(HeadersNotMatchingReadyBlocks) =>
log.info("Dismissed received headers due to: {}", HeadersNotMatchingReadyBlocks.description)
peersClient ! BlacklistPeer(peer.id, BlacklistReason.UnrequestedHeaders)
state.withHeaderFetchReceived
case Left(err) =>
log.info("Dismissed received headers due to: {}", err)
state.withHeaderFetchReceived
Expand All @@ -154,6 +165,11 @@ class BlockFetcher(
}
}
fetchBlocks(newState)

case ReceivedHeaders(peer, _) if !state.isFetchingHeaders =>
peersClient ! BlacklistPeer(peer.id, BlacklistReason.UnrequestedHeaders)
Behaviors.same

case RetryHeadersRequest if state.isFetchingHeaders =>
log.debug("Something failed on a headers request, cancelling the request and re-fetching")
fetchBlocks(state.withHeaderFetchReceived)
Expand All @@ -177,6 +193,10 @@ class BlockFetcher(
fetchBlocks(newState)
}

case ReceivedBodies(peer, _) if !state.isFetchingBodies =>
peersClient ! BlacklistPeer(peer.id, BlacklistReason.UnrequestedBodies)
Behaviors.same

case RetryBodiesRequest if state.isFetchingBodies =>
log.debug("Something failed on a bodies request, cancelling the request and re-fetching")
fetchBlocks(state.withBodiesFetchReceived)
Expand Down Expand Up @@ -332,7 +352,7 @@ object BlockFetcher {
final case object RetryBodiesRequest extends FetchCommand
final case object RetryHeadersRequest extends FetchCommand
final case class AdaptedMessageFromEventBus(message: Message, peerId: PeerId) extends FetchCommand
final case class ReceivedHeaders(headers: Seq[BlockHeader]) extends FetchCommand
final case class ReceivedHeaders(peer: Peer, headers: Seq[BlockHeader]) extends FetchCommand
final case class ReceivedBodies(peer: Peer, bodies: Seq[BlockBody]) extends FetchCommand

sealed trait FetchResponse
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import io.iohk.ethereum.consensus.validators.BlockValidator
import io.iohk.ethereum.domain.{Block, BlockBody, BlockHeader, HeadersSeq}
import io.iohk.ethereum.network.PeerId
import io.iohk.ethereum.network.p2p.messages.PV62.BlockHash
import io.iohk.ethereum.utils.ByteStringUtils

import scala.annotation.tailrec
import scala.collection.immutable.Queue
Expand Down Expand Up @@ -75,7 +74,7 @@ case class BlockFetcherState(

def takeHashes(amount: Int): Seq[ByteString] = waitingHeaders.take(amount).map(_.hash)

def appendHeaders(headers: Seq[BlockHeader]): Either[String, BlockFetcherState] =
def appendHeaders(headers: Seq[BlockHeader]): Either[ValidationErrors, BlockFetcherState] =
validatedHeaders(headers.sortBy(_.number)).map(validHeaders => {
val lastNumber = HeadersSeq.lastNumber(validHeaders)
withPossibleNewTopAt(lastNumber)
Expand All @@ -86,17 +85,16 @@ case class BlockFetcherState(

/**
* Validates received headers consistency and their compatibility with the state
* TODO ETCM-370: This needs to be more fine-grained and detailed so blacklisting can be re-enabled
*/
private def validatedHeaders(headers: Seq[BlockHeader]): Either[String, Seq[BlockHeader]] =
private def validatedHeaders(headers: Seq[BlockHeader]): Either[ValidationErrors, Seq[BlockHeader]] =
if (headers.isEmpty) {
Right(headers)
} else {
headers
.asRight[String]
.ensure("Given headers should form a sequence without gaps")(HeadersSeq.areChain)
.ensure("Given headers should form a sequence with ready blocks")(checkConsistencyWithReadyBlocks)
.ensure("Given headers do not form a chain with already stored ones")(headers =>
.asRight[ValidationErrors]
.ensure(HeadersNotFormingSeq)(HeadersSeq.areChain)
.ensure(HeadersNotMatchingReadyBlocks)(checkConsistencyWithReadyBlocks)
.ensure(HeadersNotMatchingWaitingHeaders)(headers =>
(waitingHeaders.lastOption, headers.headOption).mapN(_ isParentOf _).getOrElse(true)
)
}
Expand Down Expand Up @@ -319,4 +317,17 @@ object BlockFetcherState {
* State used to keep track of pending request to prevent multiple requests in parallel
*/
case object AwaitingBodiesToBeIgnored extends FetchingBodiesState

sealed trait ValidationErrors {
def description: String
}
case object HeadersNotFormingSeq extends ValidationErrors {
val description = "Given headers should form a sequence without gaps"
}
case object HeadersNotMatchingReadyBlocks extends ValidationErrors {
val description = "Given headers should form a sequence with ready blocks"
}
case object HeadersNotMatchingWaitingHeaders extends ValidationErrors {
val description = "Given headers should form a chain with waiting headers"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ class HeadersFetcher(
log.debug("Start fetching headers from block {}", blockNumber)
requestHeaders(blockNumber, amount)
Behaviors.same
case AdaptedMessage(_, BlockHeaders(headers)) =>
case AdaptedMessage(peer, BlockHeaders(headers)) =>
log.debug("Fetched {} headers starting from block {}", headers.size, headers.headOption.map(_.number))
supervisor ! BlockFetcher.ReceivedHeaders(headers)
supervisor ! BlockFetcher.ReceivedHeaders(peer, headers)
Behaviors.same
case HeadersFetcher.RetryHeadersRequest =>
supervisor ! BlockFetcher.RetryHeadersRequest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package io.iohk.ethereum.blockchain.sync.regular

import akka.actor.{Actor, ActorLogging, ActorRef, AllForOneStrategy, Cancellable, Props, Scheduler, SupervisorStrategy}
import akka.actor.typed.{ActorRef => TypedActorRef}
import io.iohk.ethereum.blockchain.sync.SyncProtocol
import io.iohk.ethereum.blockchain.sync.{Blacklist, SyncProtocol}
import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status
import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status.Progress
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.iohk.ethereum.blockchain.sync.regular
import akka.actor.ActorSystem
import akka.testkit.{TestKit, TestProbe}
import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState.HeadersNotMatchingReadyBlocks
import io.iohk.ethereum.{BlockHelpers, WithActorSystemShutDown}
import io.iohk.ethereum.network.PeerId
import io.iohk.ethereum.utils.ByteStringUtils
Expand Down Expand Up @@ -69,7 +70,7 @@ class BlockFetcherStateSpec
.appendHeaders(blocks.map(_.header))
.map(_.handleRequestedBlocks(blocks, peer))

assert(result.map(_.waitingHeaders) === Left("Given headers should form a sequence with ready blocks"))
assert(result.map(_.waitingHeaders) === Left(HeadersNotMatchingReadyBlocks))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package io.iohk.ethereum.blockchain.sync.regular

import akka.actor.{ActorRef, ActorSystem}
import akka.actor.{ActorRef, ActorSystem, typed}
import akka.actor.typed.{ActorRef => TypedActorRef}
import akka.testkit.TestActor.AutoPilot
import akka.testkit.TestKit
import akka.testkit.{TestKit, TestProbe}
import akka.util.ByteString
import cats.data.NonEmptyList
import cats.effect.Resource
import cats.syntax.traverse._
import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason
import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status
import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status.Progress
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher.Start
import io.iohk.ethereum.blockchain.sync.regular.RegularSync.NewCheckpoint
import io.iohk.ethereum.blockchain.sync.{PeersClient, SyncProtocol}
import io.iohk.ethereum.crypto.kec256
Expand Down Expand Up @@ -37,6 +39,7 @@ import org.scalatest.{Assertion, BeforeAndAfterEach}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future, Promise}
import scala.math.BigInt

class RegularSyncSpec
extends WordSpecBase
Expand Down Expand Up @@ -112,29 +115,101 @@ class RegularSyncSpec
peersClient.expectMsg(PeersClient.BlacklistPeer(defaultPeer.id, BlacklistReason.RegularSyncRequestFailed("a random reason")))
})

//TODO: To be re-enabled with ETCM-370
"blacklist peer which returns headers starting from one with higher number than expected" ignore sync(
new Fixture(
testSystem
) {
"blacklist peer which returns headers starting from one with higher number than expected" in sync(
new Fixture(testSystem) {
var blockFetcher: ActorRef = _

regularSync ! SyncProtocol.Start
peerEventBus.expectMsgClass(classOf[Subscribe])
blockFetcher = peerEventBus.sender()

peersClient.expectMsgEq(blockHeadersChunkRequest(0))
peersClient.reply(PeersClient.Response(defaultPeer, BlockHeaders(testBlocksChunked(1).headers)))
peersClient.reply(PeersClient.Response(defaultPeer, BlockHeaders(testBlocksChunked.head.headers)))

val getBodies: PeersClient.Request[GetBlockBodies] = PeersClient.Request.create(
GetBlockBodies(testBlocksChunked.head.headers.map(_.hash)),
PeersClient.BestPeer
)
peersClient.expectMsgEq(getBodies)
peersClient.reply(PeersClient.Response(defaultPeer, BlockBodies(testBlocksChunked.head.bodies)))

blockFetcher ! MessageFromPeer(NewBlock(testBlocks.last, ChainWeight.totalDifficultyOnly(testBlocks.last.number)), defaultPeer.id)
peersClient.expectMsgEq(blockHeadersChunkRequest(1))
peersClient.reply(PeersClient.Response(defaultPeer, BlockHeaders(testBlocksChunked(5).headers)))
peersClient.expectMsgPF() {
case PeersClient.BlacklistPeer(id, _) if id == defaultPeer.id => true
}
}
)

//TODO: To be re-enabled with ETCM-370
"blacklist peer which returns headers not forming a chain" ignore sync(new Fixture(testSystem) {
"blacklist peer which returns headers not forming a chain" in sync(new Fixture(testSystem) {
regularSync ! SyncProtocol.Start

peersClient.expectMsgEq(blockHeadersChunkRequest(0))
peersClient.reply(
PeersClient.Response(defaultPeer, BlockHeaders(testBlocksChunked.head.headers.filter(_.number % 2 == 0)))
PeersClient.Response(defaultPeer, BlockHeaders(testBlocks.headers.filter(_.number % 2 == 0)))
)
peersClient.expectMsgPF() {
case PeersClient.BlacklistPeer(id, _) if id == defaultPeer.id => true
}
})

"blacklist peer which sends headers that were not requested" in sync(new Fixture(testSystem) {
import akka.actor.typed.scaladsl.adapter._

val blockImporter = TestProbe()
val fetcher: typed.ActorRef[BlockFetcher.FetchCommand] =
system.spawn(
BlockFetcher(peersClient.ref, peerEventBus.ref, regularSync, syncConfig, validators.blockValidator),
"block-fetcher"
)

fetcher ! Start(blockImporter.ref, 0)

peersClient.expectMsgEq(blockHeadersChunkRequest(0))
peersClient.reply(PeersClient.Response(defaultPeer, BlockHeaders(testBlocksChunked.head.headers)))

val getBodies: PeersClient.Request[GetBlockBodies] = PeersClient.Request.create(
GetBlockBodies(testBlocksChunked.head.headers.map(_.hash)),
PeersClient.BestPeer
)

peersClient.expectMsgEq(getBodies)
peersClient.reply(PeersClient.Response(defaultPeer, BlockBodies(testBlocksChunked.head.bodies)))

fetcher ! BlockFetcher.ReceivedHeaders(defaultPeer, testBlocksChunked(3).headers)

peersClient.expectMsgPF() {
case PeersClient.BlacklistPeer(id, _) if id == defaultPeer.id => true
}
})

"blacklist peer which sends bodies that were not requested" in sync(new Fixture(testSystem) {
import akka.actor.typed.scaladsl.adapter._

var blockFetcherAdapter: TypedActorRef[MessageFromPeer] = _
val blockImporter = TestProbe()
val fetcher: typed.ActorRef[BlockFetcher.FetchCommand] =
system.spawn(
BlockFetcher(peersClient.ref, peerEventBus.ref, regularSync, syncConfig, validators.blockValidator),
"block-fetcher"
)

fetcher ! Start(blockImporter.ref, 0)

peersClient.expectMsgEq(blockHeadersChunkRequest(0))
peersClient.reply(PeersClient.Response(defaultPeer, BlockHeaders(testBlocksChunked.head.headers)))

val getBodies: PeersClient.Request[GetBlockBodies] = PeersClient.Request.create(
GetBlockBodies(testBlocksChunked.head.headers.map(_.hash)),
PeersClient.BestPeer
)

peersClient.expectMsgEq(getBodies)
peersClient.reply(PeersClient.Response(defaultPeer, BlockBodies(testBlocksChunked.head.bodies)))

fetcher ! BlockFetcher.ReceivedBodies(defaultPeer, testBlocksChunked(3).bodies)

peersClient.expectMsgPF() {
case PeersClient.BlacklistPeer(id, _) if id == defaultPeer.id => true
}
Expand Down

0 comments on commit 7fc7faf

Please sign in to comment.