Skip to content

Commit

Permalink
Resynchronize missed transactions (of current summary) on bootstrap
Browse files Browse the repository at this point in the history
  • Loading branch information
bchamagne committed Apr 8, 2024
1 parent 24b707e commit 1ebc3e6
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 5 deletions.
1 change: 1 addition & 0 deletions lib/archethic/beacon_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ defmodule Archethic.BeaconChain do
alias __MODULE__.SummaryAggregate
alias __MODULE__.SummaryTimer
alias __MODULE__.Update
alias __MODULE__.ReplicationAttestation

alias Archethic.Crypto

Expand Down
8 changes: 5 additions & 3 deletions lib/archethic/bootstrap.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ defmodule Archethic.Bootstrap do
alias Archethic.Replication

alias Archethic.SelfRepair
alias Archethic.SelfRepair.NetworkChain

alias Archethic.TransactionChain
alias Archethic.TransactionChain.Transaction
Expand Down Expand Up @@ -247,8 +246,11 @@ defmodule Archethic.Bootstrap do

Archethic.Bootstrap.NetworkConstraints.persist_genesis_address()

Logger.info("Enforced Resync: Started!")
NetworkChain.synchronous_resync_many([:node, :oracle, :origin, :node_shared_secrets])
if P2P.authorized_node?(Crypto.first_node_public_key()) do
Logger.info("Current summary synchronization started")
SelfRepair.synchronize_current_summary()
Logger.info("Current summary synchronization finished")
end

Sync.publish_end_of_sync()
SelfRepair.start_scheduler()
Expand Down
4 changes: 4 additions & 0 deletions lib/archethic/p2p/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ defmodule Archethic.P2P.Message do
GetBeaconSummariesAggregate,
GetBootstrappingNodes,
GetCurrentSummaries,
GetCurrentReplicationsAttestations,
GetCurrentReplicationsAttestationsResponse,
GetLastTransaction,
GetLastTransactionAddress,
GetNextAddresses,
Expand Down Expand Up @@ -118,6 +120,7 @@ defmodule Archethic.P2P.Message do
| GetGenesisAddress.t()
| ValidationError.t()
| GetCurrentSummaries.t()
| GetCurrentReplicationsAttestations.t()
| GetBeaconSummariesAggregate.t()
| NotifyPreviousChain.t()
| ShardRepair.t()
Expand Down Expand Up @@ -158,6 +161,7 @@ defmodule Archethic.P2P.Message do
| NetworkStats.t()
| SmartContractCallValidation.t()
| DashboardData.t()
| GetCurrentReplicationsAttestationsResponse.t()

