diff --git a/lib/archethic/beacon_chain.ex b/lib/archethic/beacon_chain.ex index 8e7befba3..61ef1f383 100644 --- a/lib/archethic/beacon_chain.ex +++ b/lib/archethic/beacon_chain.ex @@ -23,7 +23,9 @@ defmodule Archethic.BeaconChain do alias Archethic.P2P alias Archethic.P2P.Node alias Archethic.P2P.Message.GetBeaconSummaries + alias Archethic.P2P.Message.GetBeaconSummariesAggregate alias Archethic.P2P.Message.BeaconSummaryList + alias Archethic.P2P.Message.NotFound alias Archethic.TaskSupervisor @@ -274,10 +276,6 @@ defmodule Archethic.BeaconChain do [ ] [ ] [ ] Aggregate addresses | | | [ ] [ ] [ ] Fetch summaries - |\ /|\ /| - | \/ | \/ | - | /\ | /\ | - [D1] [D2] [D3] Partition by date | | | [ ] [ ] [ ] Aggregate and consolidate summaries \ | / @@ -287,18 +285,17 @@ defmodule Archethic.BeaconChain do [ ] ``` """ - @spec fetch_summary_aggregates(list(DateTime.t()) | Enumerable.t()) :: - list(SummaryAggregate.t()) - def fetch_summary_aggregates(dates) do - authorized_nodes = P2P.authorized_and_available_nodes() + @spec fetch_and_aggregate_summaries(DateTime.t()) :: SummaryAggregate.t() + def fetch_and_aggregate_summaries(date = %DateTime{}) do + authorized_nodes = + P2P.authorized_and_available_nodes() + |> Enum.reject(&(&1.first_public_key == Crypto.first_node_public_key())) list_subsets() |> Flow.from_enumerable(stages: 256) |> Flow.flat_map(fn subset -> - # Foreach subset and date we compute concurrently the node election - dates - |> Stream.map(&get_summary_address_by_node(&1, subset, authorized_nodes)) - |> Enum.flat_map(& &1) + # Foreach subset we compute concurrently the node election + get_summary_address_by_node(date, subset, authorized_nodes) end) # We partition by node |> Flow.partition(key: {:elem, 0}) @@ -310,23 +307,16 @@ defmodule Archethic.BeaconChain do # For this node we fetch the summaries fetch_summaries(node, addresses) end) - # We repartition by summary time to aggregate summaries for a date - |> Flow.partition(stages: System.schedulers_online() * 4, key: {:key, :summary_time}) - |> Flow.reduce( - fn -> %{} end, - fn summary = %Summary{summary_time: time}, acc -> - Map.update( - acc, - time, - %SummaryAggregate{summary_time: time} |> SummaryAggregate.add_summary(summary), - &SummaryAggregate.add_summary(&1, summary) - ) - end + # We departition to build the final summarie aggregate + |> Flow.departition( + fn -> %SummaryAggregate{summary_time: date} end, + fn summaries, acc -> + Enum.reduce(summaries, acc, &SummaryAggregate.add_summary(&2, &1)) + end, + & &1 ) - |> Flow.on_trigger(&{Map.values(&1), &1}) - |> Stream.reject(&SummaryAggregate.empty?/1) - |> Stream.map(&SummaryAggregate.aggregate/1) - |> Enum.sort_by(& &1.summary_time, {:asc, DateTime}) + |> Enum.to_list() + |> Enum.at(0) end defp get_summary_address_by_node(date, subset, authorized_nodes) do @@ -373,4 +363,55 @@ defmodule Archethic.BeaconChain do ) end) end + + @doc """ + Get a beacon summaries aggregate for a given date + """ + @spec get_summaries_aggregate(DateTime.t()) :: + {:ok, SummaryAggregate.t()} | {:error, :not_exists} + defdelegate get_summaries_aggregate(datetime), to: DB, as: :get_beacon_summaries_aggregate + + @doc """ + Persists a beacon summaries aggregate + """ + @spec write_summaries_aggregate(SummaryAggregate.t()) :: :ok + defdelegate write_summaries_aggregate(aggregate), to: DB, as: :write_beacon_summaries_aggregate + + @doc """ + Fetch a summaries aggregate for a given date + """ + @spec fetch_summaries_aggregate(DateTime.t()) :: + {:ok, SummaryAggregate.t()} | {:error, :not_exists} | {:error, :network_issue} + def fetch_summaries_aggregate(summary_time = %DateTime{}) do + storage_nodes = + summary_time + |> Crypto.derive_beacon_aggregate_address() + |> Election.chain_storage_nodes(P2P.authorized_and_available_nodes()) + + conflict_resolver = fn results -> + # Prioritize results over not found + with nil <- Enum.find(results, &match?(%SummaryAggregate{}, &1)), + nil <- Enum.find(results, &match?(%NotFound{}, &1)) do + %NotFound{} + else + res -> + res + end + end + + case P2P.quorum_read( + storage_nodes, + %GetBeaconSummariesAggregate{date: summary_time}, + conflict_resolver + ) do + {:ok, aggregate = %SummaryAggregate{}} -> + {:ok, aggregate} + + {:ok, %NotFound{}} -> + {:error, :not_exists} + + {:error, :network_issue} = e -> + e + end + end end diff --git a/lib/archethic/beacon_chain/slot_timer.ex b/lib/archethic/beacon_chain/slot_timer.ex index dd0c6e063..d1d5ad9a7 100644 --- a/lib/archethic/beacon_chain/slot_timer.ex +++ b/lib/archethic/beacon_chain/slot_timer.ex @@ -5,21 +5,23 @@ defmodule Archethic.BeaconChain.SlotTimer do use GenServer - alias Crontab.CronExpression.Parser, as: CronParser - alias Crontab.Scheduler, as: CronScheduler - alias Archethic.BeaconChain alias Archethic.BeaconChain.SubsetRegistry + alias Archethic.BeaconChain.SummaryTimer - alias Archethic.P2P - alias Archethic.P2P.Node + alias Archethic.DB alias Archethic.Crypto + alias Archethic.P2P + alias Archethic.P2P.Node alias Archethic.PubSub alias Archethic.Utils + alias Crontab.CronExpression.Parser, as: CronParser + alias Crontab.Scheduler, as: CronScheduler + require Logger @slot_timer_ets :archethic_slot_timer @@ -136,6 +138,12 @@ defmodule Archethic.BeaconChain.SlotTimer do PubSub.notify_current_epoch_of_slot_timer(slot_time) + if SummaryTimer.match_interval?(slot_time) do + # We clean the previously stored summaries - The retention time is for a self repair cycle + # as the aggregates will be handle for long term storage. + DB.clear_beacon_summaries() + end + case Crypto.first_node_public_key() |> P2P.get_node_info() |> elem(1) do %Node{authorized?: true, available?: true} -> Logger.debug("Trigger beacon slots creation at #{Utils.time_to_string(slot_time)}") diff --git a/lib/archethic/beacon_chain/summary_aggregate.ex b/lib/archethic/beacon_chain/summary_aggregate.ex index da8065dbb..37cab0175 100644 --- a/lib/archethic/beacon_chain/summary_aggregate.ex +++ b/lib/archethic/beacon_chain/summary_aggregate.ex @@ -5,7 +5,7 @@ defmodule Archethic.BeaconChain.SummaryAggregate do This will help the self-sepair to maintain an aggregated and ordered view of items to synchronize and to resolve """ - defstruct [:summary_time, transaction_summaries: [], p2p_availabilities: %{}] + defstruct [:summary_time, transaction_summaries: [], p2p_availabilities: %{}, version: 1] alias Archethic.Crypto @@ -17,6 +17,7 @@ defmodule Archethic.BeaconChain.SummaryAggregate do alias Archethic.Utils @type t :: %__MODULE__{ + version: non_neg_integer(), summary_time: DateTime.t(), transaction_summaries: list(TransactionSummary.t()), p2p_availabilities: %{ @@ -31,62 +32,71 @@ defmodule Archethic.BeaconChain.SummaryAggregate do @doc """ Aggregate a new BeaconChain's summary """ - @spec add_summary(t(), BeaconSummary.t()) :: t() + @spec add_summary(t(), BeaconSummary.t(), check_attestation :: boolean()) :: t() def add_summary( agg = %__MODULE__{}, %BeaconSummary{ subset: subset, - summary_time: summary_time, transaction_attestations: transaction_attestations, node_availabilities: node_availabilities, node_average_availabilities: node_average_availabilities, end_of_node_synchronizations: end_of_node_synchronizations - } + }, + check_attestation? \\ true ) do valid_attestations? = - Enum.all?(transaction_attestations, fn attestation -> - ReplicationAttestation.validate(attestation) == :ok - end) + if check_attestation? do + Enum.all?(transaction_attestations, fn attestation -> + ReplicationAttestation.validate(attestation) == :ok + end) + else + true + end if valid_attestations? do - agg - |> Map.update!( - :transaction_summaries, - fn prev -> - transaction_attestations - |> Enum.map(& &1.transaction_summary) - |> Enum.concat(prev) - end - ) - |> update_in( - [ - Access.key(:p2p_availabilities, %{}), - Access.key(subset, %{ - node_availabilities: [], - node_average_availabilities: [], - end_of_node_synchronizations: [] - }) - ], - fn prev -> - prev - |> Map.update!( - :node_availabilities, - &Enum.concat(&1, [Utils.bitstring_to_integer_list(node_availabilities)]) - ) - |> Map.update!( - :node_average_availabilities, - &Enum.concat(&1, [node_average_availabilities]) - ) - |> Map.update!( - :end_of_node_synchronizations, - &Enum.concat(&1, end_of_node_synchronizations) - ) - end - ) - |> Map.update(:summary_time, summary_time, fn - nil -> summary_time - prev -> prev - end) + agg = + Map.update!( + agg, + :transaction_summaries, + fn prev -> + transaction_attestations + |> Enum.map(& &1.transaction_summary) + |> Enum.concat(prev) + |> Enum.uniq_by(& &1.address) + end + ) + + if bit_size(node_availabilities) > 0 or length(node_average_availabilities) > 0 or + length(end_of_node_synchronizations) > 0 do + update_in( + agg, + [ + Access.key(:p2p_availabilities, %{}), + Access.key(subset, %{ + node_availabilities: [], + node_average_availabilities: [], + end_of_node_synchronizations: [] + }) + ], + fn prev -> + prev + |> Map.update!( + :node_availabilities, + &Enum.concat(&1, [Utils.bitstring_to_integer_list(node_availabilities)]) + ) + |> Map.update!( + :node_average_availabilities, + &Enum.concat(&1, [node_average_availabilities]) + ) + |> Map.update!( + :end_of_node_synchronizations, + &Enum.concat(&1, end_of_node_synchronizations) + ) + end + ) + else + agg + end else agg end @@ -155,4 +165,205 @@ defmodule Archethic.BeaconChain.SummaryAggregate do do: true def empty?(%__MODULE__{}), do: false + + @doc """ + Serialize beacon summaries aggregate + + ## Examples + + iex> %SummaryAggregate{ + ...> summary_time: ~U[2022-03-01 00:00:00Z], + ...> transaction_summaries: [ + ...> %TransactionSummary{ + ...> address: <<0, 0, 120, 123, 229, 13, 144, 130, 230, 18, 17, 45, 244, 92, 226, 107, 11, 104, 226, + ...> 249, 138, 85, 71, 127, 190, 20, 186, 69, 131, 97, 194, 30, 71, 116>>, + ...> type: :transfer, + ...> timestamp: ~U[2022-02-01 10:00:00.204Z], + ...> fee: 10_000_000 + ...> } + ...> ], + ...> p2p_availabilities: %{ + ...> <<0>> => %{ + ...> node_availabilities: <<1::1, 0::1, 1::1>>, + ...> node_average_availabilities: [0.5, 0.7, 0.8], + ...> end_of_node_synchronizations: [ + ...> <<0, 1, 57, 98, 198, 202, 155, 43, 217, 149, 5, 213, 109, 252, 111, 87, 231, 170, 54, + ...> 211, 178, 208, 5, 184, 33, 193, 167, 91, 160, 131, 129, 117, 45, 242>> + ...> ] + ...> } + ...> } + ...> } |> SummaryAggregate.serialize() + << + # Version + 1, + # Summary time + 98, 29, 98, 0, + # Nb transaction summaries + 1, 1, + # Address + 0, 0, 120, 123, 229, 13, 144, 130, 230, 18, 17, 45, 244, 92, 226, 107, 11, 104, 226, + 249, 138, 85, 71, 127, 190, 20, 186, 69, 131, 97, 194, 30, 71, 116, + # Timestamp + 0, 0, 1, 126, 180, 186, 17, 204, + # Type + 253, + # Fee, + 0, 0, 0, 0, 0, 152, 150, 128, + # Nb movements addresses + 1, 0, + # Nb of p2p availabilities subset + 1, + # Subset + 0, + # Nb of node availabilities + 1, 3, + # Nodes availabilities + 1::1, 0::1, 1::1, + # Nodes average availabilities + 50, 70, 80, + # Nb of end of node synchronizations + 1, 1, + # End of node synchronization + 0, 1, 57, 98, 198, 202, 155, 43, 217, 149, 5, 213, 109, 252, 111, 87, 231, 170, 54, + 211, 178, 208, 5, 184, 33, 193, 167, 91, 160, 131, 129, 117, 45, 242 + >> + """ + @spec serialize(t()) :: bitstring() + def serialize(%__MODULE__{ + version: version, + summary_time: summary_time, + transaction_summaries: transaction_summaries, + p2p_availabilities: p2p_availabilities + }) do + nb_tx_summaries = Utils.VarInt.from_value(length(transaction_summaries)) + + tx_summaries_bin = + transaction_summaries + |> Enum.map(&TransactionSummary.serialize/1) + |> :erlang.list_to_binary() + + p2p_availabilities_bin = + p2p_availabilities + |> Enum.map(fn {subset, + %{ + node_availabilities: node_availabilities, + node_average_availabilities: node_avg_availabilities, + end_of_node_synchronizations: end_of_sync + }} -> + nb_node_availabilities = Utils.VarInt.from_value(bit_size(node_availabilities)) + + node_avg_availabilities_bin = + node_avg_availabilities + |> Enum.map(fn avg -> trunc(avg * 100) end) + |> :erlang.list_to_binary() + + nb_end_of_sync = Utils.VarInt.from_value(length(end_of_sync)) + + end_of_sync_bin = :erlang.list_to_binary(end_of_sync) + + <> + end) + |> :erlang.list_to_bitstring() + + <> + end + + @doc """ + Deserialize beacon summaries aggregate + + ## Examples + + iex> SummaryAggregate.deserialize(<<1, 98, 29, 98, 0, 1, 1, 0, 0, 120, 123, 229, 13, 144, 130, 230, + ...> 18, 17, 45, 244, 92, 226, 107, 11, 104, 226, 249, 138, 85, 71, 127, 190, 20, 186, 69, 131, 97, + ...> 194, 30, 71, 116, 0, 0, 1, 126, 180, 186, 17, 204, 253, 0, 0, 0, 0, 0, 152, 150, 128, 1, 0, 1, + ...> 0, 1, 3, 1::1, 0::1, 1::1, 50, 70, 80, 1, 1, 0, 1, 57, 98, 198, 202, 155, 43, 217, 149, 5, 213, + ...> 109, 252, 111, 87, 231, 170, 54, 211, 178, 208, 5, 184, 33, 193, 167, 91, 160, 131, 129, 117, 45, 242>>) + { + %SummaryAggregate{ + summary_time: ~U[2022-03-01 00:00:00Z], + transaction_summaries: [ + %TransactionSummary{ + address: <<0, 0, 120, 123, 229, 13, 144, 130, 230, 18, 17, 45, 244, 92, 226, 107, 11, 104, 226, + 249, 138, 85, 71, 127, 190, 20, 186, 69, 131, 97, 194, 30, 71, 116>>, + type: :transfer, + timestamp: ~U[2022-02-01 10:00:00.204Z], + fee: 10_000_000 + } + ], + p2p_availabilities: %{ + <<0>> => %{ + node_availabilities: <<1::1, 0::1, 1::1>>, + node_average_availabilities: [0.5, 0.7, 0.8], + end_of_node_synchronizations: [ + <<0, 1, 57, 98, 198, 202, 155, 43, 217, 149, 5, 213, 109, 252, 111, 87, 231, 170, 54, + 211, 178, 208, 5, 184, 33, 193, 167, 91, 160, 131, 129, 117, 45, 242>> + ] + } + } + }, + "" + } + """ + @spec deserialize(bitstring()) :: {t(), bitstring()} + def deserialize(<<1::8, timestamp::32, rest::bitstring>>) do + {nb_tx_summaries, rest} = Utils.VarInt.get_value(rest) + + {tx_summaries, <>} = + Utils.deserialize_transaction_summaries(rest, nb_tx_summaries, []) + + {p2p_availabilities, rest} = deserialize_p2p_availabilities(rest, nb_p2p_availabilities, %{}) + + { + %__MODULE__{ + version: 1, + summary_time: DateTime.from_unix!(timestamp), + transaction_summaries: tx_summaries, + p2p_availabilities: p2p_availabilities + }, + rest + } + end + + defp deserialize_p2p_availabilities(<<>>, _, acc), do: {acc, <<>>} + + defp deserialize_p2p_availabilities(rest, nb_p2p_availabilities, acc) + when map_size(acc) == nb_p2p_availabilities do + {acc, rest} + end + + defp deserialize_p2p_availabilities( + <>, + nb_p2p_availabilities, + acc + ) do + {nb_node_availabilities, rest} = Utils.VarInt.get_value(rest) + + <> = rest + + node_avg_availabilities = + node_avg_availabilities_bin + |> :erlang.binary_to_list() + |> Enum.map(fn avg -> avg / 100 end) + + {nb_end_of_sync, rest} = Utils.VarInt.get_value(rest) + {end_of_node_sync, rest} = Utils.deserialize_public_key_list(rest, nb_end_of_sync, []) + + deserialize_p2p_availabilities( + rest, + nb_p2p_availabilities, + Map.put( + acc, + subset, + %{ + node_availabilities: node_availabilities, + node_average_availabilities: node_avg_availabilities, + end_of_node_synchronizations: end_of_node_sync + } + ) + ) + end end diff --git a/lib/archethic/crypto.ex b/lib/archethic/crypto.ex index 338a79b61..d8c3a4390 100755 --- a/lib/archethic/crypto.ex +++ b/lib/archethic/crypto.ex @@ -218,6 +218,17 @@ defmodule Archethic.Crypto do |> derive_address() end + @doc """ + Derive a beacon aggregate address based on the date + """ + @spec derive_beacon_aggregate_address(DateTime.t()) :: versioned_hash() + def derive_beacon_aggregate_address(date = %DateTime{}) do + storage_nonce() + |> derive_keypair(hash(["beacon_aggregate", date |> DateTime.to_unix() |> to_string()])) + |> elem(0) + |> derive_address() + end + @doc """ Store the encrypted secrets in the keystore by decrypting them with the given secret key """ diff --git a/lib/archethic/db.ex b/lib/archethic/db.ex index f2bcd92ee..a4381cb52 100644 --- a/lib/archethic/db.ex +++ b/lib/archethic/db.ex @@ -1,11 +1,14 @@ defmodule Archethic.DB do @moduledoc false + alias Archethic.BeaconChain.Summary + alias Archethic.BeaconChain.SummaryAggregate + alias Archethic.Crypto alias __MODULE__.EmbeddedImpl + alias Archethic.TransactionChain.Transaction - alias Archethic.BeaconChain.Summary use Knigge, otp_app: :archethic, default: EmbeddedImpl @@ -13,6 +16,8 @@ defmodule Archethic.DB do {:ok, Transaction.t()} | {:error, :transaction_not_exists} @callback get_beacon_summary(summary_address :: binary()) :: {:ok, Summary.t()} | {:error, :summary_not_exists} + @callback get_beacon_summaries_aggregate(DateTime.t()) :: + {:ok, SummaryAggregate.t()} | {:error, :not_exists} @callback get_transaction_chain( binary(), fields :: list(), @@ -20,8 +25,9 @@ defmodule Archethic.DB do ) :: Enumerable.t() @callback write_transaction(Transaction.t()) :: :ok @callback write_beacon_summary(Summary.t()) :: :ok + @callback clear_beacon_summaries() :: :ok + @callback write_beacon_summaries_aggregate(SummaryAggregate.t()) :: :ok @callback write_transaction_chain(Enumerable.t()) :: :ok - # @callback write_transaction(Transaction.t()) :: :ok @callback list_transactions(fields :: list()) :: Enumerable.t() @callback add_last_transaction_address(binary(), binary(), DateTime.t()) :: :ok @callback list_last_transaction_addresses() :: Enumerable.t() diff --git a/lib/archethic/db/embedded_impl.ex b/lib/archethic/db/embedded_impl.ex index 24098ef30..19c1a80ee 100644 --- a/lib/archethic/db/embedded_impl.ex +++ b/lib/archethic/db/embedded_impl.ex @@ -4,6 +4,9 @@ defmodule Archethic.DB.EmbeddedImpl do while using a key value in memory for fast lookup """ + alias Archethic.BeaconChain.Summary + alias Archethic.BeaconChain.SummaryAggregate + alias Archethic.Crypto alias __MODULE__.BootstrapInfo @@ -15,8 +18,6 @@ defmodule Archethic.DB.EmbeddedImpl do alias Archethic.TransactionChain.Transaction - alias Archethic.BeaconChain.Summary - alias Archethic.Utils defdelegate child_spec(opts), to: __MODULE__.Supervisor @@ -110,6 +111,26 @@ defmodule Archethic.DB.EmbeddedImpl do ChainWriter.write_beacon_summary(summary, db_path()) end + @doc """ + Remove the beacon summaries files + """ + @spec clear_beacon_summaries() :: :ok + def clear_beacon_summaries do + db_path() + |> ChainWriter.base_beacon_path() + |> Path.join("*") + |> Path.wildcard() + |> Enum.each(&File.rm!/1) + end + + @doc """ + Write a beacon summaries aggregate + """ + @spec write_beacon_summaries_aggregate(SummaryAggregate.t()) :: :ok + def write_beacon_summaries_aggregate(aggregate = %SummaryAggregate{}) do + ChainWriter.write_beacon_summaries_aggregate(aggregate, db_path()) + end + @doc """ Determine if the transaction exists or not """ @@ -136,6 +157,15 @@ defmodule Archethic.DB.EmbeddedImpl do ChainReader.get_beacon_summary(summary_address, db_path()) end + @doc """ + Get a beacon summaries aggregate at a given date + """ + @spec get_beacon_summaries_aggregate(DateTime.t()) :: + {:ok, SummaryAggregate.t()} | {:error, :not_exists} + def get_beacon_summaries_aggregate(date = %DateTime{}) do + ChainReader.get_beacon_summaries_aggregate(date, db_path()) + end + @doc """ Get a transaction chain diff --git a/lib/archethic/db/embedded_impl/chain_reader.ex b/lib/archethic/db/embedded_impl/chain_reader.ex index 49a934e16..3cb0daa88 100644 --- a/lib/archethic/db/embedded_impl/chain_reader.ex +++ b/lib/archethic/db/embedded_impl/chain_reader.ex @@ -1,6 +1,9 @@ defmodule Archethic.DB.EmbeddedImpl.ChainReader do @moduledoc false + alias Archethic.BeaconChain.Summary + alias Archethic.BeaconChain.SummaryAggregate + alias Archethic.DB.EmbeddedImpl.ChainIndex alias Archethic.DB.EmbeddedImpl.ChainWriter alias Archethic.DB.EmbeddedImpl.Encoding @@ -8,8 +11,6 @@ defmodule Archethic.DB.EmbeddedImpl.ChainReader do alias Archethic.TransactionChain.Transaction alias Archethic.Utils - alias Archethic.BeaconChain.Summary - @page_size 10 @spec get_transaction(address :: binary(), fields :: list(), db_path :: String.t()) :: @@ -50,23 +51,49 @@ defmodule Archethic.DB.EmbeddedImpl.ChainReader do end end + @doc """ + Get a beacon summary from a given summary address + """ @spec get_beacon_summary(summary_address :: binary(), db_path :: String.t()) :: {:ok, Summary.t()} | {:error, :summary_not_exists} def get_beacon_summary(summary_address, db_path) do start = System.monotonic_time() - filepath = ChainWriter.beacon_path(db_path, summary_address) - if File.exists?(filepath) do - {summary, _rest} = File.read!(filepath) |> Summary.deserialize() - + with true <- File.exists?(filepath), + {:ok, data} <- File.read(filepath), + {summary, _} <- Summary.deserialize(data) do :telemetry.execute([:archethic, :db], %{duration: System.monotonic_time() - start}, %{ query: "get_beacon_summary" }) {:ok, summary} else - {:error, :summary_not_exists} + _ -> + {:error, :summary_not_exists} + end + end + + @doc """ + Get a beacon summaries aggregate from a given date + """ + @spec get_beacon_summaries_aggregate(summary_time :: DateTime.t(), db_path :: String.t()) :: + {:ok, SummaryAggregate.t()} | {:error, :not_exists} + def get_beacon_summaries_aggregate(date = %DateTime{}, db_path) when is_binary(db_path) do + start = System.monotonic_time() + filepath = ChainWriter.beacon_aggregate_path(db_path, date) + + with true <- File.exists?(filepath), + {:ok, data} <- File.read(filepath), + {aggregate, _} <- SummaryAggregate.deserialize(data) do + :telemetry.execute([:archethic, :db], %{duration: System.monotonic_time() - start}, %{ + query: "get_beacon_summaries_aggregate" + }) + + {:ok, aggregate} + else + _ -> + {:error, :not_exists} end end diff --git a/lib/archethic/db/embedded_impl/chain_writer.ex b/lib/archethic/db/embedded_impl/chain_writer.ex index 6f19aa5a5..28493f0f0 100644 --- a/lib/archethic/db/embedded_impl/chain_writer.ex +++ b/lib/archethic/db/embedded_impl/chain_writer.ex @@ -3,18 +3,19 @@ defmodule Archethic.DB.EmbeddedImpl.ChainWriter do use GenServer + alias Archethic.BeaconChain.SummaryAggregate + alias Archethic.BeaconChain.Summary + + alias Archethic.Crypto + alias Archethic.DB.EmbeddedImpl.Encoding alias Archethic.DB.EmbeddedImpl.ChainIndex alias Archethic.TransactionChain.Transaction alias Archethic.TransactionChain.Transaction.ValidationStamp - alias Archethic.BeaconChain.Summary - alias Archethic.Utils - alias Archethic.Crypto - def start_link(arg \\ [], opts \\ []) do GenServer.start_link(__MODULE__, arg, opts) end @@ -56,6 +57,31 @@ defmodule Archethic.DB.EmbeddedImpl.ChainWriter do }) end + @doc """ + Write a beacon summaries aggregate in a new file + """ + @spec write_beacon_summaries_aggregate(SummaryAggregate.t(), String.t()) :: :ok + def write_beacon_summaries_aggregate( + aggregate = %SummaryAggregate{summary_time: summary_time}, + db_path + ) + when is_binary(db_path) do + start = System.monotonic_time() + + filename = beacon_aggregate_path(db_path, summary_time) + + data = + aggregate + |> SummaryAggregate.serialize() + |> Utils.wrap_binary() + + File.write!(filename, data, [:exclusive, :binary]) + + :telemetry.execute([:archethic, :db], %{duration: System.monotonic_time() - start}, %{ + query: "write_beacon_summaries_aggregate" + }) + end + def init(arg) do db_path = Keyword.get(arg, :path) partition = Keyword.get(arg, :partition) @@ -75,6 +101,10 @@ defmodule Archethic.DB.EmbeddedImpl.ChainWriter do path |> base_beacon_path() |> File.mkdir_p!() + + path + |> base_beacon_aggregate_path() + |> File.mkdir_p!() end def handle_call( @@ -152,7 +182,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainWriter do end @doc """ - Return the path of the sbeacon ummary storage location + Return the path of the beacon summary storage location """ @spec beacon_path(String.t(), binary()) :: String.t() def beacon_path(db_path, summary_address) @@ -160,6 +190,14 @@ defmodule Archethic.DB.EmbeddedImpl.ChainWriter do Path.join([base_beacon_path(db_path), Base.encode16(summary_address)]) end + @doc """ + Return the path of the beacon summary aggregate storage location + """ + @spec beacon_aggregate_path(String.t(), DateTime.t()) :: String.t() + def beacon_aggregate_path(db_path, date = %DateTime{}) when is_binary(db_path) do + Path.join([base_beacon_aggregate_path(db_path), date |> DateTime.to_unix() |> to_string()]) + end + @doc """ Return the beacon summary base path """ @@ -167,4 +205,12 @@ defmodule Archethic.DB.EmbeddedImpl.ChainWriter do def base_beacon_path(db_path) do Path.join([db_path, "beacon_summary"]) end + + @doc """ + Return the beacon summaries aggregate base path + """ + @spec base_beacon_aggregate_path(String.t()) :: String.t() + def base_beacon_aggregate_path(db_path) do + Path.join([db_path, "beacon_aggregate"]) + end end diff --git a/lib/archethic/p2p/message.ex b/lib/archethic/p2p/message.ex index a2d08c4de..283b486a9 100644 --- a/lib/archethic/p2p/message.ex +++ b/lib/archethic/p2p/message.ex @@ -7,6 +7,7 @@ defmodule Archethic.P2P.Message do alias Archethic.BeaconChain alias Archethic.BeaconChain.ReplicationAttestation alias Archethic.BeaconChain.Summary + alias Archethic.BeaconChain.SummaryAggregate alias Archethic.BeaconChain.Slot alias Archethic.BeaconChain.Subset alias Archethic.BeaconChain.Slot @@ -35,6 +36,7 @@ defmodule Archethic.P2P.Message do alias __MODULE__.GetBalance alias __MODULE__.GetBeaconSummaries alias __MODULE__.GetBeaconSummary + alias __MODULE__.GetBeaconSummariesAggregate alias __MODULE__.GetBootstrappingNodes alias __MODULE__.GetCurrentSummaries alias __MODULE__.GetFirstPublicKey @@ -131,6 +133,7 @@ defmodule Archethic.P2P.Message do | GetFirstAddress.t() | ValidationError.t() | GetCurrentSummaries.t() + | GetBeaconSummariesAggregate.t() @type response :: Ok.t() @@ -154,6 +157,7 @@ defmodule Archethic.P2P.Message do | BeaconSummaryList.t() | FirstAddress.t() | ReplicationError.t() + | SummaryAggregate.t() @floor_upload_speed Application.compile_env!(:archethic, [__MODULE__, :floor_upload_speed]) @content_max_size Application.compile_env!(:archethic, :transaction_data_content_max_size) @@ -418,6 +422,14 @@ defmodule Archethic.P2P.Message do <<32::8, length(subsets)::8, subsets_bin::binary>> end + def encode(%GetBeaconSummariesAggregate{date: date}) do + <<33::8, DateTime.to_unix(date)::32>> + end + + def encode(aggregate = %SummaryAggregate{}) do + <<231::8, SummaryAggregate.serialize(aggregate)::bitstring>> + end + def encode(%TransactionSummaryList{transaction_summaries: transaction_summaries}) do transaction_summaries_bin = transaction_summaries @@ -940,6 +952,14 @@ defmodule Archethic.P2P.Message do {%GetCurrentSummaries{subsets: subsets}, <<>>} end + def decode(<<33::8, timestamp::32, rest::bitstring>>) do + {%GetBeaconSummariesAggregate{date: DateTime.from_unix!(timestamp)}, rest} + end + + def decode(<<231::8, rest::bitstring>>) do + SummaryAggregate.deserialize(rest) + end + def decode(<<232::8, rest::bitstring>>) do {nb_transaction_summaries, rest} = rest |> VarInt.get_value() @@ -1692,4 +1712,14 @@ defmodule Archethic.P2P.Message do %Error{reason: :invalid_attestation} end end + + def process(%GetBeaconSummariesAggregate{date: date}) do + case BeaconChain.get_summaries_aggregate(date) do + {:ok, aggregate} -> + aggregate + + {:error, :not_exists} -> + %NotFound{} + end + end end diff --git a/lib/archethic/p2p/message/get_beacon_summaries_aggregate.ex b/lib/archethic/p2p/message/get_beacon_summaries_aggregate.ex new file mode 100644 index 000000000..15ffdfcf8 --- /dev/null +++ b/lib/archethic/p2p/message/get_beacon_summaries_aggregate.ex @@ -0,0 +1,12 @@ +defmodule Archethic.P2P.Message.GetBeaconSummariesAggregate do + @moduledoc """ + Represents a message to get a beacon summary aggregate + """ + + @enforce_keys [:date] + defstruct [:date] + + @type t :: %__MODULE__{ + date: DateTime.t() + } +end diff --git a/lib/archethic/release_task.ex b/lib/archethic/release_task.ex deleted file mode 100644 index 9a3873e98..000000000 --- a/lib/archethic/release_task.ex +++ /dev/null @@ -1,91 +0,0 @@ -defmodule Archethic.ReleaseTask do - @moduledoc """ - Task using in the release to send initial funds to the addresses of the onchain - version of the website - """ - - alias Archethic.Crypto - - alias Archethic.TransactionChain.Transaction - alias Archethic.TransactionChain.TransactionData - alias Archethic.TransactionChain.TransactionData.Ledger - alias Archethic.TransactionChain.TransactionData.UCOLedger - alias Archethic.TransactionChain.TransactionData.UCOLedger.Transfer - - # TODO: to remove once the Client UI developed - def transfer_to_website_addresses(amount \\ 1.0) do - seed = Base.decode16!("6CBF75F092278AA0751096CE85FE1E1F033FF50312B146DB336FAF861C8C4E09") - - Transaction.new( - :transfer, - %TransactionData{ - ledger: %Ledger{ - uco: %UCOLedger{ - transfers: - Enum.map(website_seeds(), fn destination_seed -> - {pub, _} = - Crypto.derive_keypair(destination_seed, get_last_index(destination_seed)) - - %Transfer{to: Crypto.derive_address(pub), amount: amount} - end) - } - } - }, - seed, - get_last_index(seed) - ) - |> Archethic.send_new_transaction() - end - - defp get_last_index(seed) do - address = - seed - |> Crypto.derive_keypair(0) - |> elem(0) - |> Crypto.derive_address() - - case Archethic.get_last_transaction(address) do - {:ok, %Transaction{address: address}} -> - Archethic.get_transaction_chain_length(address) - - _ -> - 0 - end - end - - defp website_seeds do - [ - Crypto.derive_address("animate_seed"), - Crypto.derive_address("bicon_seed"), - Crypto.derive_address("bootstrap_css_seed"), - Crypto.derive_address("bootstrap_js_seed"), - Crypto.derive_address("fontawesome_seed"), - Crypto.derive_address("carousel_seed"), - Crypto.derive_address("jquery_seed"), - Crypto.derive_address("magnificpopup_css_seed"), - Crypto.derive_address("archethic_css_seed"), - Crypto.derive_address("owlcarousel_css_seed"), - Crypto.derive_address("owlcarousel_js_seed"), - Crypto.derive_address("popper_seed"), - Crypto.derive_address("wow_seed"), - Crypto.derive_address("jquerycountdown_seed"), - Crypto.derive_address("magnificpopup_js_seed"), - Crypto.derive_address("particles_seed"), - Crypto.derive_address("archethic_js_seed"), - Crypto.derive_address("d3_seed"), - Crypto.derive_address("d3queue_seed"), - Crypto.derive_address("d3topojson_seed"), - Crypto.derive_address("archethic_biometricanim_seed"), - Crypto.derive_address("archethic_blockchainanim_seed"), - Crypto.derive_address("formvalidator_seed"), - Crypto.derive_address("world-110_seed"), - Crypto.derive_address("archethic_index_seed"), - Crypto.derive_address("archethic_index_fr_seed"), - Crypto.derive_address("archethic_index_ru_seed"), - Crypto.derive_address("archethic_whitepaper_seed"), - Crypto.derive_address("archethic_whitepaper_fr_seed"), - Crypto.derive_address("archethic_yellowpaper_s1_seed"), - Crypto.derive_address("archethic_yellowpaper_s1_fr_seed") - ] - end -end diff --git a/lib/archethic/self_repair/sync.ex b/lib/archethic/self_repair/sync.ex index 158ea16e8..0dadea1c2 100644 --- a/lib/archethic/self_repair/sync.ex +++ b/lib/archethic/self_repair/sync.ex @@ -3,12 +3,15 @@ defmodule Archethic.SelfRepair.Sync do alias Archethic.BeaconChain alias Archethic.BeaconChain.Subset.P2PSampling + alias Archethic.BeaconChain.Summary alias Archethic.BeaconChain.SummaryAggregate alias Archethic.Crypto alias Archethic.DB + alias Archethic.Election + alias Archethic.PubSub alias Archethic.P2P @@ -103,24 +106,63 @@ defmodule Archethic.SelfRepair.Sync do patch :: binary() ) :: :ok | {:error, :unreachable_nodes} def load_missed_transactions(last_sync_date = %DateTime{}, patch) when is_binary(patch) do - Logger.info( - "Fetch missed transactions from last sync date: #{DateTime.to_string(last_sync_date)}" - ) + last_summary_time = BeaconChain.previous_summary_time(DateTime.utc_now()) + + if last_summary_time > last_sync_date do + Logger.info( + "Fetch missed transactions from last sync date: #{DateTime.to_string(last_sync_date)}" + ) + + do_load_missed_transactions(last_sync_date, last_summary_time, patch) + else + Logger.info("Already synchronized for #{DateTime.to_string(last_sync_date)}") + + # We skip the self-repair because the last synchronization time have been already synchronized + :ok + end + end + defp do_load_missed_transactions(last_sync_date, last_summary_time, patch) do start = System.monotonic_time() - last_sync_date - |> BeaconChain.next_summary_dates() - |> BeaconChain.fetch_summary_aggregates() - |> tap(&ensure_summaries_download/1) + summaries_aggregates = fetch_summaries_aggregates(last_sync_date, last_summary_time) + last_aggregate = BeaconChain.fetch_and_aggregate_summaries(last_summary_time) + ensure_download_last_aggregate(last_aggregate) + + last_aggregate = aggregate_with_local_summaries(last_aggregate, last_summary_time) + + summaries_aggregates + |> Stream.concat([last_aggregate]) |> Enum.each(&process_summary_aggregate(&1, patch)) :telemetry.execute([:archethic, :self_repair], %{duration: System.monotonic_time() - start}) Archethic.Bootstrap.NetworkConstraints.persist_genesis_address() end - defp ensure_summaries_download(aggregates) do - # Make sure the beacon summaries have been synchronized + defp fetch_summaries_aggregates(last_sync_date, last_summary_time) do + last_sync_date + |> BeaconChain.next_summary_dates() + # Take only the previous summaries before the last one + |> Stream.take_while(fn date -> + DateTime.compare(date, last_summary_time) == :lt + end) + # Fetch the beacon summaries aggregate + |> Task.async_stream(fn date -> + Logger.debug("Fetch summary aggregate for #{date}") + BeaconChain.fetch_summaries_aggregate(date) + end) + |> Stream.filter(fn + {:ok, {:ok, %SummaryAggregate{}}} -> + true + + _ -> + raise "Cannot make the self-repair - Previous summary aggregate not fetched" + end) + |> Stream.map(fn {:ok, {:ok, aggregate}} -> aggregate end) + end + + defp ensure_download_last_aggregate(last_aggregate = %SummaryAggregate{}) do + # Make sure the last beacon aggregate have been synchronized # from remote nodes to avoid self-repair to be acknowledged if those # cannot be reached node_public_key = Crypto.first_node_public_key() @@ -135,15 +177,28 @@ defmodule Archethic.SelfRepair.Sync do |> Enum.reject(&(&1.first_public_key == node_public_key)) |> Enum.count() - if remaining_nodes > 0 and aggregates == [] do - Logger.error("Cannot make the self-repair - Not reachable nodes") - {:error, :unreachable_nodes} - else - :ok + if remaining_nodes > 0 and SummaryAggregate.empty?(last_aggregate) do + raise "Cannot make the self repair - Last aggregate not fetched" end end end + defp aggregate_with_local_summaries(summary_aggregate, last_summary_time) do + BeaconChain.list_subsets() + |> Task.async_stream(fn subset -> + summary_address = Crypto.derive_beacon_chain_address(subset, last_summary_time, true) + BeaconChain.get_summary(summary_address) + end) + |> Enum.reduce(summary_aggregate, fn + {:ok, {:ok, summary = %Summary{}}}, acc -> + SummaryAggregate.add_summary(acc, summary) + + _, acc -> + acc + end) + |> SummaryAggregate.aggregate() + end + @doc """ Process beacon summary to synchronize the transactions involving. @@ -155,10 +210,12 @@ defmodule Archethic.SelfRepair.Sync do the readiness or the availability of a node. Also, the number of transaction received and the fees burned during the beacon summary interval will be stored. + + At the end of the execution, the summaries aggregate will persisted locally if the node is member of the shard of the summary """ @spec process_summary_aggregate(SummaryAggregate.t(), binary()) :: :ok def process_summary_aggregate( - %SummaryAggregate{ + aggregate = %SummaryAggregate{ summary_time: summary_time, transaction_summaries: transaction_summaries, p2p_availabilities: p2p_availabilities @@ -201,6 +258,8 @@ defmodule Archethic.SelfRepair.Sync do |> Enum.each(&update_availabilities/1) update_statistics(summary_time, transaction_summaries) + + store_aggregate(aggregate) end defp synchronize_transactions([], _node_patch), do: :ok @@ -303,4 +362,22 @@ defmodule Archethic.SelfRepair.Sync do PubSub.notify_new_tps(tps, nb_transactions) end + + defp store_aggregate(aggregate = %SummaryAggregate{summary_time: summary_time}) do + node_list = + [P2P.get_node_info() | P2P.authorized_and_available_nodes()] |> P2P.distinct_nodes() + + should_store? = + summary_time + |> Crypto.derive_beacon_aggregate_address() + |> Election.chain_storage_nodes(node_list) + |> Utils.key_in_node_list?(Crypto.first_node_public_key()) + + if should_store? do + BeaconChain.write_summaries_aggregate(aggregate) + Logger.info("Summary written to disk for #{summary_time}") + else + :ok + end + end end diff --git a/lib/archethic_web/live/chains/beacon_live.ex b/lib/archethic_web/live/chains/beacon_live.ex index 39d6dfaf9..ff40ec5a5 100644 --- a/lib/archethic_web/live/chains/beacon_live.ex +++ b/lib/archethic_web/live/chains/beacon_live.ex @@ -3,6 +3,7 @@ defmodule ArchethicWeb.BeaconChainLive do use ArchethicWeb, :live_view alias Archethic.BeaconChain + alias Archethic.BeaconChain.SummaryAggregate alias Archethic.Election @@ -111,11 +112,26 @@ defmodule ArchethicWeb.BeaconChainLive do {:noreply, new_socket} end + def handle_info({:load_at, date}, socket = %{assigns: %{current_date_page: 2}}) do + # Try to fetch from the cache, other fetch from the beacon summaries + {:ok, transactions} = + TransactionCache.resolve(date, fn -> + list_transactions_from_summaries(date) + end) + + new_assign = + socket + |> assign(:fetching, false) + |> assign(:transactions, transactions) + + {:noreply, new_assign} + end + def handle_info({:load_at, date}, socket) do - # Try to fetch from the cache, other fetch from the beacon summary + # Try to fetch from the cache, other fetch from the beacon aggregate {:ok, transactions} = TransactionCache.resolve(date, fn -> - list_transactions_from_summary(date) + list_transactions_from_aggregate(date) end) new_assign = @@ -207,14 +223,26 @@ defmodule ArchethicWeb.BeaconChainLive do |> Enum.sort({:desc, DateTime}) end - defp list_transactions_from_summary(date = %DateTime{}) do - [date] - |> BeaconChain.fetch_summary_aggregates() - |> Enum.flat_map(& &1.transaction_summaries) - |> Enum.sort_by(& &1.timestamp, {:desc, DateTime}) + defp list_transactions_from_summaries(date = %DateTime{}) do + %SummaryAggregate{transaction_summaries: tx_summaries} = + BeaconChain.fetch_and_aggregate_summaries(date) + + Enum.sort_by(tx_summaries, & &1.timestamp, {:desc, DateTime}) + end + + defp list_transactions_from_summaries(nil), do: [] + + defp list_transactions_from_aggregate(date = %DateTime{}) do + case BeaconChain.get_summaries_aggregate(date) do + {:ok, %SummaryAggregate{transaction_summaries: tx_summaries}} -> + Enum.sort_by(tx_summaries, & &1.timestamp, {:desc, DateTime}) + + _ -> + [] + end end - defp list_transactions_from_summary(nil), do: [] + defp list_transactions_from_aggregate(nil), do: [] # Slots which are already has been added # Real time transaction can be get from pubsub diff --git a/lib/archethic_web/live/explorer/cache.ex b/lib/archethic_web/live/explorer/cache.ex index 301635832..0e19898b5 100644 --- a/lib/archethic_web/live/explorer/cache.ex +++ b/lib/archethic_web/live/explorer/cache.ex @@ -1,5 +1,5 @@ defmodule ArchethicWeb.ExplorerLive.TopTransactionsCache do - @table :last_ten_transactions + @table :last_transactions @moduledoc false use GenServer @@ -11,7 +11,7 @@ defmodule ArchethicWeb.ExplorerLive.TopTransactionsCache do def init(_args) do :ets.new(@table, [:set, :public, :named_table]) - :ets.insert(@table, {:size, 10, 0}) + :ets.insert(@table, {:size, 5, 0}) {:ok, []} end diff --git a/lib/archethic_web/live/top_transactions_component.ex b/lib/archethic_web/live/top_transactions_component.ex index 37b2c15bf..3508cee90 100644 --- a/lib/archethic_web/live/top_transactions_component.ex +++ b/lib/archethic_web/live/top_transactions_component.ex @@ -1,6 +1,6 @@ defmodule ArchethicWeb.ExplorerIndexLive.TopTransactionsComponent do @moduledoc """ - Live component for Dashboard Explorer to display last 10 transactions + Live component for Dashboard Explorer to display recent transactions """ use ArchethicWeb, :live_component @@ -41,26 +41,14 @@ defmodule ArchethicWeb.ExplorerIndexLive.TopTransactionsComponent do def update(assigns, socket) do transactions = - case length(TopTransactionsCache.get()) do - l when l < 10 -> - # Below code won't const much performace as atmost 10 transaction will be pushed. + case TopTransactionsCache.get() do + [] -> txns = fetch_last_transactions() - - txns |> push_txns_to_cache() - + push_txns_to_cache(txns) txns - _ -> - [head | _] = TopTransactionsCache.get() - - # should refersh txns only if atleast 10 seconds have passed - if DateTime.diff(head.timestamp, DateTime.utc_now()) < 10 do - txns = fetch_last_transactions() - txns |> push_txns_to_cache() - txns - else - TopTransactionsCache.get() - end + txs -> + txs end socket = socket |> assign(assigns) |> assign(transactions: transactions) @@ -69,26 +57,31 @@ defmodule ArchethicWeb.ExplorerIndexLive.TopTransactionsComponent do def render(assigns) do ~L""" -
-

Latest Txns

-
-
- <%= for tx <- @transactions do %> -
-
- <%= link to: Routes.live_path(@socket, ArchethicWeb.TransactionDetailsLive, Base.encode16(tx.address)) do%> - <%= Base.encode16(tx.address) %> - <% end %> -
-
- <%= format_date(tx.timestamp) %> -
-
- <%= tx.type %> -
+
+
+
Latest transactions
+
+
+
+ <%= for tx <- @transactions do %> +
+
+ <%= link to: Routes.live_path(@socket, ArchethicWeb.TransactionDetailsLive, Base.encode16(tx.address)) do%> + <%= Base.encode16(tx.address) %> + <% end %> +
+
+ <%= format_date(tx.timestamp) %> +
+
+ <%= tx.type %> +
+
+ <% end %>
- <% end %> +
+
""" end @@ -99,38 +92,9 @@ defmodule ArchethicWeb.ExplorerIndexLive.TopTransactionsComponent do end) end - defp fetch_previous_dates(time = %DateTime{}, txns, retries) - when length(txns) < 10 and retries < 10 do - previous_time = BeaconChain.previous_summary_time(time) - prev_txns = list_transactions_from_summary(previous_time) - - retries = - case prev_txns do - [] -> retries + 1 - _txns -> 0 - end - - txns = txns ++ prev_txns - fetch_previous_dates(previous_time, txns, retries) - end - - defp fetch_previous_dates(_time = %DateTime{}, txns, _retries) when length(txns) >= 10 do - txns - end - - defp fetch_previous_dates(_time = %DateTime{}, txns, _retries) do - txns - end - - defp fetch_last_transactions(n \\ 10) do - txns = list_transactions_from_current_slots() - - if length(txns) < n do - fetch_previous_dates(DateTime.utc_now(), txns, 0) - else - txns - |> Enum.take(n) - end + defp fetch_last_transactions(n \\ 5) do + list_transactions_from_current_slots() + |> Enum.take(n) end defp list_transactions_from_current_slots(date = %DateTime{} \\ DateTime.utc_now()) do @@ -182,11 +146,4 @@ defmodule ArchethicWeb.ExplorerIndexLive.TopTransactionsComponent do |> Stream.flat_map(&elem(&1, 1)) |> Enum.to_list() end - - defp list_transactions_from_summary(date = %DateTime{}) do - [date] - |> BeaconChain.fetch_summary_aggregates() - |> Enum.flat_map(& &1.transaction_summaries) - |> Enum.sort_by(& &1.timestamp, {:desc, DateTime}) - end end diff --git a/lib/archethic_web/live/top_transactions_live.ex b/lib/archethic_web/live/top_transactions_live.ex deleted file mode 100644 index 50161a7c9..000000000 --- a/lib/archethic_web/live/top_transactions_live.ex +++ /dev/null @@ -1,51 +0,0 @@ -defmodule ArchethicWeb.TopTransactionLive do - @moduledoc false - use ArchethicWeb, :live_view - - alias Phoenix.View - - alias Archethic.PubSub - - alias Archethic.TransactionChain - - alias ArchethicWeb.ExplorerView - - @nb_transactions 3 - - def mount(_params, _session, socket) do - if connected?(socket) do - PubSub.register_to_new_transaction() - end - - transactions = - [:address, :type, validation_stamp: [:timestamp]] - |> TransactionChain.list_all() - |> get_last_n(@nb_transactions) - - {:ok, assign(socket, transactions: transactions)} - end - - def render(assigns) do - View.render(ExplorerView, "top_transactions.html", assigns) - end - - def handle_info({:new_transaction, address, type, timestamp}, socket) do - new_socket = - update( - socket, - :transactions, - &(Stream.concat(&1, [%{address: address, type: type, timestamp: timestamp}]) - |> get_last_n(@nb_transactions)) - ) - - {:noreply, new_socket} - end - - def handle_info(_, socket), do: {:noreply, socket} - - defp get_last_n(transactions, n) do - transactions - # |> Enum.sort_by(& &1.validation_stamp.timestamp, {:desc, DateTime}) - |> Enum.take(n) - end -end diff --git a/lib/mix/tasks/migrate.ex b/lib/mix/tasks/migrate.ex new file mode 100644 index 000000000..faa1432f5 --- /dev/null +++ b/lib/mix/tasks/migrate.ex @@ -0,0 +1,47 @@ +defmodule Mix.Tasks.Archethic.Migrate do + @moduledoc "Handle data migration" + + use Mix.Task + + alias Archethic.BeaconChain.Summary + alias Archethic.BeaconChain.SummaryAggregate + + alias Archethic.DB.EmbeddedImpl + alias Archethic.DB.EmbeddedImpl.ChainWriter + + def run(_arg) do + :archethic + |> Application.spec(:vsn) + |> List.to_string() + |> migrate() + end + + def migrate("0.25.0") do + db_path = EmbeddedImpl.db_path() + File.mkdir_p!(ChainWriter.base_beacon_aggregate_path(db_path)) + + ChainWriter.base_beacon_path(db_path) + |> Path.join("*") + |> Path.wildcard() + |> Enum.map(fn file -> + {summary, _} = + file + |> File.read!() + |> Summary.deserialize() + + summary + end) + |> Enum.reduce(%{}, fn summary = %Summary{summary_time: summary_time}, acc -> + Map.update(acc, summary_time, %SummaryAggregate{summary_time: summary_time}, fn aggregate -> + aggregate + |> SummaryAggregate.add_summary(summary, false) + end) + end) + |> Enum.map(fn {_, summary_aggregate} -> SummaryAggregate.aggregate(summary_aggregate) end) + |> Enum.each(fn aggregate -> + EmbeddedImpl.write_beacon_summaries_aggregate(aggregate) + end) + end + + def migrate(_), do: :ok +end diff --git a/rel/config.exs b/rel/config.exs index 2459d5685..5cf3b9f1d 100644 --- a/rel/config.exs +++ b/rel/config.exs @@ -26,6 +26,7 @@ environment Mix.env() do set include_src: false set vm_args: "rel/vm.args" set pre_configure_hooks: "rel/pre_configure" + set post_start_hooks: "rel/post_start" set config_providers: [ {Distillery.Releases.Config.Providers.Elixir, ["${REL_DIR}/runtime_config.exs"]} diff --git a/rel/post_start/migrate.sh b/rel/post_start/migrate.sh new file mode 100755 index 000000000..a22d68c92 --- /dev/null +++ b/rel/post_start/migrate.sh @@ -0,0 +1,3 @@ +#!/usr/bin/env bash + +release_remote_ctl eval --mfa "Mix.Tasks.Archethic.Migrate.run/1" --argv "$@" diff --git a/test/archethic/beacon_chain/slot_timer_test.exs b/test/archethic/beacon_chain/slot_timer_test.exs index 1d0c8bb00..7370dba82 100644 --- a/test/archethic/beacon_chain/slot_timer_test.exs +++ b/test/archethic/beacon_chain/slot_timer_test.exs @@ -3,6 +3,7 @@ defmodule Archethic.BeaconChain.SlotTimerTest do alias Archethic.BeaconChain alias Archethic.BeaconChain.SlotTimer + alias Archethic.BeaconChain.SummaryTimer alias Archethic.BeaconChain.SubsetRegistry alias Archethic.P2P @@ -15,6 +16,8 @@ defmodule Archethic.BeaconChain.SlotTimerTest do Registry.register(SubsetRegistry, subset, []) end) + start_supervised!({SummaryTimer, interval: "0 * * * * *"}) + :ok end diff --git a/test/archethic/beacon_chain/summary_aggregate_test.exs b/test/archethic/beacon_chain/summary_aggregate_test.exs new file mode 100644 index 000000000..4764391b7 --- /dev/null +++ b/test/archethic/beacon_chain/summary_aggregate_test.exs @@ -0,0 +1,8 @@ +defmodule Archethic.BeaconChain.SummaryAggregateTest do + use ExUnit.Case + + alias Archethic.BeaconChain.SummaryAggregate + alias Archethic.TransactionChain.TransactionSummary + + doctest SummaryAggregate +end diff --git a/test/archethic/beacon_chain_test.exs b/test/archethic/beacon_chain_test.exs index 15d6be4b6..3798806a5 100644 --- a/test/archethic/beacon_chain_test.exs +++ b/test/archethic/beacon_chain_test.exs @@ -96,7 +96,7 @@ defmodule Archethic.BeaconChainTest do end end - describe "fetch_summary_aggregates/1" do + describe "fetch_and_aggregate_summaries/1" do setup do summary_time = ~U[2021-01-22 16:12:58Z] @@ -215,8 +215,8 @@ defmodule Archethic.BeaconChainTest do {:ok, %BeaconSummaryList{summaries: [beacon_summary]}} end) - [%SummaryAggregate{transaction_summaries: transaction_summaries}] = - BeaconChain.fetch_summary_aggregates([summary_time]) + %SummaryAggregate{transaction_summaries: transaction_summaries} = + BeaconChain.fetch_and_aggregate_summaries(summary_time) assert [addr1] == Enum.map(transaction_summaries, & &1.address) end @@ -309,15 +309,15 @@ defmodule Archethic.BeaconChainTest do {:ok, %BeaconSummaryList{summaries: [summary_v2]}} end) - [%SummaryAggregate{transaction_summaries: transaction_summaries}] = - BeaconChain.fetch_summary_aggregates([summary_time]) + %SummaryAggregate{transaction_summaries: transaction_summaries} = + BeaconChain.fetch_and_aggregate_summaries(summary_time) transaction_addresses = Enum.map(transaction_summaries, & &1.address) assert Enum.all?(transaction_addresses, &(&1 in [addr1, addr2])) end - test "should find other beacon summaries and aggregate node P2P views", %{ + test "should find other beacon summaries and accumulate node P2P views", %{ summary_time: summary_time, nodes: [node1, node2, node3, node4] } do @@ -349,29 +349,68 @@ defmodule Archethic.BeaconChainTest do end_of_node_synchronizations: [] } + subset_address = Crypto.derive_beacon_chain_address("A", summary_time, true) + MockClient |> stub(:send_message, fn - ^node1, %GetBeaconSummaries{}, _ -> - {:ok, %BeaconSummaryList{summaries: [summary_v1]}} - - ^node2, %GetBeaconSummaries{}, _ -> - {:ok, %BeaconSummaryList{summaries: [summary_v2]}} + ^node1, %GetBeaconSummaries{addresses: addresses}, _ -> + summaries = + if subset_address in addresses do + [summary_v1] + else + [] + end + + {:ok, %BeaconSummaryList{summaries: summaries}} + + ^node2, %GetBeaconSummaries{addresses: addresses}, _ -> + summaries = + if subset_address in addresses do + [summary_v2] + else + [] + end + + {:ok, %BeaconSummaryList{summaries: summaries}} + + ^node3, %GetBeaconSummaries{addresses: addresses}, _ -> + summaries = + if subset_address in addresses do + [summary_v3] + else + [] + end + + {:ok, %BeaconSummaryList{summaries: summaries}} + + ^node4, %GetBeaconSummaries{addresses: addresses}, _ -> + summaries = + if subset_address in addresses do + [summary_v4] + else + [] + end + + {:ok, %BeaconSummaryList{summaries: summaries}} + end) - ^node3, %GetBeaconSummaries{}, _ -> - {:ok, %BeaconSummaryList{summaries: [summary_v3]}} + assert %SummaryAggregate{ + p2p_availabilities: %{"A" => %{node_availabilities: node_availabilities}} + } = BeaconChain.fetch_and_aggregate_summaries(summary_time) - ^node4, %GetBeaconSummaries{}, _ -> - {:ok, %BeaconSummaryList{summaries: [summary_v4]}} - end) + expected_availabilities = + [summary_v1, summary_v2, summary_v3, summary_v4] + |> Enum.map(& &1.node_availabilities) + |> Enum.flat_map(&Utils.bitstring_to_integer_list/1) + |> Enum.sort() - assert [ - %SummaryAggregate{ - p2p_availabilities: %{"A" => %{node_availabilities: <<1::1, 1::1, 1::1>>}} - } - ] = BeaconChain.fetch_summary_aggregates([summary_time]) + assert node_availabilities + |> Enum.flat_map(& &1) + |> Enum.sort() == + expected_availabilities end - test "should find other beacon summaries and aggregate node P2P avg availabilities", %{ + test "should find other beacon summaries and accumulate node P2P avg availabilities", %{ summary_time: summary_time, nodes: [node1, node2, node3, node4] } do @@ -397,31 +436,70 @@ defmodule Archethic.BeaconChainTest do subset: "A", summary_time: summary_time, node_availabilities: <<1::1, 1::1, 0::1>>, - node_average_availabilities: [1, 0.5, 1, 0.4] + node_average_availabilities: [1.0, 0.5, 1.0, 0.4] } + subset_address = Crypto.derive_beacon_chain_address("A", summary_time, true) + MockClient |> stub(:send_message, fn - ^node1, %GetBeaconSummaries{}, _ -> - {:ok, %BeaconSummaryList{summaries: [summary_v1]}} - - ^node2, %GetBeaconSummaries{}, _ -> - {:ok, %BeaconSummaryList{summaries: [summary_v2]}} - - ^node3, %GetBeaconSummaries{}, _ -> - {:ok, %BeaconSummaryList{summaries: [summary_v3]}} - - ^node4, %GetBeaconSummaries{}, _ -> - {:ok, %BeaconSummaryList{summaries: [summary_v4]}} + ^node1, %GetBeaconSummaries{addresses: addresses}, _ -> + summaries = + if subset_address in addresses do + [summary_v1] + else + [] + end + + {:ok, %BeaconSummaryList{summaries: summaries}} + + ^node2, %GetBeaconSummaries{addresses: addresses}, _ -> + summaries = + if subset_address in addresses do + [summary_v2] + else + [] + end + + {:ok, %BeaconSummaryList{summaries: summaries}} + + ^node3, %GetBeaconSummaries{addresses: addresses}, _ -> + summaries = + if subset_address in addresses do + [summary_v3] + else + [] + end + + {:ok, %BeaconSummaryList{summaries: summaries}} + + ^node4, %GetBeaconSummaries{addresses: addresses}, _ -> + summaries = + if subset_address in addresses do + [summary_v4] + else + [] + end + + {:ok, %BeaconSummaryList{summaries: summaries}} end) - assert [ - %SummaryAggregate{ - p2p_availabilities: %{ - "A" => %{node_average_availabilities: [0.925, 0.8, 0.925, 0.85]} - } + assert %SummaryAggregate{ + p2p_availabilities: %{ + "A" => %{node_average_availabilities: node_average_availabilities} } - ] = BeaconChain.fetch_summary_aggregates([summary_time]) + } = BeaconChain.fetch_and_aggregate_summaries(summary_time) + + expected_average_availabilities = + [summary_v1, summary_v2, summary_v3, summary_v4] + |> Enum.map(& &1.node_average_availabilities) + |> Enum.flat_map(& &1) + |> Enum.sort() + + assert node_average_availabilities + |> Enum.flat_map(& &1) + |> Enum.sort() == + expected_average_availabilities end end end diff --git a/test/archethic/db/embedded_impl_test.exs b/test/archethic/db/embedded_impl_test.exs index 57d0e6296..3b4ffc5d0 100644 --- a/test/archethic/db/embedded_impl_test.exs +++ b/test/archethic/db/embedded_impl_test.exs @@ -1,20 +1,22 @@ defmodule Archethic.DB.EmbeddedTest do use ArchethicCase, async: false + alias Archethic.BeaconChain.Summary + alias Archethic.BeaconChain.SummaryAggregate + + alias Archethic.Crypto + alias Archethic.DB.EmbeddedImpl alias Archethic.DB.EmbeddedImpl.Encoding alias Archethic.DB.EmbeddedImpl.ChainIndex alias Archethic.DB.EmbeddedImpl.ChainWriter alias Archethic.TransactionChain.Transaction + alias Archethic.TransactionChain.TransactionSummary alias Archethic.TransactionChain.Transaction.ValidationStamp alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations alias Archethic.TransactionFactory - alias Archethic.BeaconChain.Summary - - alias Archethic.Crypto - alias Archethic.Utils setup do @@ -176,7 +178,7 @@ defmodule Archethic.DB.EmbeddedTest do end describe "get_beacon_summary/1" do - test "should return an error when the summary does not exists" do + test "should return an error when the summary does not exist" do assert {:error, :summary_not_exists} = EmbeddedImpl.get_beacon_summary(:crypto.strong_rand_bytes(32)) end @@ -203,6 +205,38 @@ defmodule Archethic.DB.EmbeddedTest do end end + describe "get_beacon_summaries_aggregate/1" do + test "should return an error when the aggregate does not exist" do + {:error, :not_exists} = EmbeddedImpl.get_beacon_summaries_aggregate(DateTime.utc_now()) + end + + test "should retrieve a beacon summaries aggregate" do + aggregate = %SummaryAggregate{ + summary_time: ~U[2020-09-01 00:00:00Z], + transaction_summaries: [ + %TransactionSummary{ + type: :transfer, + address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>, + fee: 100_000_000, + timestamp: ~U[2020-08-31 20:00:00.232Z] + } + ], + p2p_availabilities: %{ + <<0>> => %{ + node_availabilities: <<1::1>>, + node_average_availabilities: [1.0], + end_of_node_synchronizations: [] + } + } + } + + :ok = EmbeddedImpl.write_beacon_summaries_aggregate(aggregate) + + assert {:ok, ^aggregate} = + EmbeddedImpl.get_beacon_summaries_aggregate(~U[2020-09-01 00:00:00Z]) + end + end + describe "get_transaction_chain/2" do test "should return an empty list when the transaction chain is not found" do assert {[], false, ""} = EmbeddedImpl.get_transaction_chain(:crypto.strong_rand_bytes(32)) diff --git a/test/archethic/self_repair/scheduler_test.exs b/test/archethic/self_repair/scheduler_test.exs index cf3bb85e0..8cc050431 100644 --- a/test/archethic/self_repair/scheduler_test.exs +++ b/test/archethic/self_repair/scheduler_test.exs @@ -39,14 +39,14 @@ defmodule Archethic.SelfRepair.SchedulerTest do {:ok, %NotFound{}} end) - {:ok, pid} = Scheduler.start_link([interval: "*/1 * * * * * *"], []) + {:ok, pid} = Scheduler.start_link([interval: "*/3 * * * * * *"], []) assert :ok = Scheduler.start_scheduler(pid) + %{timer: timer} = :sys.get_state(pid) :erlang.trace(pid, true, [:receive]) - assert_receive {:trace, ^pid, :receive, :sync}, 2_000 - - Process.sleep(100) + assert_receive {:trace, ^pid, :receive, :sync}, 4_000 + Process.cancel_timer(timer) end test "handle_info/3 should initiate the loading of missing transactions, schedule the next repair and update the last sync date" do @@ -77,7 +77,7 @@ defmodule Archethic.SelfRepair.SchedulerTest do :ok end) - {:ok, pid} = Scheduler.start_link([interval: "*/1 * * * * * *"], []) + {:ok, pid} = Scheduler.start_link([interval: "0 0 * * * * *"], []) send(pid, :sync) diff --git a/test/support/template.ex b/test/support/template.ex index 5f1753492..1641e56d2 100644 --- a/test/support/template.ex +++ b/test/support/template.ex @@ -53,6 +53,8 @@ defmodule ArchethicCase do |> stub(:transaction_exists?, fn _ -> false end) |> stub(:register_p2p_summary, fn _, _, _, _ -> :ok end) |> stub(:get_last_p2p_summaries, fn -> [] end) + |> stub(:get_latest_tps, fn -> 0.0 end) + |> stub(:register_stats, fn _, _, _, _ -> :ok end) |> stub(:get_bootstrap_info, fn "storage_nonce" -> "nonce" @@ -67,6 +69,10 @@ defmodule ArchethicCase do "127.0.0.1:3002:0100044D91A0A1A7CF06A2902D3842F82D2791BCBF3EE6F6DC8DE0F90E53E9991C3CB33684B7B9E66F26E7C9F5302F73C69897BE5F301DE9A63521A08AC4EF34C18728:tcp" end) |> stub(:set_bootstrap_info, fn _, _ -> :ok end) + |> stub(:write_beacon_summaries_aggregate, fn _ -> :ok end) + |> stub(:get_beacon_summaries_aggregate, fn _ -> {:error, :not_exists} end) + |> stub(:clear_beacon_summaries, fn -> :ok end) + |> stub(:get_beacon_summary, fn _ -> {:error, :not_exists} end) {:ok, shared_secrets_counter} = Agent.start_link(fn -> 0 end) {:ok, network_pool_counter} = Agent.start_link(fn -> 0 end)