Skip to content

Commit

Permalink
[ETCM-77] checkpoint sync
Browse files Browse the repository at this point in the history
  • Loading branch information
rtkaczyk committed Oct 13, 2020
1 parent 946b711 commit 4cbeed0
Show file tree
Hide file tree
Showing 23 changed files with 1,038 additions and 467 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.iohk.ethereum.blockchain.sync

import akka.actor.{Actor, ActorLogging, ActorRef, PoisonPill, Props, Scheduler}
import io.iohk.ethereum.blockchain.sync.regular.RegularSync
import io.iohk.ethereum.consensus.blocks.CheckpointBlockGenerator
import io.iohk.ethereum.consensus.validators.Validators
import io.iohk.ethereum.db.storage.{AppStateStorage, FastSyncStateStorage}
import io.iohk.ethereum.domain.Blockchain
Expand All @@ -16,11 +17,12 @@ class SyncController(
validators: Validators,
peerEventBus: ActorRef,
pendingTransactionsManager: ActorRef,
checkpointBlockGenerator: CheckpointBlockGenerator,
ommersPool: ActorRef,
etcPeerManager: ActorRef,
syncConfig: SyncConfig,
externalSchedulerOpt: Option[Scheduler] = None)
extends Actor
externalSchedulerOpt: Option[Scheduler] = None
) extends Actor
with ActorLogging {

import SyncController._
Expand All @@ -29,8 +31,8 @@ class SyncController(

override def receive: Receive = idle

def idle: Receive = {
case Start => start()
def idle: Receive = { case Start =>
start()
}

def runningFastSync(fastSync: ActorRef): Receive = {
Expand All @@ -41,8 +43,8 @@ class SyncController(
case other => fastSync.forward(other)
}

def runningRegularSync(regularSync: ActorRef): Receive = {
case other => regularSync.forward(other)
def runningRegularSync(regularSync: ActorRef): Receive = { case other =>
regularSync.forward(other)
}

def start(): Unit = {
Expand All @@ -54,15 +56,17 @@ class SyncController(
startFastSync()
case (true, true) =>
log.warning(
s"do-fast-sync is set to $doFastSync but fast sync cannot start because it has already been completed")
s"do-fast-sync is set to $doFastSync but fast sync cannot start because it has already been completed"
)
startRegularSync()
case (true, false) =>
startRegularSync()
case (false, false) =>
//Check whether fast sync was started before
if (fastSyncStateStorage.getSyncState().isDefined) {
log.warning(
s"do-fast-sync is set to $doFastSync but regular sync cannot start because fast sync hasn't completed")
s"do-fast-sync is set to $doFastSync but regular sync cannot start because fast sync hasn't completed"
)
startFastSync()
} else
startRegularSync()
Expand All @@ -79,14 +83,17 @@ class SyncController(
peerEventBus,
etcPeerManager,
syncConfig,
scheduler),
"fast-sync")
scheduler
),
"fast-sync"
)
fastSync ! FastSync.Start
context become runningFastSync(fastSync)
}

def startRegularSync(): Unit = {
val peersClient = context.actorOf(PeersClient.props(etcPeerManager, peerEventBus, syncConfig, scheduler), "peers-client")
val peersClient =
context.actorOf(PeersClient.props(etcPeerManager, peerEventBus, syncConfig, scheduler), "peers-client")
val regularSync = context.actorOf(
RegularSync.props(
peersClient,
Expand All @@ -97,7 +104,9 @@ class SyncController(
syncConfig,
ommersPool,
pendingTransactionsManager,
scheduler),
checkpointBlockGenerator,
scheduler
),
"regular-sync"
)
regularSync ! RegularSync.Start
Expand All @@ -116,9 +125,11 @@ object SyncController {
validators: Validators,
peerEventBus: ActorRef,
pendingTransactionsManager: ActorRef,
checkpointBlockGenerator: CheckpointBlockGenerator,
ommersPool: ActorRef,
etcPeerManager: ActorRef,
syncConfig: SyncConfig): Props =
syncConfig: SyncConfig
): Props =
Props(
new SyncController(
appStateStorage,
Expand All @@ -128,9 +139,12 @@ object SyncController {
validators,
peerEventBus,
pendingTransactionsManager,
checkpointBlockGenerator,
ommersPool,
etcPeerManager,
syncConfig))
syncConfig
)
)

case object Start
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,40 @@ package io.iohk.ethereum.blockchain.sync.regular

