Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Full node changes required for CAT / Standalone #8616

Merged
merged 11 commits into from
Sep 29, 2021
143 changes: 92 additions & 51 deletions chia/consensus/blockchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from enum import Enum
from typing import Dict, List, Optional, Set, Tuple, Union

from clvm.casts import int_from_bytes

from chia.consensus.block_body_validation import validate_block_body
from chia.consensus.block_header_validation import validate_finished_header_block, validate_unfinished_header_block
from chia.consensus.block_record import BlockRecord
Expand All @@ -18,12 +20,14 @@
from chia.consensus.multiprocess_validation import PreValidationResult, pre_validate_blocks_multiprocessing
from chia.full_node.block_store import BlockStore
from chia.full_node.coin_store import CoinStore
from chia.full_node.hint_store import HintStore
from chia.full_node.mempool_check_conditions import get_name_puzzle_conditions
from chia.types.blockchain_format.coin import Coin
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.blockchain_format.sub_epoch_summary import SubEpochSummary
from chia.types.blockchain_format.vdf import VDFInfo
from chia.types.coin_record import CoinRecord
from chia.types.condition_opcodes import ConditionOpcode
from chia.types.end_of_slot_bundle import EndOfSubSlotBundle
from chia.types.full_block import FullBlock
from chia.types.generator_types import BlockGenerator, GeneratorArg
Expand Down Expand Up @@ -83,12 +87,11 @@ class Blockchain(BlockchainInterface):
# Lock to prevent simultaneous reads and writes
lock: asyncio.Lock
compact_proof_lock: asyncio.Lock
hint_store: HintStore

