Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bootstrap: Replicate missed transaction of current summary #1494

Merged
Merged
88 changes: 70 additions & 18 deletions lib/archethic/beacon_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ defmodule Archethic.BeaconChain do
alias __MODULE__.SummaryAggregate
alias __MODULE__.SummaryTimer
alias __MODULE__.Update
alias __MODULE__.ReplicationAttestation

alias Archethic.Crypto

Expand All @@ -26,6 +27,8 @@ defmodule Archethic.BeaconChain do
alias Archethic.P2P.Message.GetBeaconSummaries
alias Archethic.P2P.Message.GetBeaconSummariesAggregate
alias Archethic.P2P.Message.GetCurrentSummaries
alias Archethic.P2P.Message.GetCurrentReplicationAttestations
alias Archethic.P2P.Message.CurrentReplicationAttestations
alias Archethic.P2P.Message.BeaconSummaryList
alias Archethic.P2P.Message.NotFound
alias Archethic.P2P.Message.TransactionSummaryList
Expand Down Expand Up @@ -207,6 +210,22 @@ defmodule Archethic.BeaconChain do
|> Enum.to_list()
end

@doc """
Returns the current summary's replication attestations that current node have
"""
@spec get_current_summary_replication_attestations(subset :: binary()) ::
Enumerable.t() | list(ReplicationAttestation.t())
def get_current_summary_replication_attestations(subset) do
%Slot{transaction_attestations: replication_attestations} = Subset.get_current_slot(subset)

SummaryCache.stream_current_slots(subset)
|> Stream.flat_map(fn {%Slot{transaction_attestations: replication_attestations}, _} ->
replication_attestations
end)
|> Stream.concat(replication_attestations)
|> ReplicationAttestation.reduce_confirmations()
end

