diff --git a/config/test.exs b/config/test.exs index 3197c72ca..629ccb061 100755 --- a/config/test.exs +++ b/config/test.exs @@ -22,6 +22,7 @@ config :archethic, Archethic.BeaconChain.SummaryTimer, interval: "0 * * * * *" config :archethic, Archethic.BeaconChain.Subset.SummaryCache, enabled: false +config :archethic, Archethic.BeaconChain.Subset.StatsCollector, enabled: false config :archethic, Archethic.Bootstrap, enabled: false diff --git a/lib/archethic/application.ex b/lib/archethic/application.ex index 7785d9af7..df41165fe 100644 --- a/lib/archethic/application.ex +++ b/lib/archethic/application.ex @@ -104,7 +104,8 @@ defmodule Archethic.Application do MetricSupervisor, # a registry used in Utils to ensure a function is executed at most once concurrently - {Registry, keys: :unique, name: Archethic.RunExclusiveRegistry} + {Registry, keys: :unique, name: Archethic.RunExclusiveRegistry}, + {Registry, keys: :unique, name: Archethic.Utils.JobCacheRegistry} ] opts = [strategy: :rest_for_one, name: Archethic.Supervisor] diff --git a/lib/archethic/beacon_chain/network_coordinates.ex b/lib/archethic/beacon_chain/network_coordinates.ex index 0912ca0a5..c253ee8fa 100644 --- a/lib/archethic/beacon_chain/network_coordinates.ex +++ b/lib/archethic/beacon_chain/network_coordinates.ex @@ -219,9 +219,9 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do defp get_digit(acc, _, _, _, _, _), do: acc @doc """ - Fetch remotely the network stats for a given summary time + Fetch remotely the network stats for a given summary time - This request all the subsets to gather the aggregated network stats. + This requests all the beacon nodes their aggregated network stats. A NxN latency matrix is then computed based on the network stats origins and targets """ @spec fetch_network_stats(DateTime.t()) :: Nx.Tensor.t() @@ -230,37 +230,34 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do sorted_node_list = P2P.list_nodes() |> Enum.sort_by(& &1.first_public_key) nb_nodes = length(sorted_node_list) + beacon_nodes = get_beacon_nodes(summary_time, authorized_nodes) matrix = Nx.broadcast(0, {nb_nodes, nb_nodes}) - summary_time - |> get_subsets_nodes(authorized_nodes) - # Aggregate subsets by node - |> Enum.reduce(%{}, fn {subset, beacon_nodes}, acc -> - Enum.reduce(beacon_nodes, acc, fn node, acc -> - Map.update(acc, node, [subset], &[subset | &1]) - end) - end) - |> stream_subsets_stats() + stream_network_stats(summary_time, beacon_nodes) # Aggregate stats per node to identify the sampling nodes |> aggregate_stats_per_subset() |> update_matrix_from_stats(matrix, sorted_node_list) end - defp get_subsets_nodes(summary_time, authorized_nodes) do - Enum.map(BeaconChain.list_subsets(), fn subset -> - beacon_nodes = Election.beacon_storage_nodes(subset, summary_time, authorized_nodes) - {subset, beacon_nodes} + defp get_beacon_nodes(summary_time, authorized_nodes) do + BeaconChain.list_subsets() + |> Enum.reduce(MapSet.new(), fn subset, acc -> + Election.beacon_storage_nodes(subset, summary_time, authorized_nodes) + |> MapSet.new() + |> MapSet.union(acc) end) + |> MapSet.to_list() end - defp stream_subsets_stats(subsets_by_node) do + defp stream_network_stats(summary_time, beacon_nodes) do Task.Supervisor.async_stream_nolink( TaskSupervisor, - subsets_by_node, - fn {node, subsets} -> - P2P.send_message(node, %GetNetworkStats{subsets: subsets}) + beacon_nodes, + fn node -> + P2P.send_message(node, %GetNetworkStats{summary_time: summary_time}, 5_000) end, + timeout: 6_000, ordered: false, on_timeout: :kill_task, max_concurrency: 256 diff --git a/lib/archethic/beacon_chain/subset.ex b/lib/archethic/beacon_chain/subset.ex index 05ed1ab80..a1110b787 100644 --- a/lib/archethic/beacon_chain/subset.ex +++ b/lib/archethic/beacon_chain/subset.ex @@ -389,7 +389,7 @@ defmodule Archethic.BeaconChain.Subset do Logger.debug("Create beacon summary", beacon_subset: Base.encode16(subset)) patch_task = - Task.Supervisor.async_nolink(TaskSupervisor, fn -> get_network_patches(subset, time) end) + Task.Supervisor.async_nolink(TaskSupervisor, fn -> get_network_patches(time, subset) end) summary = %Summary{subset: subset, summary_time: time} @@ -420,12 +420,10 @@ defmodule Archethic.BeaconChain.Subset do end :ok = BeaconChain.write_beacon_summary(%{summary | network_patches: network_patches}) - - SummaryCache.clean_previous_summary_slots(subset, time) end end - defp get_network_patches(subset, summary_time) do + defp get_network_patches(summary_time, subset) do with true <- length(P2P.authorized_and_available_nodes()) > 1, sampling_nodes when sampling_nodes != [] <- P2PSampling.list_nodes_to_sample(subset) do sampling_nodes_indexes = @@ -437,8 +435,7 @@ defmodule Archethic.BeaconChain.Subset do end) |> Enum.map(fn {_, index} -> index end) - summary_time - |> StatsCollector.get() + StatsCollector.fetch(summary_time) |> NetworkCoordinates.get_patch_from_latencies() |> Enum.with_index() |> Enum.filter(fn {_, index} -> diff --git a/lib/archethic/beacon_chain/subset/stats_collector.ex b/lib/archethic/beacon_chain/subset/stats_collector.ex index 98276b49e..e235aec6f 100644 --- a/lib/archethic/beacon_chain/subset/stats_collector.ex +++ b/lib/archethic/beacon_chain/subset/stats_collector.ex @@ -1,65 +1,173 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do @moduledoc """ - Process responsible to collect subset network stats - and reply to parallels requests to reduce the network load. + Get the networks stats locally and remotely + + Uses 2 job caches: + - cache_get: cache the aggregation of local stats + - cache_fetch: cache the I/O to fetch remote stats + + It subscribes to 2 events to start and stop both jobs ASAP + Jobs are also started by the get/fetch function if needed """ @vsn Mix.Project.config()[:version] + @timeout :archethic + |> Application.compile_env(__MODULE__, []) + |> Keyword.get(:timeout, :timer.minutes(1)) + use GenServer + alias Archethic.P2P + alias Archethic.Election + alias Archethic.BeaconChain alias Archethic.BeaconChain.NetworkCoordinates - alias Archethic.TaskSupervisor + alias Archethic.Utils.JobCache + alias Archethic.PubSub require Logger - def start_link(_arg \\ []) do - GenServer.start_link(__MODULE__, [], name: __MODULE__) + # ------------------------------------------------------------ + # _ ____ ___ + # / \ | _ |_ _| + # / _ \ | |_) | | + # / ___ \| __/| | + # /_/ \_|_| |___| + # ------------------------------------------------------------ + + @spec start_link(Keyword.t()) :: GenServer.on_start() + def start_link(opts \\ []) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) end - @spec get(DateTime.t()) :: Nx.Tensor.t() - def get(summary_time) do - try do - GenServer.call(__MODULE__, {:get, summary_time}) - catch - :exit, {:timeout, _} -> - Logger.warning("Fetching network stats take longer than 5s") - Nx.tensor(0) - end + @doc """ + Get the local stats if current node is beacon storage node + """ + @spec get(DateTime.t(), pos_integer()) :: %{binary() => Nx.Tensor.t()} + def get(summary_time, timeout \\ @timeout) do + JobCache.get!( + {:get, summary_time}, + function: fn -> + get_current_node_subsets(summary_time) + |> do_get_stats() + end, + timeout: timeout + ) + catch + :exit, _ -> %{} end + @doc """ + Fetch the stats of given summary from beacon_nodes + """ + @spec fetch(DateTime.t(), pos_integer()) :: Nx.Tensor.t() + def fetch(summary_time, timeout \\ @timeout) do + JobCache.get!( + {:fetch, summary_time}, + function: fn -> do_fetch_stats(summary_time) end, + timeout: timeout + ) + catch + :exit, _ -> Nx.tensor(0) + end + + # ------------------------------------------------------------ + # _ _ _ _ + # ___ __ _| | | |__ __ _ ___| | _____ + # / __/ _` | | | '_ \ / _` |/ __| |/ / __| + # | (_| (_| | | | |_) | (_| | (__| <\__ \ + # \___\__,_|_|_|_.__/ \__,_|\___|_|\_|___/ + # ------------------------------------------------------------ def init(_) do - {:ok, %{fetching_task: nil, clients: []}} + PubSub.register_to_next_summary_time() + PubSub.register_to_self_repair() + {:ok, :no_state} end - def handle_call({:get, summary_time}, from, state = %{fetching_task: nil}) do - task = - Task.Supervisor.async_nolink(TaskSupervisor, fn -> - start_time = System.monotonic_time() - stats = NetworkCoordinates.fetch_network_stats(summary_time) - {stats, start_time} - end) + def handle_info({:next_summary_time, next_summary_time}, state) do + summary_time = BeaconChain.previous_summary_time(next_summary_time) - new_state = - state - |> Map.update!(:clients, &[from | &1]) - |> Map.put(:fetching_task, task) + maybe_start_job({:get, summary_time}) + maybe_start_job({:fetch, summary_time}) - {:noreply, new_state} + {:noreply, state} end - def handle_call({:get, _summary_time}, from, state = %{fetching_task: _}) do - new_state = - state - |> Map.update!(:clients, &[from | &1]) + def handle_info(:self_repair_sync, state) do + summary_time = BeaconChain.previous_summary_time(DateTime.utc_now()) + + maybe_stop_job({:get, summary_time}) + maybe_stop_job({:fetch, summary_time}) - {:noreply, new_state} + {:noreply, state} end - def handle_info( - {ref, {stats, start_time}}, - state = %{clients: clients, fetching_task: %Task{ref: ref_task}} - ) - when ref_task == ref do + # ------------------------------------------------------------ + # _ _ + # _ __ _ __(___ ____ _| |_ ___ + # | '_ \| '__| \ \ / / _` | __/ _ \ + # | |_) | | | |\ V | (_| | || __/ + # | .__/|_| |_| \_/ \__,_|\__\___| + # |_| + # ------------------------------------------------------------ + + defp maybe_start_job({action, summary_time}) do + case get_current_node_subsets(summary_time) do + [] -> + nil + + subsets -> + JobCache.start( + immediate: true, + name_key: {action, summary_time}, + function: fn -> + case action do + :get -> + do_get_stats(subsets) + + :fetch -> + do_fetch_stats(summary_time) + end + end + ) + end + end + + defp maybe_stop_job(key) do + JobCache.stop(key) + end + + defp do_get_stats(subsets) do + subsets + |> Task.async_stream( + fn subset -> + stats = BeaconChain.get_network_stats(subset) + + {subset, stats} + end, + timeout: 10_000, + on_timeout: :kill_task, + ordered: false, + max_concurrency: 256 + ) + |> Stream.filter(fn + {:exit, :timeout} -> false + _ -> true + end) + |> Stream.map(fn {:ok, res} -> res end) + |> Enum.to_list() + |> Enum.reduce(%{}, fn + {subset, stats}, acc when map_size(stats) > 0 -> + Map.put(acc, subset, stats) + + _, acc -> + acc + end) + end + + defp do_fetch_stats(summary_time) do + start_time = System.monotonic_time() + stats = NetworkCoordinates.fetch_network_stats(summary_time) + :telemetry.execute( [:archethic, :beacon_chain, :network_coordinates, :collect_stats], %{ @@ -68,15 +176,21 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do %{matrix_size: Nx.size(stats)} ) - Enum.each(clients, &GenServer.reply(&1, stats)) + stats + end - new_state = - state - |> Map.put(:clients, []) - |> Map.put(:fetching_task, nil) + defp get_current_node_subsets(summary_time) do + authorized_nodes = P2P.authorized_and_available_nodes(summary_time, true) + current_node = P2P.get_node_info() - {:noreply, new_state} - end + Enum.reduce(BeaconChain.list_subsets(), [], fn subset, acc -> + beacon_nodes = Election.beacon_storage_nodes(subset, summary_time, authorized_nodes) - def handle_info({:DOWN, _ref, :process, _pid, _reason}, state), do: {:noreply, state} + if current_node in beacon_nodes do + [subset | acc] + else + acc + end + end) + end end diff --git a/lib/archethic/beacon_chain/subset/summary_cache.ex b/lib/archethic/beacon_chain/subset/summary_cache.ex index 1fcf689c1..1ae1d5dc5 100644 --- a/lib/archethic/beacon_chain/subset/summary_cache.ex +++ b/lib/archethic/beacon_chain/subset/summary_cache.ex @@ -34,12 +34,22 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do PubSub.register_to_current_epoch_of_slot_time() PubSub.register_to_node_status() + PubSub.register_to_self_repair() {:ok, %{}} end def code_change(_version, state, _extra), do: {:ok, state} + def handle_info(:self_repair_sync, state) do + previous_summary_time = SummaryTimer.previous_summary(DateTime.utc_now()) + + BeaconChain.list_subsets() + |> Enum.each(&clean_previous_summary_slots(&1, previous_summary_time)) + + {:noreply, state} + end + def handle_info({:current_epoch_of_slot_timer, slot_time}, state) do if SummaryTimer.match_interval?(slot_time), do: delete_old_backup_file(slot_time) @@ -58,25 +68,6 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do def handle_info(:node_down, state), do: {:noreply, state} - @doc """ - Remove slots of previous summary time from ets table - """ - @spec clean_previous_summary_slots( - subset :: binary(), - previous_summary_time :: DateTime.t() - ) :: :ok - def clean_previous_summary_slots(subset, previous_summary_time) do - subset - |> stream_current_slots() - |> Stream.filter(fn {%Slot{slot_time: slot_time}, _} -> - DateTime.compare(slot_time, previous_summary_time) != :gt - end) - |> Stream.each(fn item -> - :ets.delete_object(@table_name, {subset, item}) - end) - |> Stream.run() - end - @doc """ Stream all the entries for a subset """ @@ -176,4 +167,16 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do {node_public_key, rest} = Utils.deserialize_public_key(rest) deserialize(rest, [{slot, node_public_key} | acc]) end + + defp clean_previous_summary_slots(subset, previous_summary_time) do + subset + |> stream_current_slots() + |> Stream.filter(fn {%Slot{slot_time: slot_time}, _} -> + DateTime.compare(slot_time, previous_summary_time) != :gt + end) + |> Stream.each(fn item -> + :ets.delete_object(@table_name, {subset, item}) + end) + |> Stream.run() + end end diff --git a/lib/archethic/beacon_chain/subset/supervisor.ex b/lib/archethic/beacon_chain/subset/supervisor.ex index a4f34bb0b..be7130998 100644 --- a/lib/archethic/beacon_chain/subset/supervisor.ex +++ b/lib/archethic/beacon_chain/subset/supervisor.ex @@ -18,12 +18,13 @@ defmodule Archethic.BeaconChain.SubsetSupervisor do subset_children = subset_child_specs(BeaconChain.list_subsets()) children = - [ + Utils.configurable_children([ {Registry, - keys: :unique, name: BeaconChain.SubsetRegistry, partitions: System.schedulers_online()} - | Utils.configurable_children([SummaryCache | subset_children]) - ] - |> Enum.concat([StatsCollector]) + keys: :unique, name: BeaconChain.SubsetRegistry, partitions: System.schedulers_online()}, + SummaryCache, + StatsCollector + | subset_children + ]) Supervisor.init(children, strategy: :one_for_one) end diff --git a/lib/archethic/p2p/message/get_network_stats.ex b/lib/archethic/p2p/message/get_network_stats.ex index 62c9da113..fb8eb7b1d 100644 --- a/lib/archethic/p2p/message/get_network_stats.ex +++ b/lib/archethic/p2p/message/get_network_stats.ex @@ -3,53 +3,34 @@ defmodule Archethic.P2P.Message.GetNetworkStats do Represents a message to get the network stats from the beacon summary cache """ - @enforce_keys :subsets - defstruct subsets: [] + @enforce_keys [:summary_time] + defstruct [:summary_time] - alias Archethic.BeaconChain + alias Archethic.BeaconChain.Subset.StatsCollector alias Archethic.Crypto alias Archethic.P2P.Message.NetworkStats @type t :: %__MODULE__{ - subsets: list(binary()) + summary_time: DateTime.t() } @doc """ Serialize the get network stats message into binary - - ## Examples - - iex> %GetNetworkStats{subsets: [<<0>>, <<255>>]} |> GetNetworkStats.serialize() - << - # Length of subsets - 0, 2, - # Subset - 0, 255 - >> """ - def serialize(%__MODULE__{subsets: subsets}) do - <> + @spec serialize(t()) :: bitstring() + def serialize(%__MODULE__{summary_time: summary_time}) do + <> end @doc """ Deserialize the binary into the get network stats message - - ## Examples - - iex> <<0, 2, 0, 255>> |> GetNetworkStats.deserialize() - { - %GetNetworkStats{subsets: [<<0>>, <<255>>]}, - "" - } """ - def deserialize(<>) do - subsets = - subsets_binary - |> :erlang.binary_to_list() - |> Enum.map(&<<&1>>) + @spec deserialize(bitstring) :: {t(), bitstring()} + def deserialize(<>) do + summary_time = DateTime.from_unix!(unix) { - %__MODULE__{subsets: subsets}, + %__MODULE__{summary_time: summary_time}, rest } end @@ -58,22 +39,7 @@ defmodule Archethic.P2P.Message.GetNetworkStats do Process the message to get the network stats from the summary cache """ @spec process(t(), Crypto.key()) :: NetworkStats.t() - def process(%__MODULE__{subsets: subsets}, _node_public_key) do - stats = - subsets - |> Task.async_stream(fn subset -> - stats = BeaconChain.get_network_stats(subset) - {subset, stats} - end) - |> Stream.map(fn {:ok, res} -> res end) - |> Enum.reduce(%{}, fn - {subset, stats}, acc when map_size(stats) > 0 -> - Map.put(acc, subset, stats) - - _, acc -> - acc - end) - - %NetworkStats{stats: stats} + def process(%__MODULE__{summary_time: summary_time}, _node_public_key) do + %NetworkStats{stats: StatsCollector.get(summary_time)} end end diff --git a/lib/archethic/pub_sub.ex b/lib/archethic/pub_sub.ex index ccbb2f5c5..af7eee881 100644 --- a/lib/archethic/pub_sub.ex +++ b/lib/archethic/pub_sub.ex @@ -112,6 +112,18 @@ defmodule Archethic.PubSub do """ def register_to_node_status(), do: Registry.register(PubSubRegistry, :node_status, []) + @doc """ + Notify that a self repair synchronization is starting + """ + @spec notify_self_repair() :: :ok + def notify_self_repair(), do: dispatch(:self_repair_sync, :self_repair_sync) + + @doc """ + Register a process to self repair synchronizations starts + """ + @spec register_to_self_repair :: {:error, {:already_registered, pid}} | {:ok, pid} + def register_to_self_repair(), do: Registry.register(PubSubRegistry, :self_repair_sync, []) + @doc """ Register a process to a new transaction publication by type """ diff --git a/lib/archethic/self_repair/scheduler.ex b/lib/archethic/self_repair/scheduler.ex index 4b63d5ee8..088ca9ab4 100644 --- a/lib/archethic/self_repair/scheduler.ex +++ b/lib/archethic/self_repair/scheduler.ex @@ -92,6 +92,8 @@ defmodule Archethic.SelfRepair.Scheduler do "Self-Repair synchronization started from #{last_sync_date_to_string(last_sync_date)}" ) + PubSub.notify_self_repair() + Task.Supervisor.async_nolink(TaskSupervisor, fn -> # Loading transactions can take a lot of time to be achieve and can overpass an epoch. # So to avoid missing a beacon summary epoch, we save the starting date and update the last sync date with it diff --git a/lib/archethic/utils/job_cache.ex b/lib/archethic/utils/job_cache.ex index 3180ae073..4a194c128 100644 --- a/lib/archethic/utils/job_cache.ex +++ b/lib/archethic/utils/job_cache.ex @@ -33,6 +33,8 @@ defmodule Archethic.Utils.JobCache do :called """ + alias Archethic.Utils.JobCacheRegistry + use GenServer @vsn Mix.Project.config()[:version] @@ -72,8 +74,28 @@ defmodule Archethic.Utils.JobCache do iex> :persistent_term.get(:nil) :called """ - @spec get!(GenServer.server(), timeout) :: any - def get!(pid \\ __MODULE__, timeout \\ :infinity), do: GenServer.call(pid, :get, timeout) + @spec get!(GenServer.server(), Keyword.t()) :: any + def get!(pid, opts \\ []) + + def get!(pid, opts) when is_pid(pid) do + GenServer.call(pid, :get, Keyword.get(opts, :timeout, :infinity)) + end + + def get!(name, opts) when is_atom(name) do + if Keyword.has_key?(opts, :function) do + _ = start(Keyword.put(opts, :name, name)) + end + + GenServer.call(name, :get, Keyword.get(opts, :timeout, :infinity)) + end + + def get!(key, opts) do + if Keyword.has_key?(opts, :function) do + _ = start(Keyword.put(opts, :name, via_tuple(key))) + end + + GenServer.call(via_tuple(key), :get, Keyword.get(opts, :timeout, :infinity)) + end @doc ~S""" Clears the result of a heavy computation, possibly by interrupting it if the @@ -88,26 +110,77 @@ defmodule Archethic.Utils.JobCache do ## Options * `:function` - function - * `:name` - if present, its value is passed to `GenServer.start_link/3` + * `:name` - if present, register the process with given name + * `:name_key` - if present, register the process with given key to JobCacheRegistry ## Examples iex> #{__MODULE__}.start_link [] ** (ArgumentError) expected :function in options - iex> {:ok, _} = #{__MODULE__}.start_link [function: fn -> :ok end, name: #{__MODULE__}] - iex> #{__MODULE__}.get! + iex> {:ok, pid} = #{__MODULE__}.start_link [function: fn -> :ok end, name: #{__MODULE__}] + iex> #{__MODULE__}.get! pid :ok """ @spec start_link(Keyword.t()) :: GenServer.on_start() def start_link(opts) do + name = + case Keyword.get(opts, :name_key) do + nil -> + Keyword.get(opts, :name) + + key -> + via_tuple(key) + end + + Keyword.has_key?(opts, :function) || raise ArgumentError, "expected :function in options" + GenServer.start_link(__MODULE__, opts, name: name) + end + + @spec start(Keyword.t()) :: GenServer.on_start() + def start(opts) do + name = + case Keyword.get(opts, :name_key) do + nil -> + Keyword.get(opts, :name) + + key -> + via_tuple(key) + end + Keyword.has_key?(opts, :function) || raise ArgumentError, "expected :function in options" - GenServer.start_link(__MODULE__, opts, Keyword.take(opts, [:name])) + GenServer.start(__MODULE__, opts, name: name) + end + + @spec stop(GenServer.server()) :: :ok + def stop(pid) when is_pid(pid) do + GenServer.stop(pid) + catch + :exit, _ -> :ok + end + + def stop(name) when is_atom(name) do + GenServer.stop(name) + catch + :exit, _ -> :ok + end + + def stop(key) do + GenServer.stop(via_tuple(key)) + catch + :exit, _ -> :ok end @impl GenServer def init(opts) do - {:ok, %S{function: Keyword.fetch!(opts, :function)}} + function = Keyword.fetch!(opts, :function) + immediate = Keyword.get(opts, :immediate, false) + + if immediate do + {:ok, %S{function: function, task: Task.async(function)}} + else + {:ok, %S{function: function}} + end end @impl GenServer @@ -123,6 +196,19 @@ defmodule Archethic.Utils.JobCache do {:noreply, %S{state | requests: [from | state.requests]}} end + def handle_cast({:get_async, from}, state = %S{result: nil, task: nil, requests: requests}) do + {:noreply, %S{state | task: Task.async(state.function), requests: [from | requests]}} + end + + def handle_cast({:get_async, from}, state = %S{result: {:ok, res}, task: nil}) do + GenServer.reply(from, res) + {:noreply, state} + end + + def handle_cast({:get_async, from}, state = %S{}) do + {:noreply, %S{state | requests: [from | state.requests]}} + end + @impl GenServer def handle_cast(:clear, state = %S{task: nil}) do {:noreply, %S{state | result: nil}} @@ -142,4 +228,8 @@ defmodule Archethic.Utils.JobCache do def handle_info({:DOWN, _ref, :process, _pid, :normal}, state) do {:noreply, state} end + + defp via_tuple(key) do + {:via, Registry, {JobCacheRegistry, key}} + end end diff --git a/test/archethic/beacon_chain/network_coordinates_test.exs b/test/archethic/beacon_chain/network_coordinates_test.exs index 5b527e985..9891830ba 100644 --- a/test/archethic/beacon_chain/network_coordinates_test.exs +++ b/test/archethic/beacon_chain/network_coordinates_test.exs @@ -45,7 +45,7 @@ defmodule Archethic.BeaconChain.NetworkCoordinatesTest do test "should retrieve the stats for a given summary time" do MockClient |> expect(:send_message, 3, fn - _, %GetNetworkStats{subsets: _}, _ -> + _, %GetNetworkStats{}, _ -> {:ok, %NetworkStats{ stats: %{ @@ -117,13 +117,13 @@ defmodule Archethic.BeaconChain.NetworkCoordinatesTest do MockClient |> expect(:send_message, 3, fn - ^wrong_node, %GetNetworkStats{subsets: _}, _ -> + ^wrong_node, %GetNetworkStats{}, _ -> {:ok, wrong_stats} - ^ok_node_1, %GetNetworkStats{subsets: _}, _ -> + ^ok_node_1, %GetNetworkStats{}, _ -> {:ok, ok_stats_1} - ^ok_node_2, %GetNetworkStats{subsets: _}, _ -> + ^ok_node_2, %GetNetworkStats{}, _ -> {:ok, ok_stats_2} end) diff --git a/test/archethic/beacon_chain/subset/stats_collector_test.exs b/test/archethic/beacon_chain/subset/stats_collector_test.exs new file mode 100644 index 000000000..74e12d37f --- /dev/null +++ b/test/archethic/beacon_chain/subset/stats_collector_test.exs @@ -0,0 +1,252 @@ +defmodule Archethic.BeaconChain.Subset.StatsCollectorTest do + use ArchethicCase, async: false + + alias Archethic.BeaconChain + alias Archethic.BeaconChain.NetworkCoordinates + alias Archethic.BeaconChain.Subset.StatsCollector + alias Archethic.BeaconChain.SummaryTimer + alias Archethic.Crypto + alias Archethic.Election + alias Archethic.P2P + alias Archethic.P2P.Node + alias Archethic.PubSub + + import ArchethicCase + import Mock + + setup do + # global process to reset + Registry.select(Archethic.Utils.JobCacheRegistry, [{{:_, :"$1", :_}, [], [:"$1"]}]) + |> Enum.each(fn pid -> Process.exit(pid, :kill) end) + + {:ok, pid} = StatsCollector.start_link([]) + {:ok, _} = SummaryTimer.start_link([interval: "0 * * * * * *"], []) + + P2P.add_and_connect_node(%Node{ + ip: {127, 0, 0, 1}, + port: 3000, + first_public_key: Crypto.first_node_public_key(), + last_public_key: Crypto.first_node_public_key(), + network_patch: "AAA", + geo_patch: "AAA", + available?: true, + authorized?: true, + authorization_date: DateTime.utc_now() |> DateTime.add(-1, :day) + }) + + on_exit(fn -> Process.exit(pid, :kill) end) + + {:ok, %{pid: pid}} + end + + test "is subscribed to events", %{pid: pid} do + assert [:self_repair_sync, :next_summary_time] = Registry.keys(Archethic.PubSubRegistry, pid) + end + + test "should react to events" do + next_summary_time = DateTime.utc_now() + + with_mocks([ + {BeaconChain, [:passthrough], get_network_stats: fn _ -> %{} end}, + {NetworkCoordinates, [], fetch_network_stats: fn _summary_time -> Nx.tensor(0) end} + ]) do + assert 0 = Registry.count(Archethic.Utils.JobCacheRegistry) + + send(StatsCollector, {:next_summary_time, next_summary_time}) + _ = :sys.get_state(StatsCollector) + + assert 2 = Registry.count(Archethic.Utils.JobCacheRegistry) + + send(StatsCollector, :self_repair_sync) + _ = :sys.get_state(StatsCollector) + # sleep a little because the JobCacheRegistry is informed asynchronously + # of the processes stopped + Process.sleep(50) + + assert 0 = Registry.count(Archethic.Utils.JobCacheRegistry) + end + end + + test "get/1 should return the stats of the subsets current node is elected to store" do + subset1 = :binary.encode_unsigned(0) + subset2 = :binary.encode_unsigned(1) + node1_public_key = random_public_key() + node2_public_key = random_public_key() + node3_public_key = random_public_key() + node4_public_key = random_public_key() + current_node = P2P.get_node_info() + + with_mocks([ + {BeaconChain, [:passthrough], + get_network_stats: fn + ^subset1 -> + %{ + node1_public_key => [%{latency: 1}], + node2_public_key => [%{latency: 1}] + } + + ^subset2 -> + %{ + node3_public_key => [%{latency: 10}], + node4_public_key => [%{latency: 10}] + } + end}, + {Election, [], + beacon_storage_nodes: fn + ^subset1, _, _ -> [current_node] + ^subset2, _, _ -> [current_node] + _, _, _ -> [] + end} + ]) do + summary_time = DateTime.utc_now() + PubSub.notify_next_summary_time(summary_time) + + assert %{ + ^subset1 => %{ + ^node1_public_key => [ + %{latency: 1} + ], + ^node2_public_key => [ + %{latency: 1} + ] + }, + ^subset2 => %{ + ^node3_public_key => [ + %{latency: 10} + ], + ^node4_public_key => [ + %{latency: 10} + ] + } + } = StatsCollector.get(summary_time) + end + end + + test "get/1 should start the job and reply if requested before the event happens" do + subset1 = :binary.encode_unsigned(0) + subset2 = :binary.encode_unsigned(1) + node1_public_key = random_public_key() + node2_public_key = random_public_key() + node3_public_key = random_public_key() + node4_public_key = random_public_key() + current_node = P2P.get_node_info() + + assert 0 = Registry.count(Archethic.Utils.JobCacheRegistry) + + with_mocks([ + {BeaconChain, [:passthrough], + get_network_stats: fn + ^subset1 -> + %{ + node1_public_key => [%{latency: 1}], + node2_public_key => [%{latency: 1}] + } + + ^subset2 -> + %{ + node3_public_key => [%{latency: 10}], + node4_public_key => [%{latency: 10}] + } + end}, + {Election, [], + beacon_storage_nodes: fn + ^subset1, _, _ -> [current_node] + ^subset2, _, _ -> [current_node] + _, _, _ -> [] + end} + ]) do + assert %{ + ^subset1 => %{ + ^node1_public_key => [ + %{latency: 1} + ], + ^node2_public_key => [ + %{latency: 1} + ] + }, + ^subset2 => %{ + ^node3_public_key => [ + %{latency: 10} + ], + ^node4_public_key => [ + %{latency: 10} + ] + } + } = StatsCollector.get(DateTime.utc_now()) + end + + assert 1 = Registry.count(Archethic.Utils.JobCacheRegistry) + end + + test "fetch/1 should return the stats of all subsets" do + subset1 = :binary.encode_unsigned(0) + subset2 = :binary.encode_unsigned(1) + current_node = P2P.get_node_info() + + tensor = + Nx.tensor([ + [0, 0, 36, 67, 45, 64, 0, 176, 43, 63, 190, 44, 75, 0, 146], + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + [36, 0, 0, 44, 63, 38, 0, 176, 47, 76, 167, 50, 65, 0, 142], + [67, 0, 44, 0, 47, 75, 0, 169, 52, 70, 186, 58, 70, 0, 159], + [45, 0, 63, 47, 0, 51, 0, 182, 53, 83, 187, 58, 107, 0, 142], + [64, 0, 38, 75, 51, 0, 0, 178, 46, 48, 193, 80, 72, 0, 149], + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + [176, 0, 176, 169, 182, 178, 0, 0, 151, 162, 196, 191, 143, 0, 195], + [43, 0, 47, 52, 53, 46, 0, 151, 0, 182, 166, 115, 91, 0, 109], + [63, 0, 76, 70, 83, 48, 0, 162, 182, 0, 167, 105, 144, 0, 124], + [190, 0, 167, 186, 187, 193, 0, 196, 166, 167, 0, 182, 165, 0, 109], + [44, 0, 50, 58, 58, 80, 0, 191, 115, 105, 182, 0, 82, 0, 154], + [75, 0, 65, 70, 107, 72, 0, 143, 91, 144, 165, 82, 0, 0, 160], + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + [146, 0, 142, 159, 142, 149, 0, 195, 109, 124, 109, 154, 160, 0, 0] + ]) + + with_mocks([ + {BeaconChain, [:passthrough], get_network_stats: fn _ -> %{} end}, + {NetworkCoordinates, [], + fetch_network_stats: fn _summary_time -> + tensor + end}, + {Election, [], + beacon_storage_nodes: fn + ^subset1, _, _ -> [current_node] + ^subset2, _, _ -> [current_node] + _, _, _ -> [] + end} + ]) do + summary_time = DateTime.utc_now() + PubSub.notify_next_summary_time(summary_time) + + assert ^tensor = StatsCollector.fetch(summary_time) + end + end + + test "fetch/1 should start the job and reply if requested before the event happens" do + subset1 = :binary.encode_unsigned(0) + subset2 = :binary.encode_unsigned(1) + current_node = P2P.get_node_info() + + assert 0 = Registry.count(Archethic.Utils.JobCacheRegistry) + + with_mocks([ + {BeaconChain, [:passthrough], get_network_stats: fn _ -> %{} end}, + {NetworkCoordinates, [], + fetch_network_stats: fn _summary_time -> + Nx.tensor(1) + end}, + {Election, [], + beacon_storage_nodes: fn + ^subset1, _, _ -> [current_node] + ^subset2, _, _ -> [current_node] + _, _, _ -> [] + end} + ]) do + # can't compare tensor directly so we compare serialization + expected = Nx.tensor(1) |> Nx.to_binary() + assert ^expected = StatsCollector.fetch(DateTime.utc_now()) |> Nx.to_binary() + end + + assert 1 = Registry.count(Archethic.Utils.JobCacheRegistry) + end +end diff --git a/test/archethic/beacon_chain/subset/summary_cache_test.exs b/test/archethic/beacon_chain/subset/summary_cache_test.exs index 0de0e35bb..9887b8761 100644 --- a/test/archethic/beacon_chain/subset/summary_cache_test.exs +++ b/test/archethic/beacon_chain/subset/summary_cache_test.exs @@ -183,4 +183,36 @@ defmodule Archethic.BeaconChain.Subset.SummaryCacheTest do slots = SummaryCache.stream_current_slots(<<0>>) |> Enum.to_list() assert [{^slot, ^node_key}, {^slot2, ^node_key}] = slots end + + test "should cleanup as soon as selfrepair is triggered" do + {:ok, _pid} = SummaryTimer.start_link([interval: "0 * * * * *"], []) + {:ok, pid} = SummaryCache.start_link() + File.mkdir_p!(Utils.mut_dir()) + + now = DateTime.utc_now() + + node_key = Crypto.first_node_public_key() + subset = <<0>> + + slot_pre_summary = %Slot{ + slot_time: SummaryTimer.previous_summary(now), + subset: subset + } + + slot_post_summary = %Slot{ + slot_time: SummaryTimer.next_summary(now), + subset: subset + } + + SummaryCache.add_slot(subset, slot_pre_summary, node_key) + SummaryCache.add_slot(subset, slot_post_summary, node_key) + + send(pid, :self_repair_sync) + Process.sleep(50) + + assert [{^slot_post_summary, ^node_key}] = + subset + |> SummaryCache.stream_current_slots() + |> Enum.to_list() + end end diff --git a/test/archethic/beacon_chain/subset_test.exs b/test/archethic/beacon_chain/subset_test.exs index c8df3308e..72c675e22 100644 --- a/test/archethic/beacon_chain/subset_test.exs +++ b/test/archethic/beacon_chain/subset_test.exs @@ -236,7 +236,7 @@ defmodule Archethic.BeaconChain.SubsetTest do _, %Ping{}, _ -> {:ok, %Ok{}} - _, %GetNetworkStats{subsets: _}, _ -> + _, %GetNetworkStats{}, _ -> {:ok, %NetworkStats{ stats: %{ @@ -271,6 +271,9 @@ defmodule Archethic.BeaconChain.SubsetTest do MockDB |> expect(:write_beacon_summary, fn summary -> send(me, {:summary_stored, summary}) end) + # subset process is dependant of stats collector + send(Process.whereis(StatsCollector), {:next_summary_time, ~U[2023-07-11 02:00:00Z]}) + send(pid, {:current_epoch_of_slot_timer, slot_time}) assert_receive {:summary_stored, summary}, 2000 @@ -281,10 +284,6 @@ defmodule Archethic.BeaconChain.SubsetTest do transaction_attestations: [^attestation], network_patches: ["F7A", "78A"] } = summary - - Process.sleep(5) - - assert [] = SummaryCache.stream_current_slots(subset) |> Enum.to_list() end end diff --git a/test/archethic/p2p/message/get_network_stats_test.exs b/test/archethic/p2p/message/get_network_stats_test.exs index 117e80ea4..5c0c6cb49 100644 --- a/test/archethic/p2p/message/get_network_stats_test.exs +++ b/test/archethic/p2p/message/get_network_stats_test.exs @@ -4,4 +4,15 @@ defmodule Archethic.P2P.Message.GetNetworkStatsTest do alias Archethic.P2P.Message.GetNetworkStats doctest GetNetworkStats + + describe "serialize/deserialize" do + summary_time = DateTime.utc_now() |> DateTime.truncate(:second) + + msg = %GetNetworkStats{summary_time: summary_time} + + assert {^msg, <<>>} = + msg + |> GetNetworkStats.serialize() + |> GetNetworkStats.deserialize() + end end diff --git a/test/archethic/utils/job_cache_test.exs b/test/archethic/utils/job_cache_test.exs index 686bde3d7..0ad9c8409 100644 --- a/test/archethic/utils/job_cache_test.exs +++ b/test/archethic/utils/job_cache_test.exs @@ -4,4 +4,42 @@ defmodule Archethic.Utils.JobCacheTest do alias Archethic.Utils.JobCache doctest JobCache + + test "should exit if no process" do + pid = spawn(fn -> :ok end) + assert {:normal, _} = catch_exit(JobCache.get!(pid)) + end + + test "should not immediately start the job by default" do + :persistent_term.put(:value, 1) + + {:ok, pid} = JobCache.start_link(function: fn -> :persistent_term.get(:value) end) + + :persistent_term.put(:value, 2) + + assert 2 = JobCache.get!(pid) + end + + test "should immediately start the job if :immediate flag is passed " do + :persistent_term.put(:value, 1) + + {:ok, pid} = + JobCache.start_link(immediate: true, function: fn -> :persistent_term.get(:value) end) + + :persistent_term.put(:value, 2) + + assert 1 = JobCache.get!(pid) + end + + test "should be able to start when using get! if there is no process yet" do + :persistent_term.put(:value, 1) + + assert 1 = JobCache.get!(:name, function: fn -> :persistent_term.get(:value) end) + assert 1 = JobCache.get!({:some, :key}, function: fn -> :persistent_term.get(:value) end) + + :persistent_term.put(:value, 2) + + assert 1 = JobCache.get!(:name) + assert 1 = JobCache.get!({:some, :key}) + end end