@staticmethod
async def create(
coin_store: CoinStore,
block_store: BlockStore,
consensus_constants: ConsensusConstants,
coin_store: CoinStore, block_store: BlockStore, consensus_constants: ConsensusConstants, hint_store: HintStore
):
"""
Initializes a blockchain with the BlockRecords from disk, assuming they have all been
Expand All @@ -112,6 +115,7 @@ async def create(
self._shut_down = False
await self._load_chain_from_store()
self._seen_compact_proofs = set()
self.hint_store = hint_store
return self

def shut_down(self):
Expand Down Expand Up @@ -164,7 +168,12 @@ async def receive_block(
block: FullBlock,
pre_validation_result: Optional[PreValidationResult] = None,
fork_point_with_peak: Optional[uint32] = None,
) -> Tuple[ReceiveBlockResult, Optional[Err], Optional[uint32], List[CoinRecord]]:
) -> Tuple[
ReceiveBlockResult,
Optional[Err],
Optional[uint32],
Tuple[List[CoinRecord], Dict[bytes, Dict[bytes32, CoinRecord]]],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this data structure is quite deep. I would think it warrants its own type to add some names and constraints to how it's used.
From the code below it seems like it's coin records that were added or removed, and then hints that were added (but not removed, right?). It doesn't seem necessary to bundle these up in a tuple.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took a shortcut here, receive_block get's called on 100s of places in tests and if another arg was added in the call I would have to refactor all of those.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so maybe a dedicated type for updates would make sense then. then new updates could be added without affecting this signature

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how this is passing the tests, given that the response is checked in a lot of tests.
I agree with arvid, this response is getting quite complex. Perhaps we can put all of these things into one struct (including the fork_height).

We can leave this for the future, but this is a pretty important method, so it would be good to keep it simple

]:
"""
This method must be called under the blockchain lock
Adds a new block into the blockchain, if it's valid and connected to the current
Expand All @@ -174,18 +183,13 @@ async def receive_block(
"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would probably make sense to extend this comment to describe what the last tuple in the return value is and what it can be expected to contain

genesis: bool = block.height == 0
if self.contains_block(block.header_hash):
return ReceiveBlockResult.ALREADY_HAVE_BLOCK, None, None, []
return ReceiveBlockResult.ALREADY_HAVE_BLOCK, None, None, ([], {})

if not self.contains_block(block.prev_header_hash) and not genesis:
return (
ReceiveBlockResult.DISCONNECTED_BLOCK,
Err.INVALID_PREV_BLOCK_HASH,
None,
[],
)
return (ReceiveBlockResult.DISCONNECTED_BLOCK, Err.INVALID_PREV_BLOCK_HASH, None, ([], {}))

if not genesis and (self.block_record(block.prev_header_hash).height + 1) != block.height:
return ReceiveBlockResult.INVALID_BLOCK, Err.INVALID_HEIGHT, None, []
return ReceiveBlockResult.INVALID_BLOCK, Err.INVALID_HEIGHT, None, ([], {})

npc_result: Optional[NPCResult] = None
if pre_validation_result is None:
Expand All @@ -202,7 +206,7 @@ async def receive_block(
try:
block_generator: Optional[BlockGenerator] = await self.get_block_generator(block)
except ValueError:
return ReceiveBlockResult.INVALID_BLOCK, Err.GENERATOR_REF_HAS_NO_GENERATOR, None, []
return ReceiveBlockResult.INVALID_BLOCK, Err.GENERATOR_REF_HAS_NO_GENERATOR, None, ([], {})
assert block_generator is not None and block.transactions_info is not None
npc_result = get_name_puzzle_conditions(
block_generator,
Expand All @@ -228,7 +232,7 @@ async def receive_block(
)

if error is not None:
return ReceiveBlockResult.INVALID_BLOCK, error.code, None, []
return ReceiveBlockResult.INVALID_BLOCK, error.code, None, ([], {})
else:
npc_result = pre_validation_result.npc_result
required_iters = pre_validation_result.required_iters
Expand All @@ -247,7 +251,7 @@ async def receive_block(
self.get_block_generator,
)
if error_code is not None:
return ReceiveBlockResult.INVALID_BLOCK, error_code, None, []
return ReceiveBlockResult.INVALID_BLOCK, error_code, None, ([], {})

block_record = block_to_block_record(
self.constants,
Expand All @@ -263,7 +267,7 @@ async def receive_block(
# Perform the DB operations to update the state, and rollback if something goes wrong
await self.block_store.db_wrapper.begin_transaction()
await self.block_store.add_full_block(header_hash, block, block_record)
fork_height, peak_height, records, coin_record_change = await self._reconsider_peak(
fork_height, peak_height, records, (coin_record_change, hint_changes) = await self._reconsider_peak(
block_record, genesis, fork_point_with_peak, npc_result
)
await self.block_store.db_wrapper.commit_transaction()
Expand All @@ -286,17 +290,35 @@ async def receive_block(
if fork_height is not None:
# new coin records added
assert coin_record_change is not None
return ReceiveBlockResult.NEW_PEAK, None, fork_height, coin_record_change
return ReceiveBlockResult.NEW_PEAK, None, fork_height, (coin_record_change, hint_changes)
else:
return ReceiveBlockResult.ADDED_AS_ORPHAN, None, None, []
return ReceiveBlockResult.ADDED_AS_ORPHAN, None, None, ([], {})

def get_hint_list(self, npc_result: NPCResult) -> List[Tuple[bytes32, bytes]]:
h_list = []
for npc in npc_result.npc_list:
for opcode, conditions in npc.conditions:
if opcode == ConditionOpcode.CREATE_COIN:
for condition in conditions:
if len(condition.vars) > 2 and condition.vars[2] != b"":
puzzle_hash, amount_bin = condition.vars[0], condition.vars[1]
amount = int_from_bytes(amount_bin)
coin_id = Coin(npc.coin_name, puzzle_hash, amount).name()
h_list.append((coin_id, condition.vars[2]))
return h_list

async def _reconsider_peak(
self,
block_record: BlockRecord,
genesis: bool,
fork_point_with_peak: Optional[uint32],
npc_result: Optional[NPCResult],
) -> Tuple[Optional[uint32], Optional[uint32], List[BlockRecord], List[CoinRecord]]:
) -> Tuple[
Optional[uint32],
Optional[uint32],
List[BlockRecord],
Tuple[List[CoinRecord], Dict[bytes, Dict[bytes32, CoinRecord]]],
]:
"""
When a new block is added, this is called, to check if the new block is the new peak of the chain.
This also handles reorgs by reverting blocks which are not in the heaviest chain.
Expand All @@ -305,6 +327,8 @@ async def _reconsider_peak(
"""
peak = self.get_peak()
lastest_coin_state: Dict[bytes32, CoinRecord] = {}
hint_coin_state: Dict[bytes32, Dict[bytes32, CoinRecord]] = {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you add a comment here describing what these fields are?
I think it's Hint -> (CoinID -> CoinRecord)


if genesis:
if peak is None:
block: Optional[FullBlock] = await self.block_store.get_full_block(block_record.header_hash)
Expand All @@ -326,8 +350,8 @@ async def _reconsider_peak(
else:
added, _ = [], []
await self.block_store.set_peak(block_record.header_hash)
return uint32(0), uint32(0), [block_record], added
return None, None, [], []
return uint32(0), uint32(0), [block_record], (added, {})
return None, None, [], ([], {})

assert peak is not None
if block_record.weight > peak.weight:
Expand Down Expand Up @@ -372,46 +396,63 @@ async def _reconsider_peak(
records_to_add = []
for fetched_full_block, fetched_block_record in reversed(blocks_to_add):
records_to_add.append(fetched_block_record)
if fetched_block_record.is_transaction_block:
if fetched_full_block.is_transaction_block():
if fetched_block_record.header_hash == block_record.header_hash:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this condition similar to npc_result is not None? it's not obvious to me why we pass in npc_result in this case, but pass in None in the other. That's the only difference.

tx_removals, tx_additions = await self.get_tx_removals_and_additions(
tx_removals, tx_additions, npc_res = await self.get_tx_removals_and_additions(
fetched_full_block, npc_result
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after this call, what's the difference between npc_res and npc_result? Seeing this call to get_tx_removals_and_additions() in the case where we already have npc_result makes me worry a bit. If we really have the npc_resultt we shouldn't be running the generator program again. I might be missing something and this doesn't seem to be something introduced by your patch regardless.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't run it again. If the npc_result already exists (it's Optional, and not present when syncing) then we will not run the generator. So we are guaranteed to only run it once.

else:
tx_removals, tx_additions = await self.get_tx_removals_and_additions(fetched_full_block, None)
if fetched_full_block.is_transaction_block():
assert fetched_full_block.foliage_transaction_block is not None
added_rec = await self.coin_store.new_block(
fetched_full_block.height,
fetched_full_block.foliage_transaction_block.timestamp,
fetched_full_block.get_included_reward_coins(),
tx_additions,
tx_removals,
tx_removals, tx_additions, npc_res = await self.get_tx_removals_and_additions(
fetched_full_block, None
)
removed_rec: List[Optional[CoinRecord]] = [
await self.coin_store.get_coin_record(name) for name in tx_removals
]

# Set additions first, than removals in order to handle ephemeral coin state
# Add in height order is also required
record: Optional[CoinRecord]
for record in added_rec:
assert record
lastest_coin_state[record.name] = record
for record in removed_rec:
assert record
lastest_coin_state[record.name] = record

assert fetched_full_block.foliage_transaction_block is not None
added_rec = await self.coin_store.new_block(
fetched_full_block.height,
fetched_full_block.foliage_transaction_block.timestamp,
fetched_full_block.get_included_reward_coins(),
tx_additions,
tx_removals,
)
removed_rec: List[Optional[CoinRecord]] = [
await self.coin_store.get_coin_record(name) for name in tx_removals
]

# Set additions first, then removals in order to handle ephemeral coin state
# Add in height order is also required
record: Optional[CoinRecord]
for record in added_rec:
assert record
lastest_coin_state[record.name] = record
for record in removed_rec:
assert record
lastest_coin_state[record.name] = record

if npc_res is not None:
hint_list: List[Tuple[bytes32, bytes]] = self.get_hint_list(npc_res)
await self.hint_store.add_hints(hint_list)
# There can be multiple coins for the same hint
for coin_id, hint in hint_list:
key = hint
if key not in hint_coin_state:
hint_coin_state[key] = {}
hint_coin_state[key][coin_id] = lastest_coin_state[coin_id]

# Changes the peak to be the new peak
await self.block_store.set_peak(block_record.header_hash)
return uint32(max(fork_height, 0)), block_record.height, records_to_add, list(lastest_coin_state.values())
return (
uint32(max(fork_height, 0)),
block_record.height,
records_to_add,
(list(lastest_coin_state.values()), hint_coin_state),
)

# This is not a heavier block than the heaviest we have seen, so we don't change the coin set
return None, None, [], list(lastest_coin_state.values())
return None, None, [], ([], {})

async def get_tx_removals_and_additions(
self, block: FullBlock, npc_result: Optional[NPCResult] = None
) -> Tuple[List[bytes32], List[Coin]]:
) -> Tuple[List[bytes32], List[Coin], Optional[NPCResult]]:
if block.is_transaction_block():
if block.transactions_generator is not None:
if npc_result is None:
Expand All @@ -424,11 +465,11 @@ async def get_tx_removals_and_additions(
safe_mode=False,
)
tx_removals, tx_additions = tx_removals_and_additions(npc_result.npc_list)
return tx_removals, tx_additions
return tx_removals, tx_additions, npc_result
else:
return [], []
return [], [], None
else:
return [], []
return [], [], None

def get_next_difficulty(self, header_hash: bytes32, new_slot: bool) -> uint64:
assert self.contains_block(header_hash)
Expand Down
19 changes: 9 additions & 10 deletions chia/full_node/coin_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,13 @@ async def get_coin_records_by_names(

return list(coins)

def row_to_coin_state(self, row):
coin = Coin(bytes32(bytes.fromhex(row[6])), bytes32(bytes.fromhex(row[5])), uint64.from_bytes(row[7]))
spent_h = None
if row[3]:
spent_h = row[2]
return CoinState(coin, spent_h, row[1])

async def get_coin_states_by_puzzle_hashes(
self,
include_spent_coins: bool,
Expand All @@ -265,11 +272,7 @@ async def get_coin_states_by_puzzle_hashes(

await cursor.close()
for row in rows:
coin = Coin(bytes32(bytes.fromhex(row[6])), bytes32(bytes.fromhex(row[5])), uint64.from_bytes(row[7]))
spent_h = None
if row[3]:
spent_h = row[2]
coins.add(CoinState(coin, spent_h, row[1]))
coins.add(self.row_to_coin_state(row))

return list(coins)

Expand Down Expand Up @@ -323,11 +326,7 @@ async def get_coin_state_by_ids(

await cursor.close()
for row in rows:
coin = Coin(bytes32(bytes.fromhex(row[6])), bytes32(bytes.fromhex(row[5])), uint64.from_bytes(row[7]))
spent_h = None
if row[3]:
spent_h = row[2]
coins.add(CoinState(coin, spent_h, row[1]))
coins.add(self.row_to_coin_state(row))
return list(coins)

async def rollback_to_block(self, block_index: int) -> List[CoinRecord]:
Expand Down
Loading