@doc """
Return the previous summary datetimes from a given date
"""
Expand Down Expand Up @@ -339,12 +358,47 @@ defmodule Archethic.BeaconChain do
"""
@spec list_transactions_summaries_from_current_slot(DateTime.t()) ::
list(TransactionSummary.t())
def list_transactions_summaries_from_current_slot(date = %DateTime{} \\ DateTime.utc_now()) do
next_summary_date = next_summary_date(DateTime.truncate(date, :millisecond))
def list_transactions_summaries_from_current_slot(datetime = %DateTime{} \\ DateTime.utc_now()) do
get_next_summary_elected_subsets_by_nodes(datetime)
|> Task.async_stream(
fn {node, subsets} ->
fetch_current_summaries(node, subsets)
end,
ordered: false,
max_concurrency: 256
)
|> Stream.filter(&match?({:ok, _}, &1))
|> Stream.flat_map(fn {:ok, summaries} -> summaries end)

# remove duplicates & sort
|> Stream.uniq_by(& &1.address)
|> Enum.sort_by(& &1.timestamp, {:desc, DateTime})
end

@doc """
Return the replications attestation that were gathered since last
Unordered
"""
@spec fetch_current_summary_replication_attestations(datetime :: DateTime.t()) ::
Enumerable.t() | list(ReplicationAttestation.t())
def fetch_current_summary_replication_attestations(datetime = %DateTime{} \\ DateTime.utc_now()) do
get_next_summary_elected_subsets_by_nodes(datetime)
|> Task.async_stream(
fn {node, subsets} ->
fetch_current_summary_replication_attestations_from_node(node, subsets)
end,
ordered: false,
max_concurrency: 256
)
|> Stream.filter(&match?({:ok, _}, &1))
|> Stream.flat_map(fn {:ok, replication_attestations} -> replication_attestations end)
|> ReplicationAttestation.reduce_confirmations()
end

defp get_next_summary_elected_subsets_by_nodes(datetime) do
next_summary_date = next_summary_date(DateTime.truncate(datetime, :millisecond))
authorized_nodes = P2P.authorized_and_available_nodes(next_summary_date, true)

# get the subsets to request per node
list_subsets()
|> Enum.map(fn subset ->
subset
Expand All @@ -359,21 +413,6 @@ defmodule Archethic.BeaconChain do
Map.update(acc1, node, [subset], &[subset | &1])
end)
end)

# download the summaries
|> Task.async_stream(
fn {node, subsets} ->
fetch_current_summaries(node, subsets)
end,
ordered: false,
max_concurrency: 256
)
|> Stream.filter(&match?({:ok, _}, &1))
|> Stream.flat_map(fn {:ok, summaries} -> summaries end)

# remove duplicates & sort
|> Stream.uniq_by(& &1.address)
|> Enum.sort_by(& &1.timestamp, {:desc, DateTime})
end

defp get_summary_address_by_node(date, subset, authorized_nodes) do
Expand Down Expand Up @@ -403,6 +442,19 @@ defmodule Archethic.BeaconChain do
|> Enum.to_list()
end

defp fetch_current_summary_replication_attestations_from_node(node, subsets) do
case P2P.send_message(node, %GetCurrentReplicationAttestations{subsets: subsets}) do
{:ok,
%CurrentReplicationAttestations{
replication_attestations: replication_attestations
}} ->
replication_attestations

_ ->
[]
end
end

defp fetch_beacon_summaries(node, addresses) do
start_time = System.monotonic_time()

Expand Down
8 changes: 5 additions & 3 deletions lib/archethic/bootstrap.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ defmodule Archethic.Bootstrap do
alias Archethic.Replication

alias Archethic.SelfRepair
alias Archethic.SelfRepair.NetworkChain

alias Archethic.TransactionChain
alias Archethic.TransactionChain.Transaction
Expand Down Expand Up @@ -247,8 +246,11 @@ defmodule Archethic.Bootstrap do

Archethic.Bootstrap.NetworkConstraints.persist_genesis_address()

Logger.info("Enforced Resync: Started!")
NetworkChain.synchronous_resync_many([:node, :oracle, :origin, :node_shared_secrets])
if P2P.authorized_and_available_node?() do
Logger.info("Current summary synchronization started")
count = SelfRepair.synchronize_current_summary()
Logger.info("Current summary synchronization finished: #{count} synchronized")
end

Sync.publish_end_of_sync()
SelfRepair.start_scheduler()
Expand Down
4 changes: 4 additions & 0 deletions lib/archethic/p2p/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ defmodule Archethic.P2P.Message do
GetBeaconSummariesAggregate,
GetBootstrappingNodes,
GetCurrentSummaries,
GetCurrentReplicationAttestations,
CurrentReplicationAttestations,
GetLastTransaction,
GetLastTransactionAddress,
GetNextAddresses,
Expand Down Expand Up @@ -118,6 +120,7 @@ defmodule Archethic.P2P.Message do
| GetGenesisAddress.t()
| ValidationError.t()
| GetCurrentSummaries.t()
| GetCurrentReplicationAttestations.t()
| GetBeaconSummariesAggregate.t()
| NotifyPreviousChain.t()
| ShardRepair.t()
Expand Down Expand Up @@ -158,6 +161,7 @@ defmodule Archethic.P2P.Message do
| NetworkStats.t()
| SmartContractCallValidation.t()
| DashboardData.t()
| CurrentReplicationAttestations.t()

@floor_upload_speed Application.compile_env!(:archethic, [__MODULE__, :floor_upload_speed])
@content_max_size Application.compile_env!(:archethic, :transaction_data_content_max_size)
Expand Down
52 changes: 52 additions & 0 deletions lib/archethic/p2p/message/current_replication_attestations.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
defmodule Archethic.P2P.Message.CurrentReplicationAttestations do
@moduledoc """
The response message of GetCurrentReplicationAttestations
"""

@enforce_keys [:replication_attestations]
defstruct [:replication_attestations]

alias Archethic.BeaconChain.ReplicationAttestation
alias Archethic.Utils.VarInt

@type t() :: %__MODULE__{
replication_attestations: list(ReplicationAttestation.t())
}

@spec serialize(message :: t()) :: bitstring()
def serialize(%__MODULE__{replication_attestations: replication_attestations}) do
replication_attestations_bin =
replication_attestations
|> Enum.map(&ReplicationAttestation.serialize/1)
|> :erlang.list_to_bitstring()

encoded_replication_attestations_len = length(replication_attestations) |> VarInt.from_value()

<<encoded_replication_attestations_len::binary, replication_attestations_bin::bitstring>>
end

@spec deserialize(bin :: bitstring()) :: {t(), bitstring()}
def deserialize(<<rest::bitstring>>) do
{count, rest} = rest |> VarInt.get_value()

{replication_attestations, <<rest::bitstring>>} = deserialize_list(rest, count, [])

{
%__MODULE__{
replication_attestations: replication_attestations
},
rest
}
end

defp deserialize_list(rest, 0, _acc), do: {[], rest}

defp deserialize_list(rest, count, acc) when length(acc) == count do
{Enum.reverse(acc), rest}
end

defp deserialize_list(rest, count, acc) do
{replication_attestation, rest} = ReplicationAttestation.deserialize(rest)
deserialize_list(rest, count, [replication_attestation | acc])
end
end
46 changes: 46 additions & 0 deletions lib/archethic/p2p/message/get_current_replication_attestations.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
defmodule Archethic.P2P.Message.GetCurrentReplicationAttestations do
@moduledoc """
A request to get the replication attestations of current summary
The nodes receiving this must be elected to store the given subsets
"""

@enforce_keys [:subsets]
defstruct [:subsets]

alias Archethic.BeaconChain
alias Archethic.Crypto
alias Archethic.P2P.Message.CurrentReplicationAttestations

@type t :: %__MODULE__{subsets: list(binary())}

@spec process(message :: __MODULE__.t(), sender_public_key :: Crypto.key()) ::
CurrentReplicationAttestations.t()
def process(%__MODULE__{subsets: subsets}, _) do
%CurrentReplicationAttestations{
replication_attestations:
subsets
|> Stream.flat_map(&BeaconChain.get_current_summary_replication_attestations/1)
|> Enum.to_list()
}
end

@spec serialize(message :: t()) :: bitstring()
def serialize(%__MODULE__{subsets: subsets}) do
subsets_bin = :erlang.list_to_binary(subsets)

<<length(subsets)::16, subsets_bin::binary>>
end

@spec deserialize(bin :: bitstring()) :: {t(), bitstring()}
def deserialize(<<nb_subsets::16, rest::bitstring>>) do
{subsets, <<rest::bitstring>>} = deserialize_subsets(rest, nb_subsets, [])

{%__MODULE__{subsets: subsets}, rest}
end

defp deserialize_subsets(rest, 0, acc), do: {Enum.reverse(acc), rest}

defp deserialize_subsets(<<subset::8, rest::bitstring>>, n, acc) do
deserialize_subsets(rest, n - 1, [<<subset>> | acc])
end
end
4 changes: 2 additions & 2 deletions lib/archethic/p2p/message/get_current_summaries.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ defmodule Archethic.P2P.Message.GetCurrentSummaries do
@spec serialize(t()) :: bitstring()
def serialize(%__MODULE__{subsets: subsets}) do
subsets_bin = :erlang.list_to_binary(subsets)
<<length(subsets)::8, subsets_bin::binary>>
<<length(subsets)::16, subsets_bin::binary>>
end

@spec deserialize(bitstring()) :: {t(), bitstring}
def deserialize(<<nb_subsets::8, rest::binary>>) do
def deserialize(<<nb_subsets::16, rest::binary>>) do
subsets_bin = :binary.part(rest, 0, nb_subsets)
subsets = for <<subset::8 <- subsets_bin>>, do: <<subset>>
{%__MODULE__{subsets: subsets}, <<>>}
Expand Down
5 changes: 4 additions & 1 deletion lib/archethic/p2p/message/message_id.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ defmodule Archethic.P2P.MessageId do
RegisterBeaconUpdates,
GetGenesisAddress,
GetCurrentSummaries,
GetCurrentReplicationAttestations,
CurrentReplicationAttestations,
GetBeaconSummariesAggregate,
NotifyPreviousChain,
GetNextAddresses,
Expand Down Expand Up @@ -125,6 +127,7 @@ defmodule Archethic.P2P.MessageId do
GetNetworkStats => 39,
GetDashboardData => 40,
UnlockChain => 41,
GetCurrentReplicationAttestations => 42,

# Responses
DashboardData => 225,
Expand All @@ -145,7 +148,7 @@ defmodule Archethic.P2P.MessageId do
Summary => 240,
LastTransactionAddress => 241,
FirstPublicKey => 242,
# Message number 243 is available
CurrentReplicationAttestations => 243,
TransactionInputList => 244,
TransactionChainLength => 245,
BootstrappingNodes => 246,
Expand Down
10 changes: 10 additions & 0 deletions lib/archethic/self_repair.ex
Original file line number Diff line number Diff line change
Expand Up @@ -296,4 +296,14 @@ defmodule Archethic.SelfRepair do
"""
@spec next_repair_time() :: DateTime.t()
defdelegate next_repair_time, to: Scheduler