@floor_upload_speed Application.compile_env!(:archethic, [__MODULE__, :floor_upload_speed])
@content_max_size Application.compile_env!(:archethic, :transaction_data_content_max_size)
Expand Down
14 changes: 14 additions & 0 deletions lib/archethic/self_repair.ex
Original file line number Diff line number Diff line change
Expand Up @@ -286,4 +286,18 @@ defmodule Archethic.SelfRepair do
"""
@spec next_repair_time() :: DateTime.t()
defdelegate next_repair_time, to: Scheduler

@doc """
Synchronously synchronize all the transactions that happened since last summary aggregate
"""
@spec synchronize_current_summary() :: :ok
def synchronize_current_summary() do
download_nodes =
DateTime.utc_now()
|> BeaconChain.next_summary_date()
|> P2P.authorized_and_available_nodes(true)

BeaconChain.list_replications_attestations_from_current_slot()
|> Sync.process_replications_attestations(download_nodes)
end
end
11 changes: 11 additions & 0 deletions lib/archethic/self_repair/sync.ex
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,17 @@ defmodule Archethic.SelfRepair.Sync do
store_last_sync_date(summary_time)
end

@spec process_replications_attestations(list(ReplicationAttestation.t()), list(Node.t())) :: :ok
def process_replications_attestations(replications_attestations, download_nodes) do
nodes_including_self = [P2P.get_node_info() | download_nodes] |> P2P.distinct_nodes()

replications_attestations
|> adjust_attestations(download_nodes)
|> Stream.filter(&TransactionHandler.download_transaction?(&1, nodes_including_self))
|> Enum.sort_by(& &1.transaction_summary.timestamp, {:asc, DateTime})
|> synchronize_transactions(download_nodes)
end

# To avoid beacon chain database migration we have to support both summaries with genesis address and without
# Hence, we need to adjust or revised the attestation to include the genesis addresses
# which is not present in the version 1 of transaction's summary.
Expand Down
11 changes: 10 additions & 1 deletion test/archethic/bootstrap_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ defmodule Archethic.BootstrapTest do
Ok,
TransactionChainLength,
TransactionInputList,
TransactionList
TransactionList,
GetCurrentReplicationsAttestations,
GetCurrentReplicationsAttestationsResponse
}

alias Archethic.Replication
Expand Down Expand Up @@ -126,6 +128,13 @@ defmodule Archethic.BootstrapTest do

_, %GetGenesisAddress{}, _ ->
{:ok, %NotFound{}}

_, %GetCurrentReplicationsAttestations{}, _ ->
{:ok,
%GetCurrentReplicationsAttestationsResponse{
replications_attestations: [],
more?: false
}}
end)

{:ok, daily_nonce_agent} = Agent.start_link(fn -> %{} end)
Expand Down
152 changes: 151 additions & 1 deletion test/archethic/self_repair/self_repair_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ defmodule Archethic.SelfRepairTest do
@moduledoc false
use ArchethicCase

alias Archethic.BeaconChain
alias Archethic.BeaconChain.ReplicationAttestation

alias Archethic.Crypto

alias Archethic.P2P
Expand All @@ -12,10 +15,12 @@ defmodule Archethic.SelfRepairTest do

alias Archethic.Replication
alias Archethic.SelfRepair
alias Archethic.SelfRepair.Sync.TransactionHandler

alias Archethic.TransactionChain
alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.Transaction.ValidationStamp

alias Archethic.TransactionChain.TransactionSummary
alias Archethic.TransactionFactory

import ArchethicCase
Expand Down Expand Up @@ -153,4 +158,149 @@ defmodule Archethic.SelfRepairTest do
SelfRepair.replicate_transaction(address, false)
end
end

describe "synchronize_current_summary/0" do
test "should be able to run when there's nothing to sync" do
P2P.add_and_connect_node(%Node{
ip: {127, 0, 0, 1},
port: 3000,
first_public_key: Crypto.first_node_public_key(),
last_public_key: Crypto.first_node_public_key(),
network_patch: "AAA",
geo_patch: "AAA",
available?: true,
authorized?: true,
authorization_date: ~U[2001-01-01 00:00:00Z]
})

with_mock(BeaconChain, [:passthrough],
next_summary_date: fn _ -> DateTime.utc_now() end,
list_replications_attestations_from_current_slot: fn -> [] end
) do
with_mock(TransactionHandler, [:passthrough], []) do
assert :ok = SelfRepair.synchronize_current_summary()

assert_not_called(TransactionHandler.download_transaction(:_, :_))
end
end
end

test "should resync missed transactions" do
now = DateTime.utc_now()

P2P.add_and_connect_node(%Node{
ip: {127, 0, 0, 1},
port: 3000,
first_public_key: Crypto.first_node_public_key(),
last_public_key: Crypto.first_node_public_key(),
network_patch: "AAA",
geo_patch: "AAA",
available?: true,
authorized?: true,
authorization_date: ~U[2001-01-01 00:00:00Z]
})

P2P.add_and_connect_node(%Node{
ip: {127, 0, 0, 1},
port: 3001,
first_public_key: random_public_key(),
last_public_key: random_public_key(),
network_patch: "AAA",
geo_patch: "AAA",
available?: true,
authorized?: true,
authorization_date: ~U[2001-01-01 00:00:00Z]
})

replication_attestation1 = random_replication_attestation(now)
replication_attestation2 = random_replication_attestation(now)
replication_attestation3 = random_replication_attestation(now)

with_mock(BeaconChain, [:passthrough],
next_summary_date: fn _ -> now end,
list_replications_attestations_from_current_slot: fn ->
[
replication_attestation1,
replication_attestation2,
replication_attestation3
]
end
) do
with_mock(TransactionHandler, [:passthrough],
download_transaction: fn _, _ -> :ok end,
process_transaction: fn _, _, _ -> :ok end
) do
assert :ok = SelfRepair.synchronize_current_summary()

assert_called(TransactionHandler.download_transaction(replication_attestation1, :_))
assert_called(TransactionHandler.download_transaction(replication_attestation2, :_))
assert_called(TransactionHandler.download_transaction(replication_attestation3, :_))
end
end
end

test "should not resync a transaction already existing" do
now = DateTime.utc_now()

P2P.add_and_connect_node(%Node{
ip: {127, 0, 0, 1},
port: 3000,
first_public_key: Crypto.first_node_public_key(),
last_public_key: Crypto.first_node_public_key(),
network_patch: "AAA",
geo_patch: "AAA",
available?: true,
authorized?: true,
authorization_date: ~U[2001-01-01 00:00:00Z]
})

P2P.add_and_connect_node(%Node{
ip: {127, 0, 0, 1},
port: 3001,
first_public_key: random_public_key(),
last_public_key: random_public_key(),
network_patch: "AAA",
geo_patch: "AAA",
available?: true,
authorized?: true,
authorization_date: ~U[2001-01-01 00:00:00Z]
})

replication_attestation1 = random_replication_attestation(now)

with_mock(BeaconChain, [:passthrough],
next_summary_date: fn _ -> now end,
list_replications_attestations_from_current_slot: fn ->
[
replication_attestation1
]
end
) do
with_mock(TransactionChain, [:passthrough], transaction_exists?: fn _ -> true end) do
with_mock(TransactionHandler, [:passthrough], download_transaction: fn _, _ -> :ok end) do
assert :ok = SelfRepair.synchronize_current_summary()

assert_not_called(
TransactionHandler.download_transaction(replication_attestation1, :_)
)
end
end
end
end
end

defp random_replication_attestation(datetime) do
%ReplicationAttestation{
version: 2,
transaction_summary: %TransactionSummary{
address: random_address(),
type: :transfer,
timestamp: datetime,
fee: 10_000_000,
validation_stamp_checksum: :crypto.strong_rand_bytes(32),
genesis_address: random_address()
},
confirmations: Enum.map(0..9, &{&1, "signature#{&1}"})
}
end
end

0 comments on commit 1ebc3e6

Please sign in to comment.