diff --git a/lib/archethic/beacon_chain.ex b/lib/archethic/beacon_chain.ex index e042a6828..6d338a044 100644 --- a/lib/archethic/beacon_chain.ex +++ b/lib/archethic/beacon_chain.ex @@ -16,6 +16,7 @@ defmodule Archethic.BeaconChain do alias __MODULE__.SummaryAggregate alias __MODULE__.SummaryTimer alias __MODULE__.Update + alias __MODULE__.ReplicationAttestation alias Archethic.Crypto diff --git a/lib/archethic/bootstrap.ex b/lib/archethic/bootstrap.ex index b3ec9913c..11ae0e461 100644 --- a/lib/archethic/bootstrap.ex +++ b/lib/archethic/bootstrap.ex @@ -13,7 +13,6 @@ defmodule Archethic.Bootstrap do alias Archethic.Replication alias Archethic.SelfRepair - alias Archethic.SelfRepair.NetworkChain alias Archethic.TransactionChain alias Archethic.TransactionChain.Transaction @@ -247,8 +246,11 @@ defmodule Archethic.Bootstrap do Archethic.Bootstrap.NetworkConstraints.persist_genesis_address() - Logger.info("Enforced Resync: Started!") - NetworkChain.synchronous_resync_many([:node, :oracle, :origin, :node_shared_secrets]) + if P2P.authorized_node?(Crypto.first_node_public_key()) do + Logger.info("Current summary synchronization started") + SelfRepair.synchronize_current_summary() + Logger.info("Current summary synchronization finished") + end Sync.publish_end_of_sync() SelfRepair.start_scheduler() diff --git a/lib/archethic/p2p/message.ex b/lib/archethic/p2p/message.ex index 9a2a4b5e5..da239b3e0 100644 --- a/lib/archethic/p2p/message.ex +++ b/lib/archethic/p2p/message.ex @@ -31,6 +31,8 @@ defmodule Archethic.P2P.Message do GetBeaconSummariesAggregate, GetBootstrappingNodes, GetCurrentSummaries, + GetCurrentReplicationsAttestations, + GetCurrentReplicationsAttestationsResponse, GetLastTransaction, GetLastTransactionAddress, GetNextAddresses, @@ -118,6 +120,7 @@ defmodule Archethic.P2P.Message do | GetGenesisAddress.t() | ValidationError.t() | GetCurrentSummaries.t() + | GetCurrentReplicationsAttestations.t() | GetBeaconSummariesAggregate.t() | NotifyPreviousChain.t() | ShardRepair.t() @@ -158,6 +161,7 @@ defmodule Archethic.P2P.Message do | NetworkStats.t() | SmartContractCallValidation.t() | DashboardData.t() + | GetCurrentReplicationsAttestationsResponse.t() @floor_upload_speed Application.compile_env!(:archethic, [__MODULE__, :floor_upload_speed]) @content_max_size Application.compile_env!(:archethic, :transaction_data_content_max_size) diff --git a/lib/archethic/self_repair.ex b/lib/archethic/self_repair.ex index 0d810bab8..f8c5d368a 100755 --- a/lib/archethic/self_repair.ex +++ b/lib/archethic/self_repair.ex @@ -286,4 +286,18 @@ defmodule Archethic.SelfRepair do """ @spec next_repair_time() :: DateTime.t() defdelegate next_repair_time, to: Scheduler + + @doc """ + Synchronously synchronize all the transactions that happened since last summary aggregate + """ + @spec synchronize_current_summary() :: :ok + def synchronize_current_summary() do + download_nodes = + DateTime.utc_now() + |> BeaconChain.next_summary_date() + |> P2P.authorized_and_available_nodes(true) + + BeaconChain.list_replications_attestations_from_current_slot() + |> Sync.process_replications_attestations(download_nodes) + end end diff --git a/lib/archethic/self_repair/sync.ex b/lib/archethic/self_repair/sync.ex index 9839ad5fc..9ad7730ca 100644 --- a/lib/archethic/self_repair/sync.ex +++ b/lib/archethic/self_repair/sync.ex @@ -353,6 +353,17 @@ defmodule Archethic.SelfRepair.Sync do store_last_sync_date(summary_time) end + @spec process_replications_attestations(list(ReplicationAttestation.t()), list(Node.t())) :: :ok + def process_replications_attestations(replications_attestations, download_nodes) do + nodes_including_self = [P2P.get_node_info() | download_nodes] |> P2P.distinct_nodes() + + replications_attestations + |> adjust_attestations(download_nodes) + |> Stream.filter(&TransactionHandler.download_transaction?(&1, nodes_including_self)) + |> Enum.sort_by(& &1.transaction_summary.timestamp, {:asc, DateTime}) + |> synchronize_transactions(download_nodes) + end + # To avoid beacon chain database migration we have to support both summaries with genesis address and without # Hence, we need to adjust or revised the attestation to include the genesis addresses # which is not present in the version 1 of transaction's summary. diff --git a/test/archethic/bootstrap_test.exs b/test/archethic/bootstrap_test.exs index dac344f4e..b897484d4 100644 --- a/test/archethic/bootstrap_test.exs +++ b/test/archethic/bootstrap_test.exs @@ -32,7 +32,9 @@ defmodule Archethic.BootstrapTest do Ok, TransactionChainLength, TransactionInputList, - TransactionList + TransactionList, + GetCurrentReplicationsAttestations, + GetCurrentReplicationsAttestationsResponse } alias Archethic.Replication @@ -126,6 +128,13 @@ defmodule Archethic.BootstrapTest do _, %GetGenesisAddress{}, _ -> {:ok, %NotFound{}} + + _, %GetCurrentReplicationsAttestations{}, _ -> + {:ok, + %GetCurrentReplicationsAttestationsResponse{ + replications_attestations: [], + more?: false + }} end) {:ok, daily_nonce_agent} = Agent.start_link(fn -> %{} end) diff --git a/test/archethic/self_repair/self_repair_test.exs b/test/archethic/self_repair/self_repair_test.exs index 735cfb5a8..df4d6c46f 100644 --- a/test/archethic/self_repair/self_repair_test.exs +++ b/test/archethic/self_repair/self_repair_test.exs @@ -2,6 +2,9 @@ defmodule Archethic.SelfRepairTest do @moduledoc false use ArchethicCase + alias Archethic.BeaconChain + alias Archethic.BeaconChain.ReplicationAttestation + alias Archethic.Crypto alias Archethic.P2P @@ -12,10 +15,12 @@ defmodule Archethic.SelfRepairTest do alias Archethic.Replication alias Archethic.SelfRepair + alias Archethic.SelfRepair.Sync.TransactionHandler + alias Archethic.TransactionChain alias Archethic.TransactionChain.Transaction alias Archethic.TransactionChain.Transaction.ValidationStamp - + alias Archethic.TransactionChain.TransactionSummary alias Archethic.TransactionFactory import ArchethicCase @@ -153,4 +158,149 @@ defmodule Archethic.SelfRepairTest do SelfRepair.replicate_transaction(address, false) end end + + describe "synchronize_current_summary/0" do + test "should be able to run when there's nothing to sync" do + P2P.add_and_connect_node(%Node{ + ip: {127, 0, 0, 1}, + port: 3000, + first_public_key: Crypto.first_node_public_key(), + last_public_key: Crypto.first_node_public_key(), + network_patch: "AAA", + geo_patch: "AAA", + available?: true, + authorized?: true, + authorization_date: ~U[2001-01-01 00:00:00Z] + }) + + with_mock(BeaconChain, [:passthrough], + next_summary_date: fn _ -> DateTime.utc_now() end, + list_replications_attestations_from_current_slot: fn -> [] end + ) do + with_mock(TransactionHandler, [:passthrough], []) do + assert :ok = SelfRepair.synchronize_current_summary() + + assert_not_called(TransactionHandler.download_transaction(:_, :_)) + end + end + end + + test "should resync missed transactions" do + now = DateTime.utc_now() + + P2P.add_and_connect_node(%Node{ + ip: {127, 0, 0, 1}, + port: 3000, + first_public_key: Crypto.first_node_public_key(), + last_public_key: Crypto.first_node_public_key(), + network_patch: "AAA", + geo_patch: "AAA", + available?: true, + authorized?: true, + authorization_date: ~U[2001-01-01 00:00:00Z] + }) + + P2P.add_and_connect_node(%Node{ + ip: {127, 0, 0, 1}, + port: 3001, + first_public_key: random_public_key(), + last_public_key: random_public_key(), + network_patch: "AAA", + geo_patch: "AAA", + available?: true, + authorized?: true, + authorization_date: ~U[2001-01-01 00:00:00Z] + }) + + replication_attestation1 = random_replication_attestation(now) + replication_attestation2 = random_replication_attestation(now) + replication_attestation3 = random_replication_attestation(now) + + with_mock(BeaconChain, [:passthrough], + next_summary_date: fn _ -> now end, + list_replications_attestations_from_current_slot: fn -> + [ + replication_attestation1, + replication_attestation2, + replication_attestation3 + ] + end + ) do + with_mock(TransactionHandler, [:passthrough], + download_transaction: fn _, _ -> :ok end, + process_transaction: fn _, _, _ -> :ok end + ) do + assert :ok = SelfRepair.synchronize_current_summary() + + assert_called(TransactionHandler.download_transaction(replication_attestation1, :_)) + assert_called(TransactionHandler.download_transaction(replication_attestation2, :_)) + assert_called(TransactionHandler.download_transaction(replication_attestation3, :_)) + end + end + end + + test "should not resync a transaction already existing" do + now = DateTime.utc_now() + + P2P.add_and_connect_node(%Node{ + ip: {127, 0, 0, 1}, + port: 3000, + first_public_key: Crypto.first_node_public_key(), + last_public_key: Crypto.first_node_public_key(), + network_patch: "AAA", + geo_patch: "AAA", + available?: true, + authorized?: true, + authorization_date: ~U[2001-01-01 00:00:00Z] + }) + + P2P.add_and_connect_node(%Node{ + ip: {127, 0, 0, 1}, + port: 3001, + first_public_key: random_public_key(), + last_public_key: random_public_key(), + network_patch: "AAA", + geo_patch: "AAA", + available?: true, + authorized?: true, + authorization_date: ~U[2001-01-01 00:00:00Z] + }) + + replication_attestation1 = random_replication_attestation(now) + + with_mock(BeaconChain, [:passthrough], + next_summary_date: fn _ -> now end, + list_replications_attestations_from_current_slot: fn -> + [ + replication_attestation1 + ] + end + ) do + with_mock(TransactionChain, [:passthrough], transaction_exists?: fn _ -> true end) do + with_mock(TransactionHandler, [:passthrough], download_transaction: fn _, _ -> :ok end) do + assert :ok = SelfRepair.synchronize_current_summary() + + assert_not_called( + TransactionHandler.download_transaction(replication_attestation1, :_) + ) + end + end + end + end + end + + defp random_replication_attestation(datetime) do + %ReplicationAttestation{ + version: 2, + transaction_summary: %TransactionSummary{ + address: random_address(), + type: :transfer, + timestamp: datetime, + fee: 10_000_000, + validation_stamp_checksum: :crypto.strong_rand_bytes(32), + genesis_address: random_address() + }, + confirmations: Enum.map(0..9, &{&1, "signature#{&1}"}) + } + end end