diff --git a/lib/archethic/p2p/message.ex b/lib/archethic/p2p/message.ex index f79fc15ea..4eef1dca3 100644 --- a/lib/archethic/p2p/message.ex +++ b/lib/archethic/p2p/message.ex @@ -391,9 +391,10 @@ defmodule Archethic.P2P.Message do def encode(%NotifyLastTransactionAddress{ last_address: last_address, genesis_address: genesis_address, + previous_address: previous_address, timestamp: timestamp }) do - <<22::8, last_address::binary, genesis_address::binary, + <<22::8, last_address::binary, genesis_address::binary, previous_address::binary, DateTime.to_unix(timestamp, :millisecond)::64>> end @@ -904,11 +905,13 @@ defmodule Archethic.P2P.Message do def decode(<<22::8, rest::bitstring>>) do {last_address, rest} = Utils.deserialize_address(rest) - {genesis_address, <>} = Utils.deserialize_address(rest) + {genesis_address, rest} = Utils.deserialize_address(rest) + {previous_address, <>} = Utils.deserialize_address(rest) {%NotifyLastTransactionAddress{ last_address: last_address, genesis_address: genesis_address, + previous_address: previous_address, timestamp: DateTime.from_unix!(timestamp, :millisecond) }, rest} end @@ -1716,19 +1719,24 @@ defmodule Archethic.P2P.Message do def process( %NotifyLastTransactionAddress{ - last_address: last_address, genesis_address: genesis_address, + last_address: last_address, + previous_address: previous_address, timestamp: timestamp }, _ ) do - with {local_last_address, _} <- - TransactionChain.get_last_address(genesis_address), + with {local_last_address, _} <- TransactionChain.get_last_address(genesis_address), true <- local_last_address != last_address do - TransactionChain.register_last_address(genesis_address, last_address, timestamp) + if local_last_address == previous_address do + TransactionChain.register_last_address(genesis_address, last_address, timestamp) - # Stop potential previous smart contract - Contracts.stop_contract(local_last_address) + # Stop potential previous smart contract + Contracts.stop_contract(local_last_address) + else + authorized_nodes = P2P.authorized_and_available_nodes() + SelfRepair.update_last_address(local_last_address, authorized_nodes) + end end %Ok{} diff --git a/lib/archethic/p2p/message/notify_last_transaction_address.ex b/lib/archethic/p2p/message/notify_last_transaction_address.ex index d2f0c2789..0de62218c 100644 --- a/lib/archethic/p2p/message/notify_last_transaction_address.ex +++ b/lib/archethic/p2p/message/notify_last_transaction_address.ex @@ -2,14 +2,15 @@ defmodule Archethic.P2P.Message.NotifyLastTransactionAddress do @moduledoc """ Represents a message with to notify a pool of the last address of a previous address """ - @enforce_keys [:last_address, :genesis_address, :timestamp] - defstruct [:last_address, :genesis_address, :timestamp] + @enforce_keys [:last_address, :genesis_address, :previous_address, :timestamp] + defstruct [:last_address, :genesis_address, :previous_address, :timestamp] alias Archethic.Crypto @type t :: %__MODULE__{ last_address: Crypto.versioned_hash(), genesis_address: Crypto.versioned_hash(), + previous_address: Crypto.versioned_hash(), timestamp: DateTime.t() } end diff --git a/lib/archethic/replication.ex b/lib/archethic/replication.ex index 89a3fb497..d5f7bbb4a 100644 --- a/lib/archethic/replication.ex +++ b/lib/archethic/replication.ex @@ -344,9 +344,11 @@ defmodule Archethic.Replication do last_storage_nodes = Election.chain_storage_nodes(address, P2P.authorized_and_available_nodes()) - {:ok, %Transaction{validation_stamp: %ValidationStamp{timestamp: timestamp}}} = + {:ok, tx = %Transaction{validation_stamp: %ValidationStamp{timestamp: timestamp}}} = TransactionChain.get_transaction(address, validation_stamp: [:timestamp]) + previous_address = Transaction.previous_address(tx) + # Send a message to all the previous storage nodes genesis_address |> TransactionChain.list_chain_addresses() @@ -359,7 +361,8 @@ defmodule Archethic.Replication do |> P2P.broadcast_message(%NotifyLastTransactionAddress{ last_address: address, genesis_address: genesis_address, - timestamp: timestamp + timestamp: timestamp, + previous_address: previous_address }) end end diff --git a/lib/archethic/self_repair.ex b/lib/archethic/self_repair.ex index 7aa7a5759..04b36d456 100755 --- a/lib/archethic/self_repair.ex +++ b/lib/archethic/self_repair.ex @@ -11,14 +11,17 @@ defmodule Archethic.SelfRepair do alias __MODULE__.Scheduler alias __MODULE__.Sync - alias Archethic.BeaconChain - - alias Archethic.Crypto + alias Archethic.{ + BeaconChain, + Crypto, + Utils, + Contracts, + TransactionChain, + Election + } alias Archethic.P2P.Node - alias Archethic.Utils - alias Crontab.CronExpression.Parser, as: CronParser alias Crontab.Scheduler, as: CronScheduler @@ -159,4 +162,37 @@ defmodule Archethic.SelfRepair do |> Keyword.get(Scheduler) |> Scheduler.config_change() end + + @doc """ + Request missing transaction addresses from last local address until last chain address + and add them in the DB + """ + def update_last_address(address, authorized_nodes) do + # As the node is storage node of this chain, it needs to know all the addresses of the chain until the last + # So we get the local last address and verify if it's the same as the last address of the chain + # by requesting the nodes which already know the last address + + {last_local_address, _timestamp} = TransactionChain.get_last_address(address) + storage_nodes = Election.storage_nodes(last_local_address, authorized_nodes) + + case TransactionChain.fetch_next_chain_addresses_remotely(last_local_address, storage_nodes) do + {:ok, []} -> + :ok + + {:ok, addresses} -> + genesis_address = TransactionChain.get_genesis_address(address) + + addresses + |> Enum.sort_by(fn {_address, timestamp} -> timestamp end) + |> Enum.each(fn {address, timestamp} -> + TransactionChain.register_last_address(genesis_address, address, timestamp) + end) + + # Stop potential previous smart contract + Contracts.stop_contract(address) + + _ -> + :ok + end + end end diff --git a/lib/archethic/self_repair/repair_worker.ex b/lib/archethic/self_repair/repair_worker.ex index d2113bd33..8ccb643f8 100644 --- a/lib/archethic/self_repair/repair_worker.ex +++ b/lib/archethic/self_repair/repair_worker.ex @@ -2,12 +2,12 @@ defmodule Archethic.SelfRepair.RepairWorker do @moduledoc false alias Archethic.{ - Contracts, BeaconChain, Election, P2P, Replication, - TransactionChain + TransactionChain, + SelfRepair } alias Archethic.SelfRepair.RepairRegistry @@ -130,7 +130,7 @@ defmodule Archethic.SelfRepair.RepairWorker do {:ok, tx} <- TransactionChain.fetch_transaction_remotely(address, storage_nodes) do if storage? do case Replication.validate_and_store_transaction_chain(tx, true, authorized_nodes) do - :ok -> update_last_address(address, authorized_nodes) + :ok -> SelfRepair.update_last_address(address, authorized_nodes) error -> error end else @@ -148,37 +148,4 @@ defmodule Archethic.SelfRepair.RepairWorker do ) end end - - @doc """ - Request missing transaction addresses from last local address until last chain address - and add them in the DB - """ - def update_last_address(address, authorized_nodes) do - # As the node is storage node of this chain, it needs to know all the addresses of the chain until the last - # So we get the local last address and verify if it's the same as the last address of the chain - # by requesting the nodes which already know the last address - - {last_local_address, _timestamp} = TransactionChain.get_last_address(address) - storage_nodes = Election.storage_nodes(last_local_address, authorized_nodes) - - case TransactionChain.fetch_next_chain_addresses_remotely(last_local_address, storage_nodes) do - {:ok, []} -> - :ok - - {:ok, addresses} -> - genesis_address = TransactionChain.get_genesis_address(address) - - addresses - |> Enum.sort_by(fn {_address, timestamp} -> timestamp end) - |> Enum.each(fn {address, timestamp} -> - TransactionChain.register_last_address(genesis_address, address, timestamp) - end) - - # Stop potential previous smart contract - Contracts.stop_contract(address) - - _ -> - :ok - end - end end diff --git a/test/archethic/p2p/messages_test.exs b/test/archethic/p2p/messages_test.exs index 5e9fd8536..2d954c5c7 100644 --- a/test/archethic/p2p/messages_test.exs +++ b/test/archethic/p2p/messages_test.exs @@ -915,6 +915,7 @@ defmodule Archethic.P2P.MessageTest do msg = %NotifyLastTransactionAddress{ last_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>, genesis_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>, + previous_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>, timestamp: DateTime.utc_now() |> DateTime.truncate(:millisecond) } diff --git a/test/archethic/replication_test.exs b/test/archethic/replication_test.exs index 6695310b0..dfb179379 100644 --- a/test/archethic/replication_test.exs +++ b/test/archethic/replication_test.exs @@ -8,6 +8,7 @@ defmodule Archethic.ReplicationTest do alias Archethic.Mining.Fee alias Archethic.P2P + alias Archethic.P2P.Message alias Archethic.P2P.Message.GetTransactionChainLength alias Archethic.P2P.Message.TransactionChainLength alias Archethic.P2P.Message.GetTransaction @@ -250,10 +251,16 @@ defmodule Archethic.ReplicationTest do }) end) + previous_public_key = "previous_public_key" + MockDB |> expect(:get_first_chain_address, fn _ -> "@Alice0" end) |> expect(:get_transaction, fn _, _ -> - {:ok, %Transaction{validation_stamp: %ValidationStamp{timestamp: DateTime.utc_now()}}} + {:ok, + %Transaction{ + validation_stamp: %ValidationStamp{timestamp: DateTime.utc_now()}, + previous_public_key: previous_public_key + }} end) |> expect(:list_chain_addresses, fn _ -> [{"@Alice1", DateTime.utc_now()}] end) @@ -261,16 +268,43 @@ defmodule Archethic.ReplicationTest do |> stub(:send_message, fn _, %NotifyLastTransactionAddress{ last_address: last_address, - genesis_address: genesis_address + genesis_address: genesis_address, + previous_address: previous_address }, _ -> - send(me, {:last_address, last_address, genesis_address}) + send(me, {:last_address, last_address, genesis_address, previous_address}) {:ok, %Ok{}} end) + derived_previous_address = Crypto.derive_address(previous_public_key) + assert :ok = Replication.acknowledge_previous_storage_nodes("@Alice2") - assert_receive {:last_address, "@Alice2", "@Alice0"} + assert_receive {:last_address, "@Alice2", "@Alice0", ^derived_previous_address} + end + + test "should process NotifyLastTransactionAddress message with TransactionChain.register_last_address if last address is different than the previous address" do + previous_public_key = "previous_public_key" + + MockDB + |> expect(:get_first_chain_address, fn _ -> "@Alice0" end) + |> expect(:get_transaction, fn _, _ -> + {:ok, + %Transaction{ + validation_stamp: %ValidationStamp{timestamp: DateTime.utc_now()}, + previous_public_key: previous_public_key + }} + end) + |> expect(:list_chain_addresses, fn _ -> [{"@Alice1", DateTime.utc_now()}] end) + |> expect(:add_last_transaction_address, 0, fn _, _, _ -> :ok end) + + MockClient + |> stub(:send_message, fn _, msg = %NotifyLastTransactionAddress{}, _ -> + Message.process(msg, "key") + {:ok, %Ok{}} + end) + + assert :ok = Replication.acknowledge_previous_storage_nodes("@Alice2") end end end diff --git a/test/archethic/self_repair/repair_worker_test.exs b/test/archethic/self_repair/repair_worker_test.exs index 2d5bd2f3b..6737d48b8 100644 --- a/test/archethic/self_repair/repair_worker_test.exs +++ b/test/archethic/self_repair/repair_worker_test.exs @@ -4,19 +4,12 @@ defmodule Archethic.SelfRepair.RepairWorkerTest do alias Archethic.BeaconChain.SummaryTimer - alias Archethic.Crypto - alias Archethic.P2P - alias Archethic.P2P.Client.DefaultImpl alias Archethic.P2P.Node - alias Archethic.P2P.Message.GetNextAddresses alias Archethic.P2P.Message.GetTransaction alias Archethic.SelfRepair.RepairWorker - alias Archethic.TransactionChain.Transaction - alias Archethic.TransactionChain.Transaction.ValidationStamp - import Mox setup do @@ -113,52 +106,4 @@ defmodule Archethic.SelfRepair.RepairWorkerTest do task: _task_pid } = :sys.get_state(pid) end - - test "update_last_address/1 should request missing addresses and add them in DB" do - node = %Node{ - first_public_key: Crypto.first_node_public_key(), - last_public_key: Crypto.last_node_public_key(), - geo_patch: "AAA", - authorized?: true, - authorization_date: ~U[2022-11-27 00:00:00Z], - available?: true, - availability_history: <<1::1>> - } - - me = self() - - MockDB - |> expect(:get_last_chain_address, fn "Alice2" -> {"Alice2", ~U[2022-11-27 00:10:00Z]} end) - |> expect(:get_transaction, fn "Alice2", _ -> - {:ok, %Transaction{validation_stamp: %ValidationStamp{timestamp: ~U[2022-11-27 00:10:00Z]}}} - end) - |> expect(:get_first_chain_address, 2, fn "Alice2" -> "Alice0" end) - |> expect(:list_chain_addresses, fn "Alice0" -> - [ - {"Alice1", ~U[2022-11-27 00:09:00Z]}, - {"Alice2", ~U[2022-11-27 00:10:00Z]}, - {"Alice3", ~U[2022-11-27 00:11:00Z]}, - {"Alice4", ~U[2022-11-27 00:12:00Z]} - ] - end) - |> expect(:add_last_transaction_address, 2, fn - "Alice0", "Alice3", ~U[2022-11-27 00:11:00Z] -> - send(me, :add_alice3) - - "Alice0", "Alice4", ~U[2022-11-27 00:12:00Z] -> - send(me, :add_alice4) - end) - - MockClient - |> expect(:send_message, fn node, msg = %GetNextAddresses{address: "Alice2"}, timeout -> - send(me, :get_next_addresses) - DefaultImpl.send_message(node, msg, timeout) - end) - - RepairWorker.update_last_address("Alice2", [node]) - - assert_receive :get_next_addresses - assert_receive :add_alice3 - assert_receive :add_alice4 - end end diff --git a/test/archethic/self_repair/self_repair_test.exs b/test/archethic/self_repair/self_repair_test.exs new file mode 100644 index 000000000..481f3f0a7 --- /dev/null +++ b/test/archethic/self_repair/self_repair_test.exs @@ -0,0 +1,74 @@ +defmodule Archethic.SelfRepairTest do + @moduledoc false + use ArchethicCase + + alias Archethic.BeaconChain.SummaryTimer + + alias Archethic.P2P.Client.DefaultImpl + + alias Archethic.Crypto + + alias Archethic.P2P.Node + alias Archethic.P2P.Message.GetNextAddresses + + alias Archethic.TransactionChain.Transaction + alias Archethic.TransactionChain.Transaction.ValidationStamp + + alias Archethic.SelfRepair + + import Mox + + setup do + start_supervised!({SummaryTimer, interval: "0 0 * * *"}) + + :ok + end + + test "update_last_address/1 should request missing addresses and add them in DB" do + node = %Node{ + first_public_key: Crypto.first_node_public_key(), + last_public_key: Crypto.last_node_public_key(), + geo_patch: "AAA", + authorized?: true, + authorization_date: ~U[2022-11-27 00:00:00Z], + available?: true, + availability_history: <<1::1>> + } + + me = self() + + MockDB + |> expect(:get_last_chain_address, fn "Alice2" -> {"Alice2", ~U[2022-11-27 00:10:00Z]} end) + |> expect(:get_transaction, fn "Alice2", _ -> + {:ok, %Transaction{validation_stamp: %ValidationStamp{timestamp: ~U[2022-11-27 00:10:00Z]}}} + end) + |> expect(:get_first_chain_address, 2, fn "Alice2" -> "Alice0" end) + |> expect(:list_chain_addresses, fn "Alice0" -> + [ + {"Alice1", ~U[2022-11-27 00:09:00Z]}, + {"Alice2", ~U[2022-11-27 00:10:00Z]}, + {"Alice3", ~U[2022-11-27 00:11:00Z]}, + {"Alice4", ~U[2022-11-27 00:12:00Z]} + ] + end) + |> expect(:add_last_transaction_address, 2, fn + "Alice0", "Alice3", ~U[2022-11-27 00:11:00Z] -> + send(me, :add_alice3) + + "Alice0", "Alice4", ~U[2022-11-27 00:12:00Z] -> + send(me, :add_alice4) + end) + + MockClient + |> expect(:send_message, fn node, msg = %GetNextAddresses{address: "Alice2"}, timeout -> + send(me, :get_next_addresses) + DefaultImpl.send_message(node, msg, timeout) + end) + + SelfRepair.update_last_address("Alice2", [node]) + + assert_receive :get_next_addresses + assert_receive :add_alice3 + assert_receive :add_alice4 + end +end