diff --git a/lib/archethic/replication/transaction_context.ex b/lib/archethic/replication/transaction_context.ex index 5e729a1ef..bd153ecf3 100644 --- a/lib/archethic/replication/transaction_context.ex +++ b/lib/archethic/replication/transaction_context.ex @@ -66,6 +66,7 @@ defmodule Archethic.Replication.TransactionContext do genesis_address |> TransactionChain.fetch(storage_nodes, paging_state: paging_address) |> Stream.take_while(&(Transaction.previous_address(&1) != limit_address)) + |> ensure_all_tx_fetched(paging_address, genesis_address, limit_address) end end @@ -84,4 +85,32 @@ defmodule Archethic.Replication.TransactionContext do TransactionChain.fetch_unspent_outputs(genesis_address, genesis_nodes) |> Enum.to_list() end + + defp ensure_all_tx_fetched(transactions, paging_address, genesis_address, limit_address) do + paging_address = if paging_address == nil, do: genesis_address, else: paging_address + + Stream.transform( + transactions, + # init accumulator + fn -> paging_address end, + # loop over transactions + fn tx = %Transaction{address: address}, expected_previous_address -> + previous_address = Transaction.previous_address(tx) + + if previous_address != expected_previous_address do + raise( + "Replication failed to fetch previous chain after #{Base.encode16(expected_previous_address)}" + ) + end + + {[tx], address} + end, + # after all tx processed + fn last_address -> + if last_address != limit_address do + raise "Replication failed to fetch previous chain after #{Base.encode16(last_address)}" + end + end + ) + end end diff --git a/test/archethic/replication/transaction_context_test.exs b/test/archethic/replication/transaction_context_test.exs index 8a650c619..3cf700d7f 100644 --- a/test/archethic/replication/transaction_context_test.exs +++ b/test/archethic/replication/transaction_context_test.exs @@ -18,7 +18,8 @@ defmodule Archethic.Replication.TransactionContextTest do alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.VersionedUnspentOutput - import ArchethicCase + alias Archethic.TransactionFactory + import Mox test "fetch_transaction/1 should retrieve the transaction" do @@ -42,47 +43,136 @@ defmodule Archethic.Replication.TransactionContextTest do assert %Transaction{} = TransactionContext.fetch_transaction("@Alice1") end - test "stream_transaction_chain/1 should retrieve the previous transaction chain" do - pub1 = random_public_key() - pub2 = random_public_key() - - genesis = random_address() - addr1 = Crypto.derive_address(pub1) - addr2 = Crypto.derive_address(pub2) - - MockDB - |> expect(:get_last_chain_address_stored, fn ^genesis -> addr1 end) - - MockClient - |> expect(:send_message, fn - _, %GetTransactionChain{address: ^genesis, paging_state: ^addr1}, _ -> - {:ok, - %TransactionList{ - transactions: [ - %Transaction{previous_public_key: pub1}, - %Transaction{previous_public_key: pub2} - ] - }} - end) - - P2P.add_and_connect_node(%Node{ - ip: {127, 0, 0, 1}, - port: 3000, - first_public_key: Crypto.last_node_public_key(), - last_public_key: Crypto.last_node_public_key(), - available?: true, - geo_patch: "AAA", - network_patch: "AAA", - authorized?: true, - authorization_date: DateTime.utc_now() - }) - - chain = - genesis - |> TransactionContext.stream_transaction_chain(addr2, P2P.authorized_and_available_nodes()) - |> Enum.to_list() - - assert [%Transaction{previous_public_key: ^pub1}] = chain + describe "stream_transaction_chain/3" do + setup do + P2P.add_and_connect_node(%Node{ + ip: {127, 0, 0, 1}, + port: 3000, + first_public_key: Crypto.last_node_public_key(), + last_public_key: Crypto.last_node_public_key(), + available?: true, + geo_patch: "AAA", + network_patch: "AAA", + authorized?: true, + authorization_date: DateTime.utc_now() + }) + + seed = "seed" + + tx1 = TransactionFactory.create_valid_transaction([], seed: seed, index: 0) + tx2 = TransactionFactory.create_valid_transaction([], seed: seed, index: 1) + tx3 = TransactionFactory.create_valid_transaction([], seed: seed, index: 2) + tx4 = TransactionFactory.create_valid_transaction([], seed: seed, index: 3) + + %{transactions: [tx1, tx2, tx3, tx4]} + end + + test "should retrieve the previous transaction chain with part of chain already stored", %{ + transactions: [tx1, tx2, tx3, tx4] + } do + genesis = Transaction.previous_address(tx1) + addr1 = tx1.address + addr3 = tx3.address + + MockDB + |> expect(:get_last_chain_address_stored, fn ^genesis -> addr1 end) + + MockClient + |> expect(:send_message, fn + _, %GetTransactionChain{address: ^genesis, paging_state: ^addr1}, _ -> + {:ok, %TransactionList{transactions: [tx2, tx3, tx4]}} + end) + + nodes = P2P.authorized_and_available_nodes() + + assert [tx2, tx3] == + genesis + |> TransactionContext.stream_transaction_chain(addr3, nodes) + |> Enum.to_list() + end + + test "should retrieve the previous transactions with no tx of the chain already stored", %{ + transactions: [tx1, tx2, tx3, tx4] + } do + genesis = Transaction.previous_address(tx1) + addr3 = tx3.address + + MockDB + |> expect(:get_last_chain_address_stored, fn ^genesis -> nil end) + + MockClient + |> expect(:send_message, fn + _, %GetTransactionChain{address: ^genesis, paging_state: nil}, _ -> + {:ok, %TransactionList{transactions: [tx1, tx2, tx3, tx4]}} + end) + + nodes = P2P.authorized_and_available_nodes() + + assert [tx1, tx2, tx3] == + genesis + |> TransactionContext.stream_transaction_chain(addr3, nodes) + |> Enum.to_list() + end + + test "should raise en error if part of the chain is not fetched", %{ + transactions: [tx1, tx2, tx3, tx4] + } do + genesis = Transaction.previous_address(tx1) + addr3 = tx3.address + + MockDB + |> stub(:get_last_chain_address_stored, fn ^genesis -> nil end) + + nodes = P2P.authorized_and_available_nodes() + + # Missing first transaction + MockClient + |> expect(:send_message, fn + _, %GetTransactionChain{address: ^genesis, paging_state: nil}, _ -> + {:ok, %TransactionList{transactions: [tx2, tx3, tx4]}} + end) + + expected_message = + "Replication failed to fetch previous chain after #{Base.encode16(genesis)}" + + assert_raise RuntimeError, expected_message, fn -> + genesis + |> TransactionContext.stream_transaction_chain(addr3, nodes) + |> Enum.to_list() + end + + # Missing middle transaction + MockClient + |> expect(:send_message, fn + _, %GetTransactionChain{address: ^genesis, paging_state: nil}, _ -> + {:ok, %TransactionList{transactions: [tx1, tx3, tx4]}} + end) + + expected_message = + "Replication failed to fetch previous chain after #{Base.encode16(tx1.address)}" + + assert_raise RuntimeError, expected_message, fn -> + genesis + |> TransactionContext.stream_transaction_chain(addr3, nodes) + |> Enum.to_list() + end + + # Missing last transaction + MockClient + |> expect(:send_message, fn + _, %GetTransactionChain{address: ^genesis, paging_state: nil}, _ -> + {:ok, %TransactionList{transactions: [tx1, tx2]}} + end) + + expected_message = + "Replication failed to fetch previous chain after #{Base.encode16(tx2.address)}" + + assert_raise RuntimeError, expected_message, fn -> + genesis + |> TransactionContext.stream_transaction_chain(addr3, nodes) + |> Enum.to_list() + end + end end test "fetch_transaction_unspent_outputs/1 should retrieve the utxos of the chain" do