diff --git a/chia/util/keychain.py b/chia/util/keychain.py index de97acdbe19e..c3f8978e9bc4 100644 --- a/chia/util/keychain.py +++ b/chia/util/keychain.py @@ -135,7 +135,7 @@ def _get_service(self): def _get_pk_and_entropy(self, user: str) -> Optional[Tuple[G1Element, bytes]]: """ - Returns the keychain conntents for a specific 'user' (key index). The contents + Returns the keychain contents for a specific 'user' (key index). The contents include an G1Element and the entropy required to generate the private key. Note that generating the actual private key also requires the passphrase. """ diff --git a/tests/clvm/test_puzzles.py b/tests/clvm/test_puzzles.py index fc116f0ae803..3aa4283926fc 100644 --- a/tests/clvm/test_puzzles.py +++ b/tests/clvm/test_puzzles.py @@ -18,7 +18,6 @@ p2_puzzle_hash, ) from tests.util.key_tool import KeyTool -from chia.util.block_tools import test_constants from ..core.make_block_generator import int_to_public_key from .coin_store import CoinStore, CoinTimestamp @@ -26,6 +25,8 @@ T1 = CoinTimestamp(1, 10000000) T2 = CoinTimestamp(5, 10003000) +MAX_BLOCK_COST_CLVM = int(1e18) + def secret_exponent_for_index(index: int) -> int: blob = index.to_bytes(32, "big") @@ -71,7 +72,7 @@ def do_test_spend( coin_solution = CoinSolution(coin, puzzle_reveal, solution) spend_bundle = SpendBundle([coin_solution], G2Element()) - coin_db.update_coin_store_for_spend_bundle(spend_bundle, spend_time, test_constants.MAX_BLOCK_COST_CLVM) + coin_db.update_coin_store_for_spend_bundle(spend_bundle, spend_time, MAX_BLOCK_COST_CLVM) # ensure all outputs are there for puzzle_hash, amount in payments: diff --git a/tests/core/full_node/ram_db.py b/tests/core/full_node/ram_db.py new file mode 100644 index 000000000000..d71435649df6 --- /dev/null +++ b/tests/core/full_node/ram_db.py @@ -0,0 +1,18 @@ +from typing import Tuple + +import aiosqlite + +from chia.consensus.blockchain import Blockchain +from chia.consensus.constants import ConsensusConstants +from chia.full_node.block_store import BlockStore +from chia.full_node.coin_store import CoinStore +from chia.util.db_wrapper import DBWrapper + + +async def create_ram_blockchain(consensus_constants: ConsensusConstants) -> Tuple[aiosqlite.Connection, Blockchain]: + connection = await aiosqlite.connect(":memory:") + db_wrapper = DBWrapper(connection) + block_store = await BlockStore.create(db_wrapper) + coin_store = await CoinStore.create(db_wrapper) + blockchain = await Blockchain.create(coin_store, block_store, consensus_constants) + return connection, blockchain diff --git a/tests/core/full_node/test_conditions.py b/tests/core/full_node/test_conditions.py new file mode 100644 index 000000000000..5cd784181515 --- /dev/null +++ b/tests/core/full_node/test_conditions.py @@ -0,0 +1,208 @@ +""" +These are quick-to-run test that check spends can be added to the blockchain when they're valid +or that they're failing for the right reason when they're invalid. +""" + +import logging +import time + +from typing import List, Optional + +import pytest + +from blspy import G2Element + +from clvm_tools.binutils import assemble + +from chia.consensus.blockchain import ReceiveBlockResult +from chia.consensus.constants import ConsensusConstants +from chia.types.announcement import Announcement +from chia.types.blockchain_format.program import Program +from chia.types.coin_solution import CoinSolution +from chia.types.condition_opcodes import ConditionOpcode +from chia.types.full_block import FullBlock +from chia.types.spend_bundle import SpendBundle +from chia.util.block_tools import BlockTools, test_constants +from chia.util.errors import Err + +from .ram_db import create_ram_blockchain + + +bt = BlockTools(constants=test_constants) + + +log = logging.getLogger(__name__) + + +# This puzzle simply returns the solution as conditions. +# We call it the `EASY_PUZZLE` because it's pretty easy to solve. + +EASY_PUZZLE = Program.to(assemble("1")) +EASY_PUZZLE_HASH = EASY_PUZZLE.get_tree_hash() + + +def initial_blocks(block_count: int = 4) -> List[FullBlock]: + blocks = bt.get_consecutive_blocks( + block_count, + guarantee_transaction_block=True, + farmer_reward_puzzle_hash=EASY_PUZZLE_HASH, + pool_reward_puzzle_hash=EASY_PUZZLE_HASH, + ) + return blocks + + +async def check_spend_bundle_validity( + constants: ConsensusConstants, + blocks: List[FullBlock], + spend_bundle: SpendBundle, + expected_err: Optional[Err] = None, +): + """ + This test helper create an extra block after the given blocks that contains the given + `SpendBundle`, and then invokes `receive_block` to ensure that it's accepted (if `expected_err=None`) + or fails with the correct error code. + """ + try: + connection, blockchain = await create_ram_blockchain(constants) + for block in blocks: + received_block_result, err, fork_height = await blockchain.receive_block(block) + assert err is None + + additional_blocks = bt.get_consecutive_blocks( + 1, + block_list_input=blocks, + guarantee_transaction_block=True, + transaction_data=spend_bundle, + ) + newest_block = additional_blocks[-1] + + received_block_result, err, fork_height = await blockchain.receive_block(newest_block) + + if expected_err is None: + assert err is None + assert received_block_result == ReceiveBlockResult.NEW_PEAK + assert fork_height == len(blocks) - 1 + else: + assert err == expected_err + assert received_block_result == ReceiveBlockResult.INVALID_BLOCK + assert fork_height is None + finally: + # if we don't close the connection, the test process doesn't exit cleanly + await connection.close() + + # we must call `shut_down` or the executor in `Blockchain` doesn't stop + blockchain.shut_down() + + +async def check_conditions( + condition_solution: Program, expected_err: Optional[Err] = None, spend_reward_index: int = -2 +): + blocks = initial_blocks() + coin = list(blocks[spend_reward_index].get_included_reward_coins())[0] + + coin_solution = CoinSolution(coin, EASY_PUZZLE, condition_solution) + spend_bundle = SpendBundle([coin_solution], G2Element()) + + # now let's try to create a block with the spend bundle and ensure that it doesn't validate + + await check_spend_bundle_validity(bt.constants, blocks, spend_bundle, expected_err=expected_err) + + +class TestConditions: + @pytest.mark.asyncio + async def test_invalid_block_age(self): + conditions = Program.to(assemble(f"(({ConditionOpcode.ASSERT_HEIGHT_RELATIVE[0]} 2))")) + await check_conditions(conditions, expected_err=Err.ASSERT_HEIGHT_RELATIVE_FAILED) + + @pytest.mark.asyncio + async def test_valid_block_age(self): + conditions = Program.to(assemble(f"(({ConditionOpcode.ASSERT_HEIGHT_RELATIVE[0]} 1))")) + await check_conditions(conditions) + + @pytest.mark.asyncio + async def test_invalid_block_height(self): + conditions = Program.to(assemble(f"(({ConditionOpcode.ASSERT_HEIGHT_ABSOLUTE[0]} 4))")) + await check_conditions(conditions, expected_err=Err.ASSERT_HEIGHT_ABSOLUTE_FAILED) + + @pytest.mark.asyncio + async def test_valid_block_height(self): + conditions = Program.to(assemble(f"(({ConditionOpcode.ASSERT_HEIGHT_ABSOLUTE[0]} 3))")) + await check_conditions(conditions) + + @pytest.mark.asyncio + async def test_invalid_my_id(self): + blocks = initial_blocks() + coin = list(blocks[-2].get_included_reward_coins())[0] + wrong_name = bytearray(coin.name()) + wrong_name[-1] ^= 1 + conditions = Program.to(assemble(f"(({ConditionOpcode.ASSERT_MY_COIN_ID[0]} 0x{wrong_name.hex()}))")) + await check_conditions(conditions, expected_err=Err.ASSERT_MY_COIN_ID_FAILED) + + @pytest.mark.asyncio + async def test_valid_my_id(self): + blocks = initial_blocks() + coin = list(blocks[-2].get_included_reward_coins())[0] + conditions = Program.to(assemble(f"(({ConditionOpcode.ASSERT_MY_COIN_ID[0]} 0x{coin.name().hex()}))")) + await check_conditions(conditions) + + @pytest.mark.asyncio + async def test_invalid_seconds_absolute(self): + # TODO: make the test suite not use `time.time` so we can more accurately + # set `time_now` to make it minimal while still failing + time_now = int(time.time()) + 3000 + conditions = Program.to(assemble(f"(({ConditionOpcode.ASSERT_SECONDS_ABSOLUTE[0]} {time_now}))")) + await check_conditions(conditions, expected_err=Err.ASSERT_SECONDS_ABSOLUTE_FAILED) + + @pytest.mark.asyncio + async def test_valid_seconds_absolute(self): + time_now = int(time.time()) + conditions = Program.to(assemble(f"(({ConditionOpcode.ASSERT_SECONDS_ABSOLUTE[0]} {time_now}))")) + await check_conditions(conditions) + + @pytest.mark.asyncio + async def test_invalid_coin_announcement(self): + blocks = initial_blocks() + coin = list(blocks[-2].get_included_reward_coins())[0] + announce = Announcement(coin.name(), b"test_bad") + conditions = Program.to( + assemble( + f"(({ConditionOpcode.CREATE_COIN_ANNOUNCEMENT[0]} 'test')" + f"({ConditionOpcode.ASSERT_COIN_ANNOUNCEMENT[0]} 0x{announce.name().hex()}))" + ) + ) + await check_conditions(conditions, expected_err=Err.ASSERT_ANNOUNCE_CONSUMED_FAILED) + + @pytest.mark.asyncio + async def test_valid_coin_announcement(self): + blocks = initial_blocks() + coin = list(blocks[-2].get_included_reward_coins())[0] + announce = Announcement(coin.name(), b"test") + conditions = Program.to( + assemble( + f"(({ConditionOpcode.CREATE_COIN_ANNOUNCEMENT[0]} 'test')" + f"({ConditionOpcode.ASSERT_COIN_ANNOUNCEMENT[0]} 0x{announce.name().hex()}))" + ) + ) + await check_conditions(conditions) + + @pytest.mark.asyncio + async def test_invalid_puzzle_announcement(self): + announce = Announcement(EASY_PUZZLE_HASH, b"test_bad") + conditions = Program.to( + assemble( + f"(({ConditionOpcode.CREATE_PUZZLE_ANNOUNCEMENT[0]} 'test')" + f"({ConditionOpcode.ASSERT_PUZZLE_ANNOUNCEMENT[0]} 0x{announce.name().hex()}))" + ) + ) + await check_conditions(conditions, expected_err=Err.ASSERT_ANNOUNCE_CONSUMED_FAILED) + + @pytest.mark.asyncio + async def test_valid_puzzle_announcement(self): + announce = Announcement(EASY_PUZZLE_HASH, b"test") + conditions = Program.to( + assemble( + f"(({ConditionOpcode.CREATE_PUZZLE_ANNOUNCEMENT[0]} 'test')" + f"({ConditionOpcode.ASSERT_PUZZLE_ANNOUNCEMENT[0]} 0x{announce.name().hex()}))" + ) + ) + await check_conditions(conditions)