diff --git a/lib/archethic/p2p/mem_table_loader.ex b/lib/archethic/p2p/mem_table_loader.ex
index d63bb89fe..641d9c8c5 100644
--- a/lib/archethic/p2p/mem_table_loader.ex
+++ b/lib/archethic/p2p/mem_table_loader.ex
@@ -8,6 +8,7 @@ defmodule Archethic.P2P.MemTableLoader do

   alias Archethic.DB

+  alias Archethic.P2P
   alias Archethic.P2P.GeoPatch
   alias Archethic.P2P.MemTable
   alias Archethic.P2P.Node
@@ -86,7 +87,7 @@ defmodule Archethic.P2P.MemTableLoader do
   end

   @doc """
-  Load the transaction and update the P2P view
+  Load the transaction and update the P2P view.
   """
   @spec load_transaction(Transaction.t()) :: :ok
   def load_transaction(%Transaction{
@@ -112,7 +113,8 @@ defmodule Archethic.P2P.MemTableLoader do
        reward_address: reward_address,
        origin_public_key: origin_public_key,
        mining_public_key: mining_public_key,
-       geo_patch: geo_patch
+       geo_patch: geo_patch,
+       geo_patch_update: geo_patch_update
      }} = Node.decode_transaction_content(content)

     geo_patch = if geo_patch == nil, do: GeoPatch.from_ip(ip), else: geo_patch
@@ -133,13 +135,12 @@ defmodule Archethic.P2P.MemTableLoader do
         mining_public_key: mining_public_key
       }

-      node
-      |> Node.enroll(timestamp)
-      |> MemTable.add_node()
+      node = Node.enroll(node, timestamp)
+      MemTable.add_node(node)
     else
       {:ok, node} = MemTable.get_node(first_public_key)

-      MemTable.add_node(%{
+      updated_node = %Node{
         node
         | ip: ip,
           port: port,
@@ -152,10 +153,14 @@ defmodule Archethic.P2P.MemTableLoader do
           origin_public_key: origin_public_key,
           last_update_date: timestamp,
           mining_public_key: mining_public_key
-      })
+      }
+
+      MemTable.add_node(updated_node)
+
+      handle_geo_patch_update(node, updated_node, geo_patch_update)
     end

