diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/FastSync.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/FastSync.scala index 71a6afd833..ad1a422ff3 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/FastSync.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/FastSync.scala @@ -16,6 +16,7 @@ import io.iohk.ethereum.network.Peer import io.iohk.ethereum.network.p2p.messages.PV62._ import io.iohk.ethereum.network.p2p.messages.PV63.MptNodeEncoders._ import io.iohk.ethereum.network.p2p.messages.PV63._ +import io.iohk.ethereum.utils.ByteStringUtils import io.iohk.ethereum.utils.Config.SyncConfig import org.bouncycastle.util.encoders.Hex @@ -360,35 +361,36 @@ class FastSync( } private def handleReceipts(peer: Peer, requestedHashes: Seq[ByteString], receipts: Seq[Seq[Receipt]]) = { - validateReceipts(requestedHashes, receipts) match { - case ReceiptsValidationResult.Valid(blockHashesWithReceipts) => - blockHashesWithReceipts.map { case (hash, receiptsForBlock) => - blockchain.storeReceipts(hash, receiptsForBlock) - }.reduce(_.and(_)) - .commit() - - val receivedHashes = blockHashesWithReceipts.unzip._1 - updateBestBlockIfNeeded(receivedHashes) - - if (receipts.isEmpty) { - val reason = s"got empty receipts for known hashes: ${requestedHashes.map(h => Hex.toHexString(h.toArray[Byte]))}" - blacklist(peer.id, blacklistDuration, reason) - } - - val remainingReceipts = requestedHashes.drop(receipts.size) - if (remainingReceipts.nonEmpty) { - syncState = syncState.enqueueReceipts(remainingReceipts) - } + if (receipts.isEmpty) { + val reason = s"got empty receipts for known hashes: ${requestedHashes.map(ByteStringUtils.hash2string)}" + blacklist(peer.id, blacklistDuration, reason) + syncState = syncState.enqueueReceipts(requestedHashes) + } else { + validateReceipts(requestedHashes, receipts) match { + case ReceiptsValidationResult.Valid(blockHashesWithReceipts) => + blockHashesWithReceipts.map { case (hash, receiptsForBlock) => + blockchain.storeReceipts(hash, receiptsForBlock) + }.reduce(_.and(_)) + .commit() + + val receivedHashes = blockHashesWithReceipts.unzip._1 + updateBestBlockIfNeeded(receivedHashes) + + val remainingReceipts = requestedHashes.drop(receipts.size) + if (remainingReceipts.nonEmpty) { + syncState = syncState.enqueueReceipts(remainingReceipts) + } - case ReceiptsValidationResult.Invalid(error) => - val reason = - s"got invalid receipts for known hashes: ${requestedHashes.map(h => Hex.toHexString(h.toArray[Byte]))}" + - s" due to: $error" - blacklist(peer.id, blacklistDuration, reason) - syncState = syncState.enqueueReceipts(requestedHashes) + case ReceiptsValidationResult.Invalid(error) => + val reason = + s"got invalid receipts for known hashes: ${requestedHashes.map(h => Hex.toHexString(h.toArray[Byte]))}" + + s" due to: $error" + blacklist(peer.id, blacklistDuration, reason) + syncState = syncState.enqueueReceipts(requestedHashes) - case ReceiptsValidationResult.DbError => - redownloadBlockchain() + case ReceiptsValidationResult.DbError => + redownloadBlockchain() + } } processSyncing() diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncControllerSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncControllerSpec.scala index 85238fba01..c5ef65d09a 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncControllerSpec.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncControllerSpec.scala @@ -145,6 +145,68 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w peerMessageBus.expectMsg(Subscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer1.id)))) } + it should "gracefully handle receiving empty receipts while syncing" in new TestSetup() { + + val newSafeTarget = defaultExpectedTargetBlock + syncConfig.fastSyncBlockValidationX + val bestBlockNumber = defaultExpectedTargetBlock + val firstNewBlock = bestBlockNumber + 1 + + startWithState(defaultState.copy( + bestBlockHeaderNumber = bestBlockNumber, + safeDownloadTarget = newSafeTarget) + ) + + Thread.sleep(1.seconds.toMillis) + + syncController ! SyncController.Start + + val handshakedPeers = HandshakedPeers(singlePeer) + updateHandshakedPeers(handshakedPeers) + etcPeerManager.setAutoPilot(new AutoPilot { + def run(sender: ActorRef, msg: Any): AutoPilot = { + if (msg == EtcPeerManagerActor.GetHandshakedPeers) { + sender ! handshakedPeers + } + + this + } + }) + + val watcher = TestProbe() + watcher.watch(syncController) + + val newBlocks = getHeaders(firstNewBlock, syncConfig.blockHeadersPerRequest) + val newReceipts = newBlocks.map(_.hash).map(_ => Seq.empty[Receipt]) + val newBodies = newBlocks.map(_ => BlockBody.empty) + + //wait for peers throttle + Thread.sleep(syncConfig.fastSyncThrottle.toMillis) + sendBlockHeaders(firstNewBlock, newBlocks, peer1, newBlocks.size) + + Thread.sleep(syncConfig.fastSyncThrottle.toMillis) + sendNewTargetBlock(defaultTargetBlockHeader.copy(number = defaultTargetBlockHeader.number + 1), peer1, peer1Status, handshakedPeers) + + Thread.sleep(1.second.toMillis) + sendReceipts(newBlocks.map(_.hash), Seq(), peer1) + + // Peer will be blacklisted for empty response, so wait he is blacklisted + Thread.sleep(2.second.toMillis) + sendReceipts(newBlocks.map(_.hash), newReceipts, peer1) + + Thread.sleep(syncConfig.fastSyncThrottle.toMillis) + sendBlockBodies(newBlocks.map(_.hash), newBodies, peer1) + + Thread.sleep(syncConfig.fastSyncThrottle.toMillis) + sendNodes(Seq(defaultTargetBlockHeader.stateRoot), Seq(defaultStateMptLeafWithAccount), peer1) + + //switch to regular download + etcPeerManager.expectMsg(EtcPeerManagerActor.SendMessage( + GetBlockHeaders(Left(defaultTargetBlockHeader.number + 1), syncConfig.blockHeadersPerRequest, 0, reverse = false), + peer1.id)) + peerMessageBus.expectMsg(Subscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer1.id)))) + } + + it should "handle blocks that fail validation" in new TestSetup(_validators = new Mocks.MockValidatorsAlwaysSucceed { override val blockHeaderValidator: BlockHeaderValidator = { (blockHeader, getBlockHeaderByHash) => Left(HeaderPoWError) } }) { @@ -444,11 +506,12 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w peerMessageBus.expectMsg(Unsubscribe()) // response timeout - Thread.sleep(2.seconds.toMillis) - etcPeerManager.expectNoMessage() + Thread.sleep(1.seconds.toMillis) + + etcPeerManager.expectNoMessage(1.second) // wait for blacklist timeout - Thread.sleep(6.seconds.toMillis) + Thread.sleep(2.seconds.toMillis) // peer should not be blacklisted anymore etcPeerManager.expectMsg(EtcPeerManagerActor.SendMessage(GetNodeData(Seq(defaultTargetBlockHeader.stateRoot)), peer1.id)) @@ -577,7 +640,8 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w minPeersToChooseTargetBlock = 1, peersScanInterval = 500.milliseconds, redownloadMissingStateNodes = false, - fastSyncBlockValidationX = 10 + fastSyncBlockValidationX = 10, + blacklistDuration = 1.second ) lazy val syncController = TestActorRef(Props(new SyncController( diff --git a/src/test/scala/io/iohk/ethereum/consensus/ethash/EthashUtilsSpec.scala b/src/test/scala/io/iohk/ethereum/consensus/ethash/EthashUtilsSpec.scala index d383eb5c3a..1d1d1017a7 100644 --- a/src/test/scala/io/iohk/ethereum/consensus/ethash/EthashUtilsSpec.scala +++ b/src/test/scala/io/iohk/ethereum/consensus/ethash/EthashUtilsSpec.scala @@ -2,18 +2,20 @@ package io.iohk.ethereum.consensus.ethash import akka.util.ByteString import io.iohk.ethereum.crypto.kec256 -import org.scalacheck.Arbitrary +import org.scalacheck.Gen import org.bouncycastle.util.encoders.Hex import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers +import scala.annotation.tailrec + class EthashUtilsSpec extends AnyFlatSpec with Matchers with ScalaCheckPropertyChecks { import io.iohk.ethereum.consensus.ethash.EthashUtils._ "Ethash" should "generate correct hash" in { - forAll(Arbitrary.arbitrary[Long].filter(_ < 15000000)) { blockNumber => + forAll(Gen.choose[Long](0, 15000000L)) { blockNumber => seed(epoch(blockNumber)) shouldBe seedForBlockReference(blockNumber) } } @@ -115,13 +117,15 @@ class EthashUtilsSpec extends AnyFlatSpec with Matchers with ScalaCheckPropertyC } def seedForBlockReference(blockNumber: BigInt): ByteString = { - if (blockNumber < EPOCH_LENGTH) { - //wrong version from YP: - //ByteString(kec256(Hex.decode("00" * 32))) - //working version: - ByteString(Hex.decode("00" * 32)) - } else { - kec256(seedForBlockReference(blockNumber - EPOCH_LENGTH)) + @tailrec + def go(current: BigInt, currentHash: ByteString): ByteString = { + if (current < EPOCH_LENGTH) { + currentHash + } else { + go(current - EPOCH_LENGTH, kec256(currentHash)) + } } + + go(blockNumber, ByteString(Hex.decode("00" * 32))) } }