Skip to content

Commit

Permalink
[ETCM-77] checkpoint sync
Browse files Browse the repository at this point in the history
  • Loading branch information
rtkaczyk committed Oct 20, 2020
1 parent 8c4496b commit 70672f2
Show file tree
Hide file tree
Showing 27 changed files with 634 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.iohk.ethereum.blockchain.sync

import akka.actor.{Actor, ActorLogging, ActorRef, PoisonPill, Props, Scheduler}
import io.iohk.ethereum.blockchain.sync.regular.RegularSync
import io.iohk.ethereum.consensus.blocks.CheckpointBlockGenerator
import io.iohk.ethereum.consensus.validators.Validators
import io.iohk.ethereum.db.storage.{AppStateStorage, FastSyncStateStorage}
import io.iohk.ethereum.domain.Blockchain
Expand All @@ -16,6 +17,7 @@ class SyncController(
validators: Validators,
peerEventBus: ActorRef,
pendingTransactionsManager: ActorRef,
checkpointBlockGenerator: CheckpointBlockGenerator,
ommersPool: ActorRef,
etcPeerManager: ActorRef,
syncConfig: SyncConfig,
Expand Down Expand Up @@ -102,6 +104,7 @@ class SyncController(
syncConfig,
ommersPool,
pendingTransactionsManager,
checkpointBlockGenerator,
scheduler
),
"regular-sync"
Expand All @@ -122,6 +125,7 @@ object SyncController {
validators: Validators,
peerEventBus: ActorRef,
pendingTransactionsManager: ActorRef,
checkpointBlockGenerator: CheckpointBlockGenerator,
ommersPool: ActorRef,
etcPeerManager: ActorRef,
syncConfig: SyncConfig
Expand All @@ -135,6 +139,7 @@ object SyncController {
validators,
peerEventBus,
pendingTransactionsManager,
checkpointBlockGenerator,
ommersPool,
etcPeerManager,
syncConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,40 @@ package io.iohk.ethereum.blockchain.sync.regular

import akka.actor.Actor.Receive
import akka.actor.{Actor, ActorLogging, ActorRef, NotInfluenceReceiveTimeout, Props, ReceiveTimeout}
import akka.util.ByteString
import cats.data.NonEmptyList
import cats.instances.future._
import cats.instances.list._
import cats.syntax.apply._
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor.BroadcastBlocks
import io.iohk.ethereum.crypto.kec256
import io.iohk.ethereum.domain.{Block, Blockchain, SignedTransaction}
import io.iohk.ethereum.consensus.blocks.CheckpointBlockGenerator
import io.iohk.ethereum.crypto.{ECDSASignature, kec256}
import io.iohk.ethereum.domain.{Block, Blockchain, Checkpoint, SignedTransaction}
import io.iohk.ethereum.ledger._
import io.iohk.ethereum.mpt.MerklePatriciaTrie.MissingNodeException
import io.iohk.ethereum.network.PeerId
import io.iohk.ethereum.network.p2p.messages.CommonMessages.NewBlock
import io.iohk.ethereum.ommers.OmmersPool.{AddOmmers, RemoveOmmers}
import io.iohk.ethereum.transactions.PendingTransactionsManager
import io.iohk.ethereum.transactions.PendingTransactionsManager.{AddUncheckedTransactions, RemoveTransactions}
import io.iohk.ethereum.utils.ByteStringUtils
import io.iohk.ethereum.utils.Config.SyncConfig
import io.iohk.ethereum.utils.FunctorOps._

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// scalastyle:off cyclomatic.complexity
class BlockImporter(
fetcher: ActorRef,
ledger: Ledger,
blockchain: Blockchain,
syncConfig: SyncConfig,
ommersPool: ActorRef,
broadcaster: ActorRef,
pendingTransactionsManager: ActorRef
pendingTransactionsManager: ActorRef,
checkpointBlockGenerator: CheckpointBlockGenerator
) extends Actor
with ActorLogging {
import BlockImporter._
Expand All @@ -56,15 +62,35 @@ class BlockImporter(

private def running(state: ImporterState): Receive = handleTopMessages(state, running) orElse {
case ReceiveTimeout => self ! PickBlocks

case PrintStatus => log.info("Block: {}, is on top?: {}", blockchain.getBestBlockNumber(), state.isOnTop)

case BlockFetcher.PickedBlocks(blocks) =>
SignedTransaction.retrieveSendersInBackGround(blocks.toList.map(_.body))
importBlocks(blocks)(state)

case MinedBlock(block) =>
if (!state.importing) {
importMinedBlock(block, state)
}

case nc @ NewCheckpoint(parentHash, signatures) =>
if (state.importing) {
//We don't want to lose a checkpoint
context.system.scheduler.scheduleOnce(1.second, self, nc)
} else {
ledger.getBlockByHash(parentHash) match {
case Some(parent) =>
val checkpointBlock = checkpointBlockGenerator.generate(parent, Checkpoint(signatures))
importCheckpointBlock(checkpointBlock, state)

case None =>
log.error(s"Could not find parent (${ByteStringUtils.hash2string(parentHash)}) for new checkpoint block")
}
}

case ImportNewBlock(block, peerId) if state.isOnTop && !state.importing => importNewBlock(block, peerId, state)

case ImportDone(newBehavior) =>
val newState = state.notImportingBlocks().branchResolved()
val behavior: Behavior = getBehavior(newBehavior)
Expand Down Expand Up @@ -177,6 +203,9 @@ class BlockImporter(
private def importMinedBlock(block: Block, state: ImporterState): Unit =
importBlock(block, new MinedBlockImportMessages(block), informFetcherOnFail = false)(state)

private def importCheckpointBlock(block: Block, state: ImporterState): Unit =
importBlock(block, new CheckpointBlockImportMessages(block), informFetcherOnFail = false)(state)

private def importNewBlock(block: Block, peerId: PeerId, state: ImporterState): Unit =
importBlock(block, new NewBlockImportMessages(block, peerId), informFetcherOnFail = true)(state)

Expand Down Expand Up @@ -247,7 +276,7 @@ class BlockImporter(

// Either block from which we try resolve branch or list of blocks to be imported
private def resolveBranch(blocks: NonEmptyList[Block]): Either[BigInt, List[Block]] =
ledger.resolveBranch(blocks.map(_.header).toList) match {
ledger.resolveBranch(blocks.map(_.header)) match {
case NewBetterBranch(oldBranch) =>
val transactionsToAdd = oldBranch.flatMap(_.body.transactionList)
pendingTransactionsManager ! PendingTransactionsManager.AddUncheckedTransactions(transactionsToAdd)
Expand Down Expand Up @@ -294,10 +323,20 @@ object BlockImporter {
syncConfig: SyncConfig,
ommersPool: ActorRef,
broadcaster: ActorRef,
pendingTransactionsManager: ActorRef
pendingTransactionsManager: ActorRef,
checkpointBlockGenerator: CheckpointBlockGenerator
): Props =
Props(
new BlockImporter(fetcher, ledger, blockchain, syncConfig, ommersPool, broadcaster, pendingTransactionsManager)
new BlockImporter(
fetcher,
ledger,
blockchain,
syncConfig,
ommersPool,
broadcaster,
pendingTransactionsManager,
checkpointBlockGenerator
)
)

type Behavior = ImporterState => Receive
Expand All @@ -308,6 +347,7 @@ object BlockImporter {
case object OnTop extends ImporterMsg
case object NotOnTop extends ImporterMsg
case class MinedBlock(block: Block) extends ImporterMsg
case class NewCheckpoint(parentHash: ByteString, signatures: Seq[ECDSASignature]) extends ImporterMsg
case class ImportNewBlock(block: Block, peerId: PeerId) extends ImporterMsg
case class ImportDone(newBehavior: NewBehavior) extends ImporterMsg
case object PickBlocks extends ImporterMsg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,24 @@ class MinedBlockImportMessages(block: Block) extends ImportMessages(block) {
(ErrorLevel, s"Ignoring mined block $exception")
}

class CheckpointBlockImportMessages(block: Block) extends ImportMessages(block) {
import ImportMessages._
override def preImport(): LogEntry = (DebugLevel, s"Importing new checkpoint block (${block.idTag})")
override def importedToTheTop(): LogEntry =
(DebugLevel, s"Added new checkpoint block $number to top of the chain")
override def enqueued(): LogEntry = (DebugLevel, s"Checkpoint block $number was added to the queue")
override def duplicated(): LogEntry =
(DebugLevel, "Ignoring duplicate checkpoint block")
override def orphaned(): LogEntry =
(ErrorLevel, "Checkpoint block has no parent. This should never happen")
override def reorganisedChain(newBranch: List[Block]): LogEntry =
(DebugLevel, s"Addition of new checkpoint block $number resulting in chain reorganization")
override def importFailed(error: String): LogEntry =
(WarningLevel, s"Failed to execute checkpoint block because of $error")
override def missingStateNode(exception: MissingNodeException): LogEntry =
(ErrorLevel, s"Ignoring checkpoint block: $exception")
}

class NewBlockImportMessages(block: Block, peerId: PeerId) extends ImportMessages(block) {
import ImportMessages._
override def preImport(): LogEntry = (DebugLevel, s"Handling NewBlock message for block (${block.idTag})")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package io.iohk.ethereum.blockchain.sync.regular
import akka.actor.{Actor, ActorLogging, ActorRef, AllForOneStrategy, Cancellable, Props, Scheduler, SupervisorStrategy}
import akka.util.ByteString
import io.iohk.ethereum.blockchain.sync.BlockBroadcast
import io.iohk.ethereum.consensus.blocks.CheckpointBlockGenerator
import io.iohk.ethereum.crypto.ECDSASignature
import io.iohk.ethereum.domain.{Block, Blockchain}
import io.iohk.ethereum.ledger.Ledger
import io.iohk.ethereum.utils.ByteStringUtils
import io.iohk.ethereum.utils.Config.SyncConfig

class RegularSync(
Expand All @@ -17,6 +19,7 @@ class RegularSync(
syncConfig: SyncConfig,
ommersPool: ActorRef,
pendingTransactionsManager: ActorRef,
checkpointBlockGenerator: CheckpointBlockGenerator,
scheduler: Scheduler
) extends Actor
with ActorLogging {
Expand All @@ -31,7 +34,16 @@ class RegularSync(
)
val importer: ActorRef =
context.actorOf(
BlockImporter.props(fetcher, ledger, blockchain, syncConfig, ommersPool, broadcaster, pendingTransactionsManager),
BlockImporter.props(
fetcher,
ledger,
blockchain,
syncConfig,
ommersPool,
broadcaster,
pendingTransactionsManager,
checkpointBlockGenerator
),
"block-importer"
)

Expand All @@ -57,6 +69,10 @@ class RegularSync(
case MinedBlock(block) =>
log.info(s"Block mined [number = {}, hash = {}]", block.number, block.header.hashAsHexString)
importer ! BlockImporter.MinedBlock(block)

case NewCheckpoint(parentHash, signatures) =>
log.info(s"Received new checkpoint for block ${ByteStringUtils.hash2string(parentHash)}")
importer ! BlockImporter.NewCheckpoint(parentHash, signatures)
}

override def supervisorStrategy: SupervisorStrategy = AllForOneStrategy()(SupervisorStrategy.defaultDecider)
Expand All @@ -78,6 +94,7 @@ object RegularSync {
syncConfig: SyncConfig,
ommersPool: ActorRef,
pendingTransactionsManager: ActorRef,
checkpointBlockGenerator: CheckpointBlockGenerator,
scheduler: Scheduler
): Props =
Props(
Expand All @@ -90,6 +107,7 @@ object RegularSync {
syncConfig,
ommersPool,
pendingTransactionsManager,
checkpointBlockGenerator,
scheduler
)
)
Expand Down
4 changes: 4 additions & 0 deletions src/main/scala/io/iohk/ethereum/consensus/Consensus.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package io.iohk.ethereum.consensus

import io.iohk.ethereum.consensus.blocks.{BlockGenerator, TestBlockGenerator}
import io.iohk.ethereum.consensus.difficulty.DifficultyCalculator
import io.iohk.ethereum.consensus.ethash.{MinerProtocol, MinerResponse}
import io.iohk.ethereum.consensus.validators.Validators
import io.iohk.ethereum.ledger.BlockPreparator
import io.iohk.ethereum.ledger.Ledger.VMImpl
import io.iohk.ethereum.nodebuilder.Node

import scala.concurrent.Future

/**
Expand Down Expand Up @@ -46,6 +48,8 @@ trait Consensus {
*/
def blockGenerator: BlockGenerator

def difficultyCalculator: DifficultyCalculator

/**
* Starts the consensus protocol on the current `node`.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,12 @@ import io.iohk.ethereum.utils.ByteUtils.or

/**
* This is a skeleton for a generic [[io.iohk.ethereum.consensus.blocks.BlockGenerator BlockGenerator]].
*
* @param blockchain
* @param blockchainConfig
* @param _blockTimestampProvider
*/
abstract class BlockGeneratorSkeleton(
blockchain: Blockchain,
blockchainConfig: BlockchainConfig,
consensusConfig: ConsensusConfig,
blockPreparator: BlockPreparator,
difficultyCalc: DifficultyCalculator,
_blockTimestampProvider: BlockTimestampProvider = DefaultBlockTimestampProvider
) extends TestBlockGenerator {

Expand All @@ -42,8 +38,6 @@ abstract class BlockGeneratorSkeleton(

protected def newBlockBody(transactions: Seq[SignedTransaction], x: X): BlockBody

protected def difficulty: DifficultyCalculator

protected def defaultPrepareHeader(
blockNumber: BigInt,
parent: Block,
Expand All @@ -66,7 +60,7 @@ abstract class BlockGeneratorSkeleton(
transactionsRoot = ByteString.empty,
receiptsRoot = ByteString.empty,
logsBloom = ByteString.empty,
difficulty = difficulty.calculateDifficulty(blockNumber, blockTimestamp, parent.header),
difficulty = difficultyCalc.calculateDifficulty(blockNumber, blockTimestamp, parent.header),
number = blockNumber,
gasLimit = calculateGasLimit(parent.header.gasLimit),
gasUsed = 0,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.iohk.ethereum.consensus.blocks

import akka.util.ByteString
import io.iohk.ethereum.domain.BlockHeader.HeaderExtraFields.HefPostEcip1097
import io.iohk.ethereum.domain._
import io.iohk.ethereum.ledger.BloomFilter

class CheckpointBlockGenerator {

def generate(parent: Block, checkpoint: Checkpoint): Block = {
val blockNumber = parent.number + 1
// we are using a predictable value for timestamp so that each federation node generates identical block
// see ETCM-173
val timestamp = parent.header.unixTimestamp + 1

val header = BlockHeader(
parentHash = parent.hash,
ommersHash = BlockHeader.EmptyOmmers,
beneficiary = BlockHeader.EmptyBeneficiary,
difficulty = parent.header.difficulty,
number = blockNumber,
gasLimit = parent.header.gasLimit,
unixTimestamp = timestamp,
extraData = ByteString.empty,
stateRoot = parent.header.stateRoot,
transactionsRoot = BlockHeader.EmptyMpt,
receiptsRoot = BlockHeader.EmptyMpt,
logsBloom = BloomFilter.EmptyBloomFilter,
gasUsed = UInt256.Zero,
mixHash = ByteString.empty,
nonce = ByteString.empty,
extraFields = HefPostEcip1097(false, Some(checkpoint))
)

Block(header, BlockBody.empty)
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.iohk.ethereum.consensus.blocks

import io.iohk.ethereum.consensus.ConsensusConfig
import io.iohk.ethereum.consensus.difficulty.DifficultyCalculator
import io.iohk.ethereum.domain._
import io.iohk.ethereum.ledger.{BlockPreparationError, BlockPreparator}
import io.iohk.ethereum.utils.BlockchainConfig
Expand All @@ -10,12 +11,13 @@ abstract class NoOmmersBlockGenerator(
blockchainConfig: BlockchainConfig,
consensusConfig: ConsensusConfig,
blockPreparator: BlockPreparator,
difficultyCalc: DifficultyCalculator,
blockTimestampProvider: BlockTimestampProvider = DefaultBlockTimestampProvider
) extends BlockGeneratorSkeleton(
blockchain,
blockchainConfig,
consensusConfig,
blockPreparator,
difficultyCalc,
blockTimestampProvider
) {

Expand Down
Loading

0 comments on commit 70672f2

Please sign in to comment.