-    Logger.info("Node loaded into in memory p2p tables", node: Base.encode16(first_public_key))
+    Logger.info("Node loaded into in-memory P2P tables", node: Base.encode16(first_public_key))
   end

   def load_transaction(%Transaction{
@@ -201,4 +206,30 @@ defmodule Archethic.P2P.MemTableLoader do
       MemTable.set_node_unavailable(node_public_key, availability_update)
     end
   end
+
+  defp handle_geo_patch_update(
+         %Node{first_public_key: first_public_key, geo_patch: prev_geo_patch},
+         %Node{geo_patch: new_geo_patch},
+         geo_patch_update
+       )
+       when prev_geo_patch != new_geo_patch do
+    Logger.info("GeoPatch changed for node", node: Base.encode16(first_public_key))
+    Logger.info("Starting Notifier for GeoPatch change")
+
+    new_nodes = P2P.authorized_and_available_nodes(geo_patch_update)
+
+    previous_nodes =
+      Enum.map(new_nodes, fn
+        node = %Node{first_public_key: ^first_public_key} ->
+          %Node{node | geo_patch: prev_geo_patch}
+
+        node ->
+          node
+      end)
+
+    SelfRepair.start_notifier(previous_nodes, new_nodes, geo_patch_update)
+  end
+
+  # No geo patch change: nothing to notify (avoids a FunctionClauseError on regular node updates)
+  defp handle_geo_patch_update(_node, _updated_node, _geo_patch_update), do: :ok
 end
diff --git a/lib/archethic/self_repair.ex b/lib/archethic/self_repair.ex
index c93da4c6a..5f15de8a3 100755
--- a/lib/archethic/self_repair.ex
+++ b/lib/archethic/self_repair.ex
@@ -16,7 +16,6 @@ defmodule Archethic.SelfRepair do
   alias Archethic.P2P.Node
   alias Archethic.Replication
   alias Archethic.TransactionChain
-  alias Archethic.Utils

   require Logger

@@ -164,33 +163,21 @@ defmodule Archethic.SelfRepair do
   @doc """
   Start a new notifier process if there are new unavailable nodes after the self-repair
   """
-  @spec start_notifier(list(Node.t()), list(Node.t()), DateTime.t()) :: :ok
-  def start_notifier(prev_available_nodes, new_available_nodes, availability_update) do
-    diff_node =
-      prev_available_nodes
-      |> Enum.reject(
-        &(Utils.key_in_node_list?(new_available_nodes, &1.first_public_key) or
-            &1.first_public_key == Crypto.first_node_public_key())
-      )
-
-    case diff_node do
-      [] ->
-        :ok
-
-      nodes ->
-        unavailable_nodes = Enum.map(nodes, & &1.first_public_key)
+  @spec start_notifier(
+          previous_nodes :: list(Node.t()),
+          new_nodes :: list(Node.t()),
+          availability_update :: DateTime.t()
+        ) :: :ok
+  def start_notifier(previous_nodes, new_nodes, availability_update) do
+    args = [
+      previous_nodes: previous_nodes,
+      new_nodes: new_nodes,
+      availability_update: availability_update
+    ]

-        DynamicSupervisor.start_child(
-          NotifierSupervisor,
-          {Notifier,
-           unavailable_nodes: unavailable_nodes,
-           prev_available_nodes: prev_available_nodes,
-           new_available_nodes: new_available_nodes,
-           availability_update: availability_update}
-        )
+    DynamicSupervisor.start_child(NotifierSupervisor, {Notifier, args})

-        :ok
-    end
+    :ok
   end

   @doc """
diff --git a/lib/archethic/self_repair/notifier.ex b/lib/archethic/self_repair/notifier.ex
index 3b20b3489..a51d649e9 100644
--- a/lib/archethic/self_repair/notifier.ex
+++ b/lib/archethic/self_repair/notifier.ex
@@ -18,22 +18,17 @@ defmodule Archethic.SelfRepair.Notifier do
   ```
   """

-  alias Archethic.{
-    BeaconChain,
-    Crypto,
-    Election,
-    P2P,
-    P2P.Node,
-    P2P.Message.ShardRepair,
-    TransactionChain,
-    TransactionChain.Transaction,
-    Utils
-  }
-
-  alias Archethic.TransactionChain.Transaction.{
-    ValidationStamp,
-    ValidationStamp.LedgerOperations
-  }
+  alias Archethic.BeaconChain
+  alias Archethic.Crypto
+  alias Archethic.Election
+  alias Archethic.P2P
+  alias Archethic.P2P.Node
+  alias Archethic.P2P.Message.ShardRepair
+  alias Archethic.TransactionChain
+  alias Archethic.TransactionChain.Transaction
+  alias Archethic.TransactionChain.Transaction.ValidationStamp
+  alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations
+  alias Archethic.Utils

   use GenServer, restart: :temporary
   @vsn 1
@@ -59,16 +54,13 @@ defmodule Archethic.SelfRepair.Notifier do
   end

   def handle_info(:start, data) do
-    unavailable_nodes = Keyword.fetch!(data, :unavailable_nodes)
-    prev_available_nodes = Keyword.fetch!(data, :prev_available_nodes)
-    new_available_nodes = Keyword.fetch!(data, :new_available_nodes)
+    previous_nodes = Keyword.fetch!(data, :previous_nodes)
+    new_nodes = Keyword.fetch!(data, :new_nodes)

-    Logger.info(
-      "Start Notifier due to a topology change #{inspect(Enum.map(unavailable_nodes, &Base.encode16(&1)))}"
-    )
+    Logger.info("Start Notifier due to a topology change")

-    repair_transactions(unavailable_nodes, prev_available_nodes, new_available_nodes)
-    repair_summaries_aggregate(prev_available_nodes, new_available_nodes)
+    repair_transactions(previous_nodes, new_nodes)
+    repair_summaries_aggregate(previous_nodes, new_nodes)

     {:stop, :normal, data}
   end
@@ -77,20 +69,13 @@ defmodule Archethic.SelfRepair.Notifier do
   For each transaction chain in the DB, load its genesis address, load its chain, recompute the shards, and notify the nodes. Network transactions are excluded.
""" - @spec repair_transactions(list(Crypto.key()), list(Node.t()), list(Node.t())) :: :ok - def repair_transactions(unavailable_nodes, prev_available_nodes, new_available_nodes) do + @spec repair_transactions(list(Node.t()), list(Node.t())) :: :ok + def repair_transactions(previous_nodes, new_nodes) do # We fetch all the transactions existing and check if the disconnected nodes were in storage nodes TransactionChain.list_first_addresses() |> Stream.reject(&network_chain?(&1)) |> Stream.chunk_every(20) - |> Stream.each(fn chunk -> - concurrent_txn_processing( - chunk, - unavailable_nodes, - prev_available_nodes, - new_available_nodes - ) - end) + |> Stream.each(&concurrent_txn_processing(&1, previous_nodes, new_nodes)) |> Stream.run() end @@ -101,23 +86,18 @@ defmodule Archethic.SelfRepair.Notifier do end end - defp concurrent_txn_processing( - addresses, - unavailable_nodes, - prev_available_nodes, - new_available_nodes - ) do + defp concurrent_txn_processing(addresses, previous_nodes, new_nodes) do Task.Supervisor.async_stream_nolink( Archethic.task_supervisors(), addresses, - &sync_chain(&1, unavailable_nodes, prev_available_nodes, new_available_nodes), + &sync_chain(&1, previous_nodes, new_nodes), ordered: false, on_timeout: :kill_task ) |> Stream.run() end - defp sync_chain(address, unavailable_nodes, prev_available_nodes, new_available_nodes) do + defp sync_chain(address, previous_nodes, new_nodes) do genesis_address = TransactionChain.get_genesis_address(address) address @@ -125,15 +105,15 @@ defmodule Archethic.SelfRepair.Notifier do :address, validation_stamp: [ledger_operations: [:transaction_movements]] ]) - |> Stream.map(&get_previous_election(&1, prev_available_nodes, genesis_address)) - |> Stream.filter(&storage_or_io_node?(&1, unavailable_nodes)) + |> Stream.map(&compute_elections(&1, previous_nodes, new_nodes, genesis_address)) + |> Stream.filter(&election_changed?(&1)) |> Stream.filter(¬ify?(&1)) - |> Stream.map(&new_storage_nodes(&1, new_available_nodes)) + |> Stream.map(&filter_nodes_to_notify(&1)) |> map_last_addresses_for_node() |> notify_nodes(genesis_address) end - defp get_previous_election( + defp compute_elections( %Transaction{ address: address, validation_stamp: %ValidationStamp{ @@ -142,64 +122,77 @@ defmodule Archethic.SelfRepair.Notifier do recipients: recipients } }, - prev_available_nodes, + previous_nodes, + new_nodes, genesis_address ) do - prev_storage_nodes = - address - |> Election.chain_storage_nodes(prev_available_nodes) - |> Enum.map(& &1.first_public_key) - - movements_addresses = - transaction_movements - |> Enum.map(& &1.to) - |> Enum.concat(recipients) - - authorized_nodes = P2P.authorized_and_available_nodes() + movements_addresses = transaction_movements |> Enum.map(& &1.to) |> Enum.concat(recipients) # Before AEIP-21, resolve movements included only last addresses, # then we have to resolve the genesis address for all the movements - resolved_addresses = - if protocol_version <= 7 do - Task.async_stream( - movements_addresses, - fn address -> - storage_nodes = Election.chain_storage_nodes(address, authorized_nodes) - - {:ok, resolved_genesis_address} = - TransactionChain.fetch_genesis_address( - address, - storage_nodes - ) - - [address, resolved_genesis_address] - end, - on_timeout: :kill_task, - max_concurrency: max(System.schedulers_online(), length(movements_addresses)) - ) - |> Stream.filter(&match?({:ok, _}, &1)) - |> Stream.flat_map(fn {:ok, addresses} -> addresses end) - |> Enum.concat([genesis_address]) - else - [genesis_address | 
movements_addresses] - end + resolved_addresses = compute_resolved_addresses(movements_addresses, protocol_version) + + prev_storage_nodes = + address |> Election.chain_storage_nodes(previous_nodes) |> Enum.map(& &1.first_public_key) prev_io_nodes = - resolved_addresses - |> Election.io_storage_nodes(prev_available_nodes) + [genesis_address | resolved_addresses] + |> Election.io_storage_nodes(previous_nodes) |> Enum.map(& &1.first_public_key) - {address, resolved_addresses, prev_storage_nodes, prev_io_nodes -- prev_storage_nodes} + new_storage_nodes = + Election.chain_storage_nodes(address, new_nodes) |> Enum.map(& &1.first_public_key) + + new_io_nodes = + [genesis_address | resolved_addresses] + |> Election.io_storage_nodes(new_nodes) + |> Enum.map(& &1.first_public_key) + + %{ + address: address, + prev_storage_nodes: prev_storage_nodes, + prev_io_nodes: prev_io_nodes, + new_storage_nodes: new_storage_nodes, + new_io_nodes: new_io_nodes + } end - defp storage_or_io_node?({_, _, prev_storage_nodes, prev_io_nodes}, unavailable_nodes) do - nodes = prev_storage_nodes ++ prev_io_nodes - Enum.any?(unavailable_nodes, &Enum.member?(nodes, &1)) + defp election_changed?(%{ + prev_storage_nodes: prev_storage_nodes, + prev_io_nodes: prev_io_nodes, + new_storage_nodes: new_storage_nodes, + new_io_nodes: new_io_nodes + }) do + prev_storage_nodes != new_storage_nodes or prev_io_nodes != new_io_nodes end + defp compute_resolved_addresses(movements_addresses, protocol_version) + when protocol_version <= 7 do + authorized_nodes = P2P.authorized_and_available_nodes() + + Task.async_stream( + movements_addresses, + fn address -> + storage_nodes = Election.chain_storage_nodes(address, authorized_nodes) + + {:ok, resolved_genesis_address} = + TransactionChain.fetch_genesis_address(address, storage_nodes) + + resolved_genesis_address + end, + on_timeout: :kill_task, + max_concurrency: max(System.schedulers_online(), length(movements_addresses)) + ) + |> Stream.filter(&match?({:ok, _}, &1)) + |> Stream.map(fn {:ok, address} -> address end) + |> Enum.uniq() + end + + defp compute_resolved_addresses(movements_addresses, _protocol_version), do: movements_addresses + # Notify only if the current node is part of the previous storage / io nodes # to reduce number of messages - defp notify?({_, _, prev_storage_nodes, prev_io_nodes}) do + defp notify?(%{prev_io_nodes: prev_io_nodes, prev_storage_nodes: prev_storage_nodes}) do Enum.member?(prev_storage_nodes ++ prev_io_nodes, Crypto.first_node_public_key()) end @@ -207,27 +200,19 @@ defmodule Archethic.SelfRepair.Notifier do New election is carried out on the set of all authorized omiting unavailable_node. The set of previous storage nodes is subtracted from the set of new storage nodes. 
""" - @spec new_storage_nodes( - {binary(), list(Crypto.prepended_hash()), list(Crypto.key()), list(Crypto.key())}, - list(Node.t()) - ) :: - {binary(), list(Crypto.key()), list(Crypto.key())} - def new_storage_nodes( - {address, resolved_addresses, prev_storage_nodes, prev_io_nodes}, - new_available_nodes - ) do - new_storage_nodes = - Election.chain_storage_nodes(address, new_available_nodes) - |> Enum.map(& &1.first_public_key) - |> Enum.reject(&Enum.member?(prev_storage_nodes, &1)) + @spec filter_nodes_to_notify(map()) :: {binary(), list(Crypto.key()), list(Crypto.key())} + def filter_nodes_to_notify(%{ + address: address, + new_io_nodes: new_io_nodes, + new_storage_nodes: new_storage_nodes, + prev_io_nodes: prev_io_nodes, + prev_storage_nodes: prev_storage_nodes + }) do + new_storage_nodes = new_storage_nodes -- prev_storage_nodes - already_stored_nodes = prev_storage_nodes ++ prev_io_nodes ++ new_storage_nodes + already_stored_nodes = Enum.uniq(prev_storage_nodes ++ prev_io_nodes ++ new_storage_nodes) - new_io_nodes = - resolved_addresses - |> Election.io_storage_nodes(new_available_nodes) - |> Enum.map(& &1.first_public_key) - |> Enum.reject(&Enum.member?(already_stored_nodes, &1)) + new_io_nodes = new_io_nodes -- already_stored_nodes {address, new_storage_nodes, new_io_nodes} end @@ -291,18 +276,18 @@ defmodule Archethic.SelfRepair.Notifier do For each beacon aggregate, calculate the new election and store it if the node needs to """ @spec repair_summaries_aggregate(list(Node.t()), list(Node.t())) :: :ok - def repair_summaries_aggregate(prev_available_nodes, new_available_nodes) do + def repair_summaries_aggregate(previous_nodes, new_nodes) do %Node{enrollment_date: first_enrollment_date} = P2P.get_first_enrolled_node() first_enrollment_date |> BeaconChain.next_summary_dates() - |> Stream.filter(&download?(&1, new_available_nodes)) + |> Stream.filter(&download?(&1, new_nodes)) |> Stream.chunk_every(20) |> Stream.each(fn summary_times -> Task.Supervisor.async_stream_nolink( Archethic.task_supervisors(), summary_times, - &download_and_store_summary(&1, prev_available_nodes), + &download_and_store_summary(&1, previous_nodes), ordered: false, on_timeout: :kill_task ) @@ -311,11 +296,11 @@ defmodule Archethic.SelfRepair.Notifier do |> Stream.run() end - defp download?(summary_time, new_available_nodes) do + defp download?(summary_time, new_nodes) do in_new_election? = summary_time |> Crypto.derive_beacon_aggregate_address() - |> Election.chain_storage_nodes(new_available_nodes) + |> Election.chain_storage_nodes(new_nodes) |> Utils.key_in_node_list?(Crypto.first_node_public_key()) if in_new_election? 
do @@ -328,11 +313,11 @@ defmodule Archethic.SelfRepair.Notifier do end end - defp download_and_store_summary(summary_time, prev_available_nodes) do + defp download_and_store_summary(summary_time, previous_nodes) do storage_nodes = summary_time |> Crypto.derive_beacon_aggregate_address() - |> Election.chain_storage_nodes(prev_available_nodes) + |> Election.chain_storage_nodes(previous_nodes) case BeaconChain.fetch_summaries_aggregate(summary_time, storage_nodes) do {:ok, aggregate} -> diff --git a/lib/archethic/self_repair/sync.ex b/lib/archethic/self_repair/sync.ex index 66d2dbfd0..e98518123 100644 --- a/lib/archethic/self_repair/sync.ex +++ b/lib/archethic/self_repair/sync.ex @@ -658,9 +658,9 @@ defmodule Archethic.SelfRepair.Sync do defp store_aggregate( aggregate = %SummaryAggregate{summary_time: summary_time}, - new_available_nodes + new_nodes ) do - node_list = [P2P.get_node_info() | new_available_nodes] |> P2P.distinct_nodes() + node_list = [P2P.get_node_info() | new_nodes] |> P2P.distinct_nodes() should_store? = summary_time diff --git a/test/archethic/self_repair/notifier_test.exs b/test/archethic/self_repair/notifier_test.exs index 8e4536005..391857aed 100644 --- a/test/archethic/self_repair/notifier_test.exs +++ b/test/archethic/self_repair/notifier_test.exs @@ -25,7 +25,7 @@ defmodule Archethic.SelfRepair.NotifierTest do alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations - test "new_storage_nodes/2 should return new election" do + test "filter_nodes_to_notify/1 should return new nodes in election" do node1 = %Node{ first_public_key: "node1", last_public_key: "node1", @@ -68,13 +68,20 @@ defmodule Archethic.SelfRepair.NotifierTest do P2P.add_and_connect_node(node4) prev_storage_nodes = ["node2", "node3"] - new_available_nodes = [node1, node2, node3, node4] - - assert {"Alice1", ["node4", "node1"], []} = - Notifier.new_storage_nodes( - {"Alice1", [], prev_storage_nodes, []}, - new_available_nodes - ) + prev_io_nodes = [] + new_storage_nodes = ["node4", "node1"] + new_io_nodes = [] + + result = + Notifier.filter_nodes_to_notify(%{ + address: "Alice1", + new_io_nodes: new_io_nodes, + new_storage_nodes: new_storage_nodes, + prev_io_nodes: prev_io_nodes, + prev_storage_nodes: prev_storage_nodes + }) + + assert {"Alice1", ["node4", "node1"], []} = result end test "map_last_address_for_node/1 should create a map with last address for each node" do @@ -96,7 +103,7 @@ defmodule Archethic.SelfRepair.NotifierTest do assert ^expected = Notifier.map_last_addresses_for_node(tab) end - test "repair_transactions/3 should send message to new storage nodes" do + test "repair_transactions/2 should send message to new storage nodes" do node = %Node{ first_public_key: Crypto.first_node_public_key(), last_public_key: Crypto.last_node_public_key(), @@ -108,7 +115,7 @@ defmodule Archethic.SelfRepair.NotifierTest do P2P.add_and_connect_node(node) - prev_available_nodes = + previous_nodes = Enum.map(1..50, fn nb -> node = %Node{ first_public_key: "node#{nb}", @@ -124,21 +131,20 @@ defmodule Archethic.SelfRepair.NotifierTest do node end) - prev_available_nodes = [node | prev_available_nodes] + previous_nodes = [node | previous_nodes] # Take nodes in election of Alice2 but not in the one of Alice3 - elec1 = Election.chain_storage_nodes("Alice2", prev_available_nodes) - elec2 = Election.chain_storage_nodes("Alice3", prev_available_nodes) + elec1 = Election.chain_storage_nodes("Alice2", previous_nodes) + elec2 = Election.chain_storage_nodes("Alice3", previous_nodes) diff_nodes = 
elec1 -- elec2 unavailable_nodes = Enum.take(diff_nodes, 2) |> Enum.map(& &1.first_public_key) - new_available_nodes = - Enum.reject(prev_available_nodes, &(&1.first_public_key in unavailable_nodes)) + new_nodes = Enum.reject(previous_nodes, &(&1.first_public_key in unavailable_nodes)) # New possible storage nodes for Alice2 - new_possible_nodes = (prev_available_nodes -- elec1) |> Enum.map(& &1.first_public_key) + new_possible_nodes = (previous_nodes -- elec1) |> Enum.map(& &1.first_public_key) MockDB |> stub(:list_first_addresses, fn -> ["Alice1"] end) @@ -179,7 +185,7 @@ defmodule Archethic.SelfRepair.NotifierTest do {:ok, %Ok{}} end) - Notifier.repair_transactions(unavailable_nodes, prev_available_nodes, new_available_nodes) + Notifier.repair_transactions(previous_nodes, new_nodes) # Expect to receive only 1 new node for Alice2 assert_receive :new_node diff --git a/test/archethic/self_repair/sync_test.exs b/test/archethic/self_repair/sync_test.exs index a9980e229..4943d9fae 100644 --- a/test/archethic/self_repair/sync_test.exs +++ b/test/archethic/self_repair/sync_test.exs @@ -41,7 +41,6 @@ defmodule Archethic.SelfRepair.SyncTest do import Mox import Mock - import ArchethicCase describe "last_sync_date/0" do test "should get nil if not last sync file and not prior nodes" do @@ -590,82 +589,81 @@ defmodule Archethic.SelfRepair.SyncTest do assert %Node{network_patch: "AAA"} = P2P.get_node_info() end - describe "process_replication_attestations/2" do - test "should replicate the transactions and their inputs" do - {pub, priv} = Crypto.generate_random_keypair() + test "should replicate the transactions and their inputs" do + fixed_time = ~U[2024-12-04 16:00:00Z] - node1 = %Node{ - first_public_key: pub, - last_public_key: pub, - available?: true, - geo_patch: "BBB", - network_patch: "BBB", - authorized?: true, - reward_address: random_address(), - authorization_date: DateTime.utc_now() |> DateTime.add(-10), - enrollment_date: DateTime.utc_now() - } + {pub, priv} = Crypto.generate_random_keypair() - node2 = %Node{ - first_public_key: Crypto.first_node_public_key(), - last_public_key: Crypto.first_node_public_key(), - available?: true, - geo_patch: "AAA", - network_patch: "AAA", - authorized?: true, - reward_address: random_address(), - authorization_date: DateTime.utc_now() |> DateTime.add(-10), - enrollment_date: DateTime.utc_now() - } + node1 = %Node{ + first_public_key: pub, + last_public_key: pub, + available?: true, + geo_patch: "BBB", + network_patch: "BBB", + authorized?: true, + reward_address: "reward_address_1", + authorization_date: fixed_time |> DateTime.add(-10), + enrollment_date: fixed_time + } - P2P.add_and_connect_node(node1) - P2P.add_and_connect_node(node2) + node2 = %Node{ + first_public_key: Crypto.first_node_public_key(), + last_public_key: Crypto.first_node_public_key(), + available?: true, + geo_patch: "AAA", + network_patch: "AAA", + authorized?: true, + reward_address: "reward_address_2", + authorization_date: fixed_time |> DateTime.add(-10), + enrollment_date: fixed_time + } - tx = TransactionFactory.create_valid_transaction() - tx_address = tx.address - tx_summary = TransactionSummary.from_transaction(tx, Transaction.previous_address(tx)) + P2P.add_and_connect_node(node1) + P2P.add_and_connect_node(node2) + + tx = TransactionFactory.create_valid_transaction() + tx_address = tx.address + tx_summary = TransactionSummary.from_transaction(tx, Transaction.previous_address(tx)) + + transaction_inputs = %TransactionInputList{ + inputs: [ + %VersionedTransactionInput{ + 
+          protocol_version: 8,
+          input: %TransactionInput{
+            from: "input_address",
+            amount: 100_000_000,
+            type: :UCO,
+            timestamp: fixed_time
+          }
+        }
+      ],
+      more?: false,
+      offset: 0
+    }

-      MockClient
-      |> expect(:send_message, fn ^node1, %GetTransaction{}, _ ->
-        {:ok, tx}
-      end)
-      |> expect(:send_message, fn ^node1, %GetTransactionInputs{}, _ ->
-        {:ok,
-         %TransactionInputList{
-           inputs: [
-             %VersionedTransactionInput{
-               protocol_version: 8,
-               input: %TransactionInput{
-                 from: random_address(),
-                 amount: 100_000_000,
-                 type: :UCO,
-                 timestamp: DateTime.utc_now()
-               }
-             }
-           ],
-           more?: false,
-           offset: 0
-         }}
-      end)
+    MockClient
+    |> stub(:send_message, fn
+      ^node1, %GetTransaction{}, _ -> {:ok, tx}
+      ^node1, %GetTransactionInputs{}, _ -> {:ok, transaction_inputs}
+    end)

-      MockTransactionLedger
-      |> expect(:write_inputs, fn ^tx_address, list ->
-        assert 1 = Enum.count(list)
-        :ok
-      end)
+    MockTransactionLedger
+    |> stub(:write_inputs, fn ^tx_address, inputs ->
+      assert length(inputs) == 1
+      :ok
+    end)

-      tx_summary_bin = TransactionSummary.serialize(tx_summary)
-      signature = Crypto.sign(tx_summary_bin, priv)
+    tx_summary_bin = TransactionSummary.serialize(tx_summary)
+    signature = Crypto.sign(tx_summary_bin, priv)

-      attestations = [
-        %ReplicationAttestation{
-          transaction_summary: tx_summary,
-          confirmations: [{0, signature}]
-        }
-      ]
+    attestations = [
+      %ReplicationAttestation{
+        transaction_summary: tx_summary,
+        confirmations: [{0, signature}]
+      }
+    ]

-      assert 1 = Sync.process_replication_attestations(attestations, [node1])
-    end
+    assert 1 == Sync.process_replication_attestations(attestations, [node1])
   end

   defp create_p2p_context do