Skip to content

Commit

Permalink
GetNetworkStats improvement: timeout & caching (#1327)
Browse files Browse the repository at this point in the history
* Set timeout to 5s on the GetNetworkStats msg

* Properly handle timeout & calculate all the subsets at the same time

* GetNetworkStats use a JobCache

* Tasks have a longer timeout

* add :immediate flag to JobCache

* Refactor the statsCollector

- subscribe to events to start/stop job cache

* StatsCollector may start the jobs on first request if that happens before the event

* Simplify the stats collector by removing the state

* JobCache.get! now takes a keyword list

- it now uses a registry
- it might start the process if not already started
  • Loading branch information
bchamagne authored Dec 26, 2023
1 parent 0995b46 commit d701cbd
Show file tree
Hide file tree
Showing 17 changed files with 674 additions and 158 deletions.
1 change: 1 addition & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ config :archethic, Archethic.BeaconChain.SummaryTimer,
interval: "0 * * * * *"

config :archethic, Archethic.BeaconChain.Subset.SummaryCache, enabled: false
config :archethic, Archethic.BeaconChain.Subset.StatsCollector, enabled: false

config :archethic, Archethic.Bootstrap, enabled: false

Expand Down
3 changes: 2 additions & 1 deletion lib/archethic/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ defmodule Archethic.Application do
MetricSupervisor,

# a registry used in Utils to ensure a function is executed at most once concurrently
{Registry, keys: :unique, name: Archethic.RunExclusiveRegistry}
{Registry, keys: :unique, name: Archethic.RunExclusiveRegistry},
{Registry, keys: :unique, name: Archethic.Utils.JobCacheRegistry}
]

opts = [strategy: :rest_for_one, name: Archethic.Supervisor]
Expand Down
35 changes: 16 additions & 19 deletions lib/archethic/beacon_chain/network_coordinates.ex
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,9 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do
defp get_digit(acc, _, _, _, _, _), do: acc

@doc """
Fetch remotely the network stats for a given summary time
Fetch remotely the network stats for a given summary time
This request all the subsets to gather the aggregated network stats.
This requests all the beacon nodes their aggregated network stats.
A NxN latency matrix is then computed based on the network stats origins and targets
"""
@spec fetch_network_stats(DateTime.t()) :: Nx.Tensor.t()
Expand All @@ -230,37 +230,34 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do

sorted_node_list = P2P.list_nodes() |> Enum.sort_by(& &1.first_public_key)
nb_nodes = length(sorted_node_list)
beacon_nodes = get_beacon_nodes(summary_time, authorized_nodes)

matrix = Nx.broadcast(0, {nb_nodes, nb_nodes})

summary_time
|> get_subsets_nodes(authorized_nodes)
# Aggregate subsets by node
|> Enum.reduce(%{}, fn {subset, beacon_nodes}, acc ->
Enum.reduce(beacon_nodes, acc, fn node, acc ->
Map.update(acc, node, [subset], &[subset | &1])
end)
end)
|> stream_subsets_stats()
stream_network_stats(summary_time, beacon_nodes)
# Aggregate stats per node to identify the sampling nodes
|> aggregate_stats_per_subset()
|> update_matrix_from_stats(matrix, sorted_node_list)
end