import akka.actor.Actor.Receive
import akka.actor.{Actor, ActorLogging, ActorRef, NotInfluenceReceiveTimeout, Props, ReceiveTimeout}
import akka.util.ByteString
import cats.data.NonEmptyList
import cats.instances.future._
import cats.instances.list._
import cats.syntax.apply._
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor.BroadcastBlocks
import io.iohk.ethereum.crypto.kec256
import io.iohk.ethereum.domain.{Block, Blockchain, SignedTransaction}
import io.iohk.ethereum.consensus.blocks.CheckpointBlockGenerator
import io.iohk.ethereum.crypto.{ECDSASignature, kec256}
import io.iohk.ethereum.domain.{Block, Blockchain, Checkpoint, SignedTransaction}
import io.iohk.ethereum.ledger._
import io.iohk.ethereum.mpt.MerklePatriciaTrie.MissingNodeException
import io.iohk.ethereum.network.PeerId
import io.iohk.ethereum.network.p2p.messages.CommonMessages.NewBlock
import io.iohk.ethereum.ommers.OmmersPool.{AddOmmers, RemoveOmmers}
import io.iohk.ethereum.transactions.PendingTransactionsManager
import io.iohk.ethereum.transactions.PendingTransactionsManager.{AddUncheckedTransactions, RemoveTransactions}
import io.iohk.ethereum.utils.ByteStringUtils
import io.iohk.ethereum.utils.Config.SyncConfig
import io.iohk.ethereum.utils.FunctorOps._

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// scalastyle:off cyclomatic.complexity
class BlockImporter(
fetcher: ActorRef,
ledger: Ledger,
blockchain: Blockchain,
syncConfig: SyncConfig,
ommersPool: ActorRef,
broadcaster: ActorRef,
pendingTransactionsManager: ActorRef
pendingTransactionsManager: ActorRef,
checkpointBlockGenerator: CheckpointBlockGenerator
) extends Actor
with ActorLogging {
import BlockImporter._
Expand All @@ -56,15 +62,35 @@ class BlockImporter(

private def running(state: ImporterState): Receive = handleTopMessages(state, running) orElse {
case ReceiveTimeout => self ! PickBlocks

case PrintStatus => log.info("Block: {}, is on top?: {}", blockchain.getBestBlockNumber(), state.isOnTop)

case BlockFetcher.PickedBlocks(blocks) =>
SignedTransaction.retrieveSendersInBackGround(blocks.toList.map(_.body))
importBlocks(blocks)(state)

case MinedBlock(block) =>
if (!state.importing) {
importMinedBlock(block, state)
}

case nc @ NewCheckpoint(parentHash, signatures) =>
if (state.importing) {
//TODO: is this ok? What delay?
context.system.scheduler.scheduleOnce(100.millis, self, nc)
} else {
ledger.getBlockByHash(parentHash) match {
case Some(parent) =>
val checkpointBlock = checkpointBlockGenerator.generate(parent, Checkpoint(signatures))
importCheckpointBlock(checkpointBlock, state)

case None =>
log.error(s"Could not find parent (${ByteStringUtils.hash2string(parentHash)}) for new checkpoint block")
}
}

case ImportNewBlock(block, peerId) if state.isOnTop && !state.importing => importNewBlock(block, peerId, state)

case ImportDone(newBehavior) =>
val newState = state.notImportingBlocks().branchResolved()
val behavior: Behavior = getBehavior(newBehavior)
Expand Down Expand Up @@ -177,6 +203,9 @@ class BlockImporter(
private def importMinedBlock(block: Block, state: ImporterState): Unit =
importBlock(block, new MinedBlockImportMessages(block), informFetcherOnFail = false)(state)

private def importCheckpointBlock(block: Block, state: ImporterState): Unit =
importBlock(block, new CheckpointBlockImportMessages(block), informFetcherOnFail = false)(state)

private def importNewBlock(block: Block, peerId: PeerId, state: ImporterState): Unit =
importBlock(block, new NewBlockImportMessages(block, peerId), informFetcherOnFail = true)(state)

Expand Down Expand Up @@ -294,10 +323,20 @@ object BlockImporter {
syncConfig: SyncConfig,
ommersPool: ActorRef,
broadcaster: ActorRef,
pendingTransactionsManager: ActorRef
pendingTransactionsManager: ActorRef,
checkpointBlockGenerator: CheckpointBlockGenerator
): Props =
Props(
new BlockImporter(fetcher, ledger, blockchain, syncConfig, ommersPool, broadcaster, pendingTransactionsManager)
new BlockImporter(
fetcher,
ledger,
blockchain,
syncConfig,
ommersPool,
broadcaster,
pendingTransactionsManager,
checkpointBlockGenerator
)
)

type Behavior = ImporterState => Receive
Expand All @@ -308,6 +347,7 @@ object BlockImporter {
case object OnTop extends ImporterMsg
case object NotOnTop extends ImporterMsg
case class MinedBlock(block: Block) extends ImporterMsg
case class NewCheckpoint(parentHash: ByteString, signatures: Seq[ECDSASignature]) extends ImporterMsg
case class ImportNewBlock(block: Block, peerId: PeerId) extends ImporterMsg
case class ImportDone(newBehavior: NewBehavior) extends ImporterMsg
case object PickBlocks extends ImporterMsg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,24 @@ class MinedBlockImportMessages(block: Block) extends ImportMessages(block) {
(ErrorLevel, s"Ignoring mined block $exception")
}

class CheckpointBlockImportMessages(block: Block) extends ImportMessages(block) {
import ImportMessages._
override def preImport(): LogEntry = (DebugLevel, s"Importing new checkpoint block (${block.idTag})")
override def importedToTheTop(): LogEntry =
(DebugLevel, s"Added new checkpont block $number to top of the chain")
override def enqueued(): LogEntry = (DebugLevel, s"Checkpoint block $number was added to the queue")
override def duplicated(): LogEntry =
(DebugLevel, "Ignoring duplicate checkpoint block")
override def orphaned(): LogEntry =
(ErrorLevel, "Checkpoint block has no parent. This should never happen")
override def reorganisedChain(newBranch: List[Block]): LogEntry =
(DebugLevel, s"Addition of new checkpoint block $number resulting in chain reorganization")
override def importFailed(error: String): LogEntry =
(WarningLevel, s"Failed to execute checkpoint block because of $error")
override def missingStateNode(exception: MissingNodeException): LogEntry =
(ErrorLevel, s"Ignoring checkpoint block: $exception")
}

class NewBlockImportMessages(block: Block, peerId: PeerId) extends ImportMessages(block) {
import ImportMessages._
override def preImport(): LogEntry = (DebugLevel, s"Handling NewBlock message for block (${block.idTag})")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package io.iohk.ethereum.blockchain.sync.regular

import akka.actor.{Actor, ActorLogging, ActorRef, AllForOneStrategy, Cancellable, Props, Scheduler, SupervisorStrategy}
import akka.util.ByteString
import io.iohk.ethereum.blockchain.sync.BlockBroadcast
import io.iohk.ethereum.consensus.blocks.CheckpointBlockGenerator
import io.iohk.ethereum.crypto.ECDSASignature
import io.iohk.ethereum.domain.{Block, Blockchain}
import io.iohk.ethereum.ledger.Ledger
import io.iohk.ethereum.utils.ByteStringUtils
import io.iohk.ethereum.utils.Config.SyncConfig

class RegularSync(
Expand All @@ -15,6 +19,7 @@ class RegularSync(
syncConfig: SyncConfig,
ommersPool: ActorRef,
pendingTransactionsManager: ActorRef,
checkpointBlockGenerator: CheckpointBlockGenerator,
scheduler: Scheduler
) extends Actor
with ActorLogging {
Expand All @@ -29,7 +34,16 @@ class RegularSync(
)
val importer: ActorRef =
context.actorOf(
BlockImporter.props(fetcher, ledger, blockchain, syncConfig, ommersPool, broadcaster, pendingTransactionsManager),
BlockImporter.props(
fetcher,
ledger,
blockchain,
syncConfig,
ommersPool,
broadcaster,
pendingTransactionsManager,
checkpointBlockGenerator
),
"block-importer"
)

Expand All @@ -55,6 +69,10 @@ class RegularSync(
case MinedBlock(block) =>
log.info(s"Block mined [number = {}, hash = {}]", block.number, block.header.hashAsHexString)
importer ! BlockImporter.MinedBlock(block)

case NewCheckpoint(parentHash, signatures) =>
log.info(s"Received new checkpoint for block ${ByteStringUtils.hash2string(parentHash)}")
importer ! BlockImporter.NewCheckpoint(parentHash, signatures)
}

override def supervisorStrategy: SupervisorStrategy = AllForOneStrategy()(SupervisorStrategy.defaultDecider)
Expand All @@ -76,6 +94,7 @@ object RegularSync {
syncConfig: SyncConfig,
ommersPool: ActorRef,
pendingTransactionsManager: ActorRef,
checkpointBlockGenerator: CheckpointBlockGenerator,
scheduler: Scheduler
): Props =
Props(
Expand All @@ -88,11 +107,13 @@ object RegularSync {
syncConfig,
ommersPool,
pendingTransactionsManager,
checkpointBlockGenerator,
scheduler
)
)

sealed trait RegularSyncMsg
case object Start extends RegularSyncMsg
case class MinedBlock(block: Block) extends RegularSyncMsg
case class NewCheckpoint(parentHash: ByteString, signatures: Seq[ECDSASignature]) extends RegularSyncMsg
}
Loading

0 comments on commit 4cbeed0

Please sign in to comment.