diff --git a/lib/archethic/self_repair/notifier.ex b/lib/archethic/self_repair/notifier.ex index 124283078..0ddf55e2a 100644 --- a/lib/archethic/self_repair/notifier.ex +++ b/lib/archethic/self_repair/notifier.ex @@ -1,24 +1,45 @@ defmodule Archethic.SelfRepair.Notifier do - @moduledoc false + @moduledoc """ + Process to handle repair in case of topology change by trying to replicate transactions to new shard composition. + + When a node receive a topology change due to the unavailability of a node, + we compute the new election for the already stored transactions. + + Hence, a new shard might me formed as we notify the new transactions to the + new storage nodes + + ```mermaid + flowchart TD + A[Node 4] --x|Topology change notification| B[Node1] + B --> | List transactions| B + B -->|Elect new nodes| H[Transaction replication] + H -->|Replicate Transaction| C[Node2] + H -->|Replicate Transaction| D[Node3] + ``` + + """ use GenServer - # alias Archethic.Crypto - # - # alias Archethic.Election - # + alias Archethic.Crypto + + alias Archethic.Election + alias Archethic.PubSub - # alias Archethic.P2P - # alias Archethic.P2P.Message.ReplicateTransactionChain + alias Archethic.P2P + alias Archethic.P2P.Message.ReplicateTransaction alias Archethic.P2P.Node - # alias Archethic.TaskSupervisor - # - # alias Archethic.TransactionChain - # alias Archethic.TransactionChain.Transaction - # - # alias Archethic.Utils + alias Archethic.TaskSupervisor + + alias Archethic.TransactionChain + alias Archethic.TransactionChain.Transaction + alias Archethic.TransactionChain.Transaction.ValidationStamp + + alias Archethic.Utils + + require Logger def start_link(args \\ []) do GenServer.start_link(__MODULE__, args) @@ -26,79 +47,126 @@ defmodule Archethic.SelfRepair.Notifier do def init(_) do PubSub.register_to_node_update() - {:ok, []} + {:ok, %{notified: %{}}} end def handle_info( - {:node_update, %Node{available?: true, authorized?: true, first_public_key: _node_key}}, - state + {:node_update, + %Node{ + available?: false, + authorized?: true, + first_public_key: node_key, + authorization_date: authorization_date + }}, + state = %{notified: notified} ) do - # FIXME - # TODO: Leverage node update reason to know if we have to take the new node in consideration or to reject the nodes to replay election algorithm - - # current_node_public_key = Crypto.first_node_public_key() - - # if node_key == current_node_public_key do - # {:noreply, state} - # else - # node_list = P2P.list_nodes() |> Enum.reject(&(&1.first_public_key == node_key)) + current_node_public_key = Crypto.first_node_public_key() + now = DateTime.utc_now() |> DateTime.truncate(:millisecond) + + with :lt <- DateTime.compare(authorization_date, now), + nil <- Map.get(notified, node_key), + false <- current_node_public_key == node_key do + repair_transactions(node_key, current_node_public_key) + {:noreply, Map.update!(state, :notified, &Map.put(&1, node_key, %{}))} + else + _ -> + {:noreply, state} + end + end - # node_key - # |> get_transactions_to_sync(node_list) - # |> forward_transactions(node_list, current_node_public_key) + def handle_info( + {:node_update, + %Node{authorized?: false, authorization_date: date, first_public_key: node_key}}, + state = %{notified: notified} + ) + when date != nil do + current_node_public_key = Crypto.first_node_public_key() + + with nil <- Map.get(notified, node_key), + false <- current_node_public_key == node_key do + repair_transactions(node_key, current_node_public_key) + {:noreply, Map.update!(state, :notified, &Map.put(&1, node_key, %{}))} + else + _ -> + {:noreply, state} + end + end - # {:noreply, state} - # end - {:noreply, state} + def handle_info( + {:node_update, + %Node{available?: true, first_public_key: node_key, authorization_date: date}}, + state + ) + when date != nil do + {:noreply, Map.update!(state, :notified, &Map.delete(&1, node_key))} end def handle_info(_, state) do {:noreply, state} end - # defp get_transactions_to_sync(node_public_key, node_list) do - # TransactionChain.list_all([:address, :type]) - # |> Stream.map(fn %Transaction{address: address, type: type} -> - # {address, type, Election.chain_storage_nodes_with_type(address, type, node_list)} - # end) - # |> Stream.filter(fn {_address, _type, nodes} -> - # Utils.key_in_node_list?(nodes, node_public_key) - # end) - # end - - # defp forward_transactions( - # transactions, - # node_list, - # current_node_public_key - # ) do - # transactions - # |> Stream.each(fn {address, type, _nodes} -> - # previous_storage_nodes = - # Election.chain_storage_nodes_with_type( - # address, - # type, - # node_list - # ) - - # next_storage_nodes = - # Election.chain_storage_nodes_with_type( - # address, - # type, - # P2P.available_nodes() -- previous_storage_nodes - # ) - - # with true <- Utils.key_in_node_list?(next_storage_nodes, current_node_public_key), - # {:ok, tx} <- TransactionChain.get_transaction(address) do - # Task.Supervisor.async_stream_nolink( - # TaskSupervisor, - # next_storage_nodes, - # &P2P.send_message(&1, %ReplicateTransactionChain{transaction: tx}), - # ordered: false, - # on_timeout: :kill_task - # ) - # |> Stream.run() - # end - # end) - # |> Stream.run() - # end + defp repair_transactions(node_key, current_node_public_key) do + Logger.debug("Trying to repair transactions due to a topology change", + node: Base.encode16(node_key) + ) + + node_key + |> get_transactions_to_sync() + |> Stream.each(&forward_transaction(&1, current_node_public_key)) + |> Stream.run() + end + + defp get_transactions_to_sync(node_public_key) do + # We fetch all the transactions existing and check if the disconnecting node was a storage node + TransactionChain.list_all([:address, :type, validation_stamp: [:timestamp]]) + |> Stream.map( + fn tx = %Transaction{ + address: address, + type: type, + validation_stamp: %ValidationStamp{timestamp: timestamp} + } -> + node_list = + Enum.filter( + P2P.list_nodes(), + &(&1.authorization_date != nil and + DateTime.compare(&1.authorization_date, timestamp) == :lt) + ) + + {tx, Election.chain_storage_nodes_with_type(address, type, node_list)} + end + ) + |> Stream.filter(fn {_tx, nodes} -> + Utils.key_in_node_list?(nodes, node_public_key) + end) + end + + defp forward_transaction( + {tx = %Transaction{address: address, type: type}, previous_storage_nodes}, + current_node_public_key + ) do + # We compute the new storage nodes minus the previous ones + new_storage_nodes = + Election.chain_storage_nodes_with_type( + address, + type, + P2P.authorized_nodes() -- previous_storage_nodes + ) + + with false <- Enum.empty?(new_storage_nodes), + true <- Utils.key_in_node_list?(previous_storage_nodes, current_node_public_key) do + Logger.info("Repair started due to network topology change", + transaction_address: Base.encode16(address), + transaction_type: type + ) + + Task.Supervisor.async_stream_nolink( + TaskSupervisor, + new_storage_nodes, + &P2P.send_message(&1, %ReplicateTransaction{transaction: tx}), + ordered: false, + on_timeout: :kill_task + ) + |> Stream.run() + end + end end diff --git a/test/archethic/self_repair/notifier_test.exs b/test/archethic/self_repair/notifier_test.exs new file mode 100644 index 000000000..5b073b839 --- /dev/null +++ b/test/archethic/self_repair/notifier_test.exs @@ -0,0 +1,88 @@ +defmodule Archethic.SelfRepair.NotifierTest do + use ArchethicCase + + alias Archethic.Crypto + + alias Archethic.P2P + alias Archethic.P2P.Message.Ok + alias Archethic.P2P.Message.ReplicateTransaction + alias Archethic.P2P.Node + + alias Archethic.SelfRepair.Notifier + alias Archethic.TransactionChain.Transaction + alias Archethic.TransactionChain.Transaction.ValidationStamp + + import Mox + + test "when a node is becoming offline new nodes should receive transaction to replicate" do + P2P.add_and_connect_node(%Node{ + first_public_key: Crypto.first_node_public_key(), + last_public_key: Crypto.first_node_public_key(), + ip: {127, 0, 0, 1}, + port: 3000, + authorized?: true, + authorization_date: ~U[2022-02-01 00:00:00Z], + geo_patch: "AAA" + }) + + P2P.add_and_connect_node(%Node{ + first_public_key: "node2", + last_public_key: "node2", + ip: {127, 0, 0, 1}, + port: 3001, + authorized?: true, + authorization_date: ~U[2022-02-01 00:00:00Z], + geo_patch: "CCC" + }) + + P2P.add_and_connect_node(%Node{ + first_public_key: "node3", + last_public_key: "node3", + ip: {127, 0, 0, 1}, + port: 3002, + authorized?: true, + authorization_date: ~U[2022-02-03 00:00:00Z], + geo_patch: "DDD" + }) + + {:ok, pid} = Notifier.start_link() + + MockDB + |> expect(:list_transactions, fn _ -> + [ + %Transaction{ + address: "@Alice1", + type: :transfer, + validation_stamp: %ValidationStamp{ + timestamp: ~U[2022-02-01 12:54:00Z] + } + } + ] + end) + + me = self() + + MockClient + |> expect(:send_message, fn %Node{first_public_key: "node3"}, + %ReplicateTransaction{ + transaction: %Transaction{address: "@Alice1"} + }, + _ -> + send(me, :tx_replicated) + %Ok{} + end) + + send( + pid, + {:node_update, + %Node{ + first_public_key: "node2", + available?: false, + authorized?: true, + authorization_date: ~U[2022-02-01 00:00:00Z] + }} + ) + + assert_receive :tx_replicated + end +end