@doc """
Synchronously synchronize all the transactions that happened since previous summary aggregate
"""
@spec synchronize_current_summary() :: integer()
def synchronize_current_summary() do
BeaconChain.fetch_current_summary_replication_attestations()
|> Enum.to_list()
|> Sync.process_replication_attestations(P2P.authorized_and_available_nodes())
end
end
32 changes: 22 additions & 10 deletions lib/archethic/self_repair/sync.ex
Original file line number Diff line number Diff line change
Expand Up @@ -294,20 +294,12 @@ defmodule Archethic.SelfRepair.Sync do
) do
start_time = System.monotonic_time()

nodes_including_self = [P2P.get_node_info() | download_nodes] |> P2P.distinct_nodes()

attestations_to_sync =
attestations
|> adjust_attestations(download_nodes)
|> Stream.filter(&TransactionHandler.download_transaction?(&1, nodes_including_self))
|> Enum.sort_by(& &1.transaction_summary.timestamp, {:asc, DateTime})

synchronize_transactions(attestations_to_sync, download_nodes)
nb_transactions = process_replication_attestations(attestations, download_nodes)

:telemetry.execute(
[:archethic, :self_repair, :process_aggregate],
%{duration: System.monotonic_time() - start_time},
%{nb_transactions: length(attestations_to_sync)}
%{nb_transactions: nb_transactions}
)

availability_update = DateTime.add(summary_time, availability_adding_time)
Expand Down Expand Up @@ -353,6 +345,26 @@ defmodule Archethic.SelfRepair.Sync do
store_last_sync_date(summary_time)
end

@doc """
Downloads and stores the transactions missed, returns the count of transactions synchronized
"""
@spec process_replication_attestations(
replication_attestations :: list(ReplicationAttestation.t()),
download_nodes :: list(Node.t())
) :: integer()
def process_replication_attestations(replication_attestations, download_nodes) do
nodes_including_self = [P2P.get_node_info() | download_nodes] |> P2P.distinct_nodes()

replication_attestations
|> adjust_attestations(download_nodes)
|> Stream.filter(&TransactionHandler.download_transaction?(&1, nodes_including_self))
|> Enum.sort_by(& &1.transaction_summary.timestamp, {:asc, DateTime})
|> then(fn filtered_attestations ->
synchronize_transactions(filtered_attestations, download_nodes)
length(filtered_attestations)
end)
end

# To avoid beacon chain database migration we have to support both summaries with genesis address and without
# Hence, we need to adjust or revised the attestation to include the genesis addresses
# which is not present in the version 1 of transaction's summary.
Expand Down
Loading
Loading