defp get_subsets_nodes(summary_time, authorized_nodes) do
Enum.map(BeaconChain.list_subsets(), fn subset ->
beacon_nodes = Election.beacon_storage_nodes(subset, summary_time, authorized_nodes)
{subset, beacon_nodes}
# Collect the distinct beacon storage nodes elected over every subset for
# `summary_time`. A node elected for several subsets is returned only once.
defp get_beacon_nodes(summary_time, authorized_nodes) do
  BeaconChain.list_subsets()
  |> Enum.flat_map(fn subset ->
    Election.beacon_storage_nodes(subset, summary_time, authorized_nodes)
  end)
  |> MapSet.new()
  |> MapSet.to_list()
end

defp stream_subsets_stats(subsets_by_node) do
defp stream_network_stats(summary_time, beacon_nodes) do
Task.Supervisor.async_stream_nolink(
TaskSupervisor,
subsets_by_node,
fn {node, subsets} ->
P2P.send_message(node, %GetNetworkStats{subsets: subsets})
beacon_nodes,
fn node ->
P2P.send_message(node, %GetNetworkStats{summary_time: summary_time}, 5_000)
end,
timeout: 6_000,
ordered: false,
on_timeout: :kill_task,
max_concurrency: 256
Expand Down
9 changes: 3 additions & 6 deletions lib/archethic/beacon_chain/subset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ defmodule Archethic.BeaconChain.Subset do
Logger.debug("Create beacon summary", beacon_subset: Base.encode16(subset))

patch_task =
Task.Supervisor.async_nolink(TaskSupervisor, fn -> get_network_patches(subset, time) end)
Task.Supervisor.async_nolink(TaskSupervisor, fn -> get_network_patches(time, subset) end)

summary =
%Summary{subset: subset, summary_time: time}
Expand Down Expand Up @@ -420,12 +420,10 @@ defmodule Archethic.BeaconChain.Subset do
end

:ok = BeaconChain.write_beacon_summary(%{summary | network_patches: network_patches})

SummaryCache.clean_previous_summary_slots(subset, time)
end
end

defp get_network_patches(subset, summary_time) do
defp get_network_patches(summary_time, subset) do
with true <- length(P2P.authorized_and_available_nodes()) > 1,
sampling_nodes when sampling_nodes != [] <- P2PSampling.list_nodes_to_sample(subset) do
sampling_nodes_indexes =
Expand All @@ -437,8 +435,7 @@ defmodule Archethic.BeaconChain.Subset do
end)
|> Enum.map(fn {_, index} -> index end)

summary_time
|> StatsCollector.get()
StatsCollector.fetch(summary_time)
|> NetworkCoordinates.get_patch_from_latencies()
|> Enum.with_index()
|> Enum.filter(fn {_, index} ->
Expand Down
204 changes: 159 additions & 45 deletions lib/archethic/beacon_chain/subset/stats_collector.ex
Original file line number Diff line number Diff line change
@@ -1,65 +1,173 @@
defmodule Archethic.BeaconChain.Subset.StatsCollector do
@moduledoc """
Process responsible to collect subset network stats
and reply to parallels requests to reduce the network load.
Get the networks stats locally and remotely
Uses 2 job caches:
- cache_get: cache the aggregation of local stats
- cache_fetch: cache the I/O to fetch remote stats
It subscribes to 2 events to start and stop both jobs ASAP
Jobs are also started by the get/fetch function if needed
"""

@vsn Mix.Project.config()[:version]
@timeout :archethic
|> Application.compile_env(__MODULE__, [])
|> Keyword.get(:timeout, :timer.minutes(1))

use GenServer

alias Archethic.P2P
alias Archethic.Election
alias Archethic.BeaconChain
alias Archethic.BeaconChain.NetworkCoordinates
alias Archethic.TaskSupervisor
alias Archethic.Utils.JobCache
alias Archethic.PubSub

require Logger

def start_link(_arg \\ []) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
# ------------------------------------------------------------
# _ ____ ___
# / \ | _ |_ _|
# / _ \ | |_) | |
# / ___ \| __/| |
# /_/ \_|_| |___|
# ------------------------------------------------------------

@doc """
Start the stats collector process, registered under this module's name.

`opts` is handed to `init/1` as the GenServer init argument.
"""
@spec start_link(Keyword.t()) :: GenServer.on_start()
def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

@spec get(DateTime.t()) :: Nx.Tensor.t()
def get(summary_time) do
try do
GenServer.call(__MODULE__, {:get, summary_time})
catch
:exit, {:timeout, _} ->
Logger.warning("Fetching network stats take longer than 5s")
Nx.tensor(0)
end
@doc """
Return the local network stats for `summary_time`.

The stats are gathered for the subsets the current node is a beacon storage
node of, through the job cache entry keyed by `{:get, summary_time}`.

`timeout` is forwarded to `JobCache.get!/2`. If that call exits for any
reason, an empty map is returned instead.
"""
@spec get(DateTime.t(), pos_integer()) :: %{binary() => Nx.Tensor.t()}
def get(summary_time, timeout \\ @timeout) do
  collect_local_stats = fn ->
    summary_time
    |> get_current_node_subsets()
    |> do_get_stats()
  end

  try do
    JobCache.get!({:get, summary_time}, function: collect_local_stats, timeout: timeout)
  catch
    # Any exit is swallowed: callers get "no stats" rather than a crash.
    :exit, _ -> %{}
  end
end

@doc """
Return the network stats of the given summary, fetched from the beacon nodes.

The fetch goes through the job cache entry keyed by `{:fetch, summary_time}`.

`timeout` is forwarded to `JobCache.get!/2`. If that call exits for any
reason, `Nx.tensor(0)` is returned instead.
"""
@spec fetch(DateTime.t(), pos_integer()) :: Nx.Tensor.t()
def fetch(summary_time, timeout \\ @timeout) do
  fetch_remote_stats = fn -> do_fetch_stats(summary_time) end

  try do
    JobCache.get!({:fetch, summary_time}, function: fetch_remote_stats, timeout: timeout)
  catch
    # Any exit is swallowed and replaced by a scalar zero tensor.
    :exit, _ -> Nx.tensor(0)
  end
end

# ------------------------------------------------------------
# _ _ _ _
# ___ __ _| | | |__ __ _ ___| | _____
# / __/ _` | | | '_ \ / _` |/ __| |/ / __|
# | (_| (_| | | | |_) | (_| | (__| <\__ \
# \___\__,_|_|_|_.__/ \__,_|\___|_|\_|___/
# ------------------------------------------------------------
def init(_) do
{:ok, %{fetching_task: nil, clients: []}}
PubSub.register_to_next_summary_time()
PubSub.register_to_self_repair()
{:ok, :no_state}
end

def handle_call({:get, summary_time}, from, state = %{fetching_task: nil}) do
task =
Task.Supervisor.async_nolink(TaskSupervisor, fn ->
start_time = System.monotonic_time()
stats = NetworkCoordinates.fetch_network_stats(summary_time)
{stats, start_time}
end)
def handle_info({:next_summary_time, next_summary_time}, state) do
summary_time = BeaconChain.previous_summary_time(next_summary_time)

new_state =
state
|> Map.update!(:clients, &[from | &1])
|> Map.put(:fetching_task, task)
maybe_start_job({:get, summary_time})
maybe_start_job({:fetch, summary_time})

{:noreply, new_state}
{:noreply, state}
end

def handle_call({:get, _summary_time}, from, state = %{fetching_task: _}) do
new_state =
state
|> Map.update!(:clients, &[from | &1])
def handle_info(:self_repair_sync, state) do
summary_time = BeaconChain.previous_summary_time(DateTime.utc_now())

maybe_stop_job({:get, summary_time})
maybe_stop_job({:fetch, summary_time})

{:noreply, new_state}
{:noreply, state}
end

def handle_info(
{ref, {stats, start_time}},
state = %{clients: clients, fetching_task: %Task{ref: ref_task}}
)
when ref_task == ref do
# ------------------------------------------------------------
# _ _
# _ __ _ __(___ ____ _| |_ ___
# | '_ \| '__| \ \ / / _` | __/ _ \
# | |_) | | | |\ V | (_| | || __/
# | .__/|_| |_| \_/ \__,_|\__\___|
# |_|
# ------------------------------------------------------------

# Start the job cache entry for `{action, summary_time}`, but only when the
# current node is a beacon storage node of at least one subset for that
# summary time. Returns nil when there is nothing to start.
defp maybe_start_job({action, summary_time}) do
  local_subsets = get_current_node_subsets(summary_time)

  if local_subsets == [] do
    nil
  else
    job = fn ->
      case action do
        :get -> do_get_stats(local_subsets)
        :fetch -> do_fetch_stats(summary_time)
      end
    end

    JobCache.start(
      immediate: true,
      name_key: {action, summary_time},
      function: job
    )
  end
end

# Stop the job cache entry registered under `key`.
defp maybe_stop_job(key), do: JobCache.stop(key)

# Query the network stats of each subset concurrently and return a map of
# `subset => stats`, keeping only the subsets with a non-empty stats map.
# Subsets whose lookup exceeds the 10s task timeout are dropped.
defp do_get_stats(subsets) do
  subsets
  |> Task.async_stream(
    fn subset -> {subset, BeaconChain.get_network_stats(subset)} end,
    timeout: 10_000,
    on_timeout: :kill_task,
    ordered: false,
    max_concurrency: 256
  )
  |> Enum.reduce(%{}, fn
    # Timed-out task: skip this subset.
    {:exit, :timeout}, collected ->
      collected

    {:ok, {subset, stats}}, collected when map_size(stats) > 0 ->
      Map.put(collected, subset, stats)

    # Successful lookup without usable stats: nothing to record.
    {:ok, _}, collected ->
      collected
  end)
end

defp do_fetch_stats(summary_time) do
start_time = System.monotonic_time()
stats = NetworkCoordinates.fetch_network_stats(summary_time)

:telemetry.execute(
[:archethic, :beacon_chain, :network_coordinates, :collect_stats],
%{
Expand All @@ -68,15 +176,21 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do
%{matrix_size: Nx.size(stats)}
)

Enum.each(clients, &GenServer.reply(&1, stats))
stats
end

new_state =
state
|> Map.put(:clients, [])
|> Map.put(:fetching_task, nil)
defp get_current_node_subsets(summary_time) do
authorized_nodes = P2P.authorized_and_available_nodes(summary_time, true)
current_node = P2P.get_node_info()

{:noreply, new_state}
end
Enum.reduce(BeaconChain.list_subsets(), [], fn subset, acc ->
beacon_nodes = Election.beacon_storage_nodes(subset, summary_time, authorized_nodes)

def handle_info({:DOWN, _ref, :process, _pid, _reason}, state), do: {:noreply, state}
if current_node in beacon_nodes do
[subset | acc]
else
acc
end
end)
end
end
Loading

0 comments on commit d701cbd

Please sign in to comment.