Skip to content

Commit

Permalink
fix: keep first cons time of current block in state (#10404)
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Tinker <michael.tinker@swirldslabs.com>
  • Loading branch information
tinker-michaelj authored Dec 12, 2023
1 parent 732eaa4 commit cf6e9e0
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 87 deletions.
2 changes: 1 addition & 1 deletion hedera-node/hapi/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ plugins {

description = "Hedera API"

val hapiProtoBranchOrTag = "add-pbj-types-for-state"
val hapiProtoBranchOrTag = "add-first-cons-of-current-block-to-info"
val hederaProtoDir = layout.projectDirectory.dir("hedera-protobufs")

if (!gradle.startParameter.isOffline) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.hedera.node.app.records;

import com.hedera.hapi.node.base.Timestamp;
import com.hedera.hapi.node.state.blockrecords.BlockInfo;
import com.hedera.hapi.node.state.blockrecords.RunningHashes;
import com.hedera.node.app.records.impl.BlockRecordManagerImpl;
Expand Down Expand Up @@ -46,6 +47,8 @@ public final class BlockRecordService implements Service {
/** The original hash, only used at genesis */
private static final Bytes GENESIS_HASH = Bytes.wrap(new byte[48]);

public static final Timestamp EPOCH = new Timestamp(0, 0);

@NonNull
@Override
public String getServiceName() {
Expand All @@ -71,7 +74,7 @@ public void migrate(@NonNull final MigrationContext ctx) {
final var blocksState = ctx.newStates().getSingleton(BLOCK_INFO_STATE_KEY);
final var isGenesis = ctx.previousStates().isEmpty();
if (isGenesis) {
final var blocks = new BlockInfo(0, null, Bytes.EMPTY, null, false);
final var blocks = new BlockInfo(-1, EPOCH, Bytes.EMPTY, EPOCH, false, EPOCH);
blocksState.put(blocks);
final var runningHashes =
RunningHashes.newBuilder().runningHash(GENESIS_HASH).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

package com.hedera.node.app.records.impl;

import static com.hedera.node.app.records.BlockRecordService.EPOCH;
import static com.hedera.node.app.records.impl.BlockRecordInfoUtils.HASH_SIZE;
import static java.util.Objects.requireNonNull;

import com.google.common.annotations.VisibleForTesting;
import com.hedera.hapi.node.base.Timestamp;
import com.hedera.hapi.node.state.blockrecords.BlockInfo;
import com.hedera.hapi.node.state.blockrecords.RunningHashes;
Expand Down Expand Up @@ -76,15 +78,9 @@ public final class BlockRecordManagerImpl implements BlockRecordManager {
* time we finish a provisional block.
*/
private BlockInfo lastBlockInfo;
/** The number of the current provisional block. "provisional" because the block is not yet complete. */
private long provisionalCurrentBlockNumber;
/**
* The consensus time of the first transaction in the current block. "provisional" because the block is not yet
* complete.
* True when we have completed event recovery. This is not yet implemented properly.
*/
// TODO : Verify this is safe during reconnect
private Instant provisionalCurrentBlockFirstTransactionTime = null;
/** True when we have completed event recovery. This is not yet implemented properly. */
private boolean eventRecoveryCompleted = false;

/**
Expand Down Expand Up @@ -120,8 +116,6 @@ public BlockRecordManagerImpl(
final var blockInfoState = states.<BlockInfo>getSingleton(BlockRecordService.BLOCK_INFO_STATE_KEY);
this.lastBlockInfo = blockInfoState.get();
assert this.lastBlockInfo != null : "Cannot be null, because this state is created at genesis";
this.provisionalCurrentBlockNumber = lastBlockInfo.lastBlockNumber() + 1; // We know what this will be
this.provisionalCurrentBlockFirstTransactionTime = null; // We do not know what this will be yet

// Initialize the stream file producer. NOTE, if the producer cannot be initialized, and a random exception is
// thrown here, then startup of the node will fail. This is the intended behavior. We MUST be able to produce
Expand All @@ -135,7 +129,9 @@ public BlockRecordManagerImpl(
// =================================================================================================================
// AutoCloseable implementation

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override
public void close() {
try {
Expand All @@ -151,60 +147,79 @@ public void close() {
// =================================================================================================================
// BlockRecordManager implementation

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
public void startUserTransaction(@NonNull final Instant consensusTime, @NonNull final HederaState state) {
// Is this the very first transaction since the node was started?
final var restarted = provisionalCurrentBlockFirstTransactionTime == null;
// We did just restart. So we will need to create a new block, but we don't need to close out the old one.
// This also works for genesis, since the state was prepopulated with a lastBlockInfo.
if (restarted) {
final var lastBlockNo = lastBlockInfo.lastBlockNumber();
provisionalCurrentBlockNumber = lastBlockNo + 1;
provisionalCurrentBlockFirstTransactionTime = consensusTime;
streamFileProducer.switchBlocks(lastBlockNo, provisionalCurrentBlockNumber, consensusTime);
} else {
// Check to see if we are at the boundary between blocks and should create a new one. Each block is covered
// by some period. We'll compute the period of the current provisional block and the period covered by the
// given consensus time, and if they are different, we'll close out the current block and start a new one.
final var currentBlockPeriod = getBlockPeriod(provisionalCurrentBlockFirstTransactionTime);
final var newBlockPeriod = getBlockPeriod(consensusTime);
if (newBlockPeriod > currentBlockPeriod) {
// Compute the state for the newly completed block. The `lastBlockHashBytes` is the running hash after
// the last transaction
final var lastBlockNo = provisionalCurrentBlockNumber;
final var lastBlockFirstTransactionTime = provisionalCurrentBlockFirstTransactionTime;
final var lastBlockHashBytes = streamFileProducer.getRunningHash();
lastBlockInfo =
updateBlockInfo(lastBlockInfo, lastBlockNo, lastBlockFirstTransactionTime, lastBlockHashBytes);

// Update BlockInfo state
final var states = state.createWritableStates(BlockRecordService.NAME);
final var blockInfoState = states.<BlockInfo>getSingleton(BlockRecordService.BLOCK_INFO_STATE_KEY);
blockInfoState.put(lastBlockInfo);

// log end of block if needed
if (logger.isDebugEnabled()) {
logger.debug(
"""
--- BLOCK UPDATE ---
Finished: #{} @ {} with hash {}
Starting: #{} @ {}""",
lastBlockNo,
provisionalCurrentBlockFirstTransactionTime,
new Hash(lastBlockHashBytes.toByteArray(), DigestType.SHA_384),
lastBlockNo + 1,
consensusTime);
}

// close all stream files for end of block and create signature files, then open new block record file
provisionalCurrentBlockNumber = lastBlockNo + 1;
provisionalCurrentBlockFirstTransactionTime = consensusTime;
streamFileProducer.switchBlocks(lastBlockNo, provisionalCurrentBlockNumber, consensusTime);
if (EPOCH.equals(lastBlockInfo.firstConsTimeOfCurrentBlock())) {
// This is the first transaction of the first block, so set both the firstConsTimeOfCurrentBlock
// and the current consensus time to now
final var now = new Timestamp(consensusTime.getEpochSecond(), consensusTime.getNano());
lastBlockInfo = lastBlockInfo
.copyBuilder()
.consTimeOfLastHandledTxn(now)
.firstConsTimeOfCurrentBlock(now)
.build();
persistLastBlockInfo(state);
streamFileProducer.switchBlocks(-1, 0, consensusTime);
return;
}

// Check to see if we are at the boundary between blocks and should create a new one. Each block is covered
// by some period. We'll compute the period of the current provisional block and the period covered by the
// given consensus time, and if they are different, we'll close out the current block and start a new one.
final var currentBlockPeriod = getBlockPeriod(lastBlockInfo.firstConsTimeOfCurrentBlock());
final var newBlockPeriod = getBlockPeriod(consensusTime);
if (newBlockPeriod > currentBlockPeriod) {
// Compute the state for the newly completed block. The `lastBlockHashBytes` is the running hash after
// the last transaction
final var lastBlockHashBytes = streamFileProducer.getRunningHash();
final var justFinishedBlockNumber = lastBlockInfo.lastBlockNumber() + 1;
lastBlockInfo =
infoOfJustFinished(lastBlockInfo, justFinishedBlockNumber, lastBlockHashBytes, consensusTime);

// Update BlockInfo state
persistLastBlockInfo(state);

// log end of block if needed
if (logger.isDebugEnabled()) {
logger.debug(
"""
--- BLOCK UPDATE ---
Finished: #{} (started @ {}) with hash {}
Starting: #{} @ {}""",
justFinishedBlockNumber,
lastBlockInfo.firstConsTimeOfCurrentBlock(),
new Hash(lastBlockHashBytes.toByteArray(), DigestType.SHA_384),
justFinishedBlockNumber + 1,
consensusTime);
}

switchBlocksAt(consensusTime);
}
}

/** {@inheritDoc} */
/**
 * Closes out the current block and opens its successor at the given consensus time, with no
 * other side effects on state. Kept as a separate overridable seam because existing unit tests
 * were written against the original (buggy) implementation, in which the first consensus time
 * of the current block was not kept in state.
 *
 * @param consensusTime the consensus time at which to switch to the current block
 */
@VisibleForTesting
public void switchBlocksAt(@NonNull final Instant consensusTime) {
    final var justFinishedBlockNo = lastBlockInfo.lastBlockNumber();
    streamFileProducer.switchBlocks(justFinishedBlockNo, justFinishedBlockNo + 1, consensusTime);
}

/**
 * Writes the in-memory {@code lastBlockInfo} into the block-record service's
 * {@code BlockInfo} singleton so it survives in the given state.
 *
 * @param state the Hedera state whose writable block-record states should receive the update
 */
private void persistLastBlockInfo(@NonNull final HederaState state) {
    final var writableStates = state.createWritableStates(BlockRecordService.NAME);
    final var blockInfoSingleton =
            writableStates.<BlockInfo>getSingleton(BlockRecordService.BLOCK_INFO_STATE_KEY);
    blockInfoSingleton.put(lastBlockInfo);
}

/**
* {@inheritDoc}
*/
public void endUserTransaction(
@NonNull final Stream<SingleTransactionRecord> recordStreamItems, @NonNull final HederaState state) {
// check if we need to run event recovery before we can write any new records to stream
Expand All @@ -216,7 +231,9 @@ public void endUserTransaction(
streamFileProducer.writeRecordStreamItems(recordStreamItems);
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override
public void endRound(@NonNull final HederaState state) {
// We get the latest running hash from the StreamFileProducer blocking if needed for it to be computed.
Expand All @@ -238,14 +255,18 @@ public void endRound(@NonNull final HederaState state) {
// ========================================================================================================
// Running Hash Getter Methods

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@NonNull
@Override
public Bytes getRunningHash() {
return streamFileProducer.getRunningHash();
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Nullable
@Override
public Bytes getNMinus3RunningHash() {
Expand All @@ -255,34 +276,44 @@ public Bytes getNMinus3RunningHash() {
// ========================================================================================================
// BlockRecordInfo Implementation

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override
public long lastBlockNo() {
return lastBlockInfo.lastBlockNumber();
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Nullable
@Override
public Instant firstConsTimeOfLastBlock() {
return BlockRecordInfoUtils.firstConsTimeOfLastBlock(lastBlockInfo);
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Nullable
@Override
public Bytes lastBlockHash() {
return BlockRecordInfoUtils.lastBlockHash(lastBlockInfo);
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Nullable
@Override
public Bytes blockHashByBlockNumber(final long blockNo) {
return BlockRecordInfoUtils.blockHashByBlockNumber(lastBlockInfo, blockNo);
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override
public void advanceConsensusClock(@NonNull final Instant consensusTime, @NonNull final HederaState state) {
final var builder = this.lastBlockInfo
Expand Down Expand Up @@ -323,37 +354,46 @@ private long getBlockPeriod(@Nullable final Instant consensusTimestamp) {
return consensusTimestamp.getEpochSecond() / blockPeriodInSeconds;
}

/**
 * Maps a consensus {@link Timestamp} to the index of the block period containing it, computed
 * as whole epoch seconds divided by the configured block period length (nanos are ignored).
 *
 * @param consensusTimestamp the timestamp to classify, or null
 * @return the block period index for the timestamp, or {@code 0} when it is null
 */
private long getBlockPeriod(@Nullable final Timestamp consensusTimestamp) {
    if (consensusTimestamp == null) {
        return 0;
    }
    return consensusTimestamp.seconds() / blockPeriodInSeconds;
}

/**
* Create a new updated BlockInfo from existing BlockInfo and new block information. BlockInfo stores block hashes as a single
* byte array, so we need to append or if full shift left and insert new block hash.
*
* @param currentBlockInfo The current block info
* @param newBlockNumber The new block number
* @param blockFirstTransactionTime The new block first transaction time
* @param blockHash The new block hash
* @param lastBlockInfo The current block info
* @param justFinishedBlockNumber The new block number
* @param hashOfJustFinishedBlock The new block hash
*/
private BlockInfo updateBlockInfo(
BlockInfo currentBlockInfo, long newBlockNumber, Instant blockFirstTransactionTime, Bytes blockHash) {
private BlockInfo infoOfJustFinished(
@NonNull final BlockInfo lastBlockInfo,
@NonNull final long justFinishedBlockNumber,
@NonNull final Bytes hashOfJustFinishedBlock,
@NonNull final Instant currentBlockFirstTransactionTime) {
// compute new block hashes bytes
final byte[] blockHashesBytes = currentBlockInfo.blockHashes().toByteArray();
final byte[] blockHashesBytes = lastBlockInfo.blockHashes().toByteArray();
byte[] newBlockHashesBytes;
if (blockHashesBytes.length < numBlockHashesToKeepBytes) {
// append new hash bytes to end
newBlockHashesBytes = new byte[blockHashesBytes.length + HASH_SIZE];
System.arraycopy(blockHashesBytes, 0, newBlockHashesBytes, 0, blockHashesBytes.length);
blockHash.getBytes(0, newBlockHashesBytes, newBlockHashesBytes.length - HASH_SIZE, HASH_SIZE);
hashOfJustFinishedBlock.getBytes(0, newBlockHashesBytes, newBlockHashesBytes.length - HASH_SIZE, HASH_SIZE);
} else {
// shift bytes left by HASH_SIZE and then set new hash bytes to at end HASH_SIZE bytes
newBlockHashesBytes = blockHashesBytes;
System.arraycopy(
newBlockHashesBytes, HASH_SIZE, newBlockHashesBytes, 0, newBlockHashesBytes.length - HASH_SIZE);
blockHash.getBytes(0, newBlockHashesBytes, newBlockHashesBytes.length - HASH_SIZE, HASH_SIZE);
hashOfJustFinishedBlock.getBytes(0, newBlockHashesBytes, newBlockHashesBytes.length - HASH_SIZE, HASH_SIZE);
}
return new BlockInfo(
newBlockNumber,
new Timestamp(blockFirstTransactionTime.getEpochSecond(), blockFirstTransactionTime.getNano()),
justFinishedBlockNumber,
lastBlockInfo.firstConsTimeOfCurrentBlock(),
Bytes.wrap(newBlockHashesBytes),
lastBlockInfo.consTimeOfLastHandledTxn(),
lastBlockInfo.migrationRecordsStreamed());
lastBlockInfo.migrationRecordsStreamed(),
new Timestamp(
currentBlockFirstTransactionTime.getEpochSecond(), currentBlockFirstTransactionTime.getNano()));
}
}
Loading

0 comments on commit cf6e9e0

Please sign in to comment.