Skip to content

Commit

Permalink
statscollector may start the jobs on first request if that happens be…
Browse files Browse the repository at this point in the history
…fore the event
  • Loading branch information
bchamagne committed Dec 22, 2023
1 parent 0cb7633 commit 7b76737
Show file tree
Hide file tree
Showing 8 changed files with 240 additions and 133 deletions.
34 changes: 15 additions & 19 deletions lib/archethic/beacon_chain/network_coordinates.ex
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,9 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do
defp get_digit(acc, _, _, _, _, _), do: acc

@doc """
Fetch remotely the network stats for a given summary time
Fetch remotely the network stats for a given summary time
This request all the subsets to gather the aggregated network stats.
This requests the aggregated network stats from all the beacon nodes.
A NxN latency matrix is then computed based on the network stats origins and targets
"""
@spec fetch_network_stats(DateTime.t()) :: Nx.Tensor.t()
Expand All @@ -230,36 +230,32 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do

sorted_node_list = P2P.list_nodes() |> Enum.sort_by(& &1.first_public_key)
nb_nodes = length(sorted_node_list)
beacon_nodes = get_beacon_nodes(summary_time, authorized_nodes)

matrix = Nx.broadcast(0, {nb_nodes, nb_nodes})

summary_time
|> get_subsets_nodes(authorized_nodes)
# Aggregate subsets by node
|> Enum.reduce(%{}, fn {subset, beacon_nodes}, acc ->
Enum.reduce(beacon_nodes, acc, fn node, acc ->
Map.update(acc, node, [subset], &[subset | &1])
end)
end)
|> stream_subsets_stats()
stream_network_stats(summary_time, beacon_nodes)
# Aggregate stats per node to identify the sampling nodes
|> aggregate_stats_per_subset()
|> update_matrix_from_stats(matrix, sorted_node_list)
end

defp get_subsets_nodes(summary_time, authorized_nodes) do
Enum.map(BeaconChain.list_subsets(), fn subset ->
beacon_nodes = Election.beacon_storage_nodes(subset, summary_time, authorized_nodes)
{subset, beacon_nodes}
defp get_beacon_nodes(summary_time, authorized_nodes) do
BeaconChain.list_subsets()
|> Enum.reduce(MapSet.new(), fn subset, acc ->
Election.beacon_storage_nodes(subset, summary_time, authorized_nodes)
|> MapSet.new()
|> MapSet.union(acc)
end)
|> MapSet.to_list()
end

defp stream_subsets_stats(subsets_by_node) do
defp stream_network_stats(summary_time, beacon_nodes) do
Task.Supervisor.async_stream_nolink(
TaskSupervisor,
subsets_by_node,
fn {node, subsets} ->
P2P.send_message(node, %GetNetworkStats{subsets: subsets}, 5_000)
beacon_nodes,
fn node ->
P2P.send_message(node, %GetNetworkStats{summary_time: summary_time}, 5_000)
end,
timeout: 6_000,
ordered: false,
Expand Down
6 changes: 3 additions & 3 deletions lib/archethic/beacon_chain/subset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ defmodule Archethic.BeaconChain.Subset do
Logger.debug("Create beacon summary", beacon_subset: Base.encode16(subset))

patch_task =
Task.Supervisor.async_nolink(TaskSupervisor, fn -> get_network_patches(subset) end)
Task.Supervisor.async_nolink(TaskSupervisor, fn -> get_network_patches(time, subset) end)

summary =
%Summary{subset: subset, summary_time: time}
Expand Down Expand Up @@ -428,7 +428,7 @@ defmodule Archethic.BeaconChain.Subset do
end
end

defp get_network_patches(subset) do
defp get_network_patches(summary_time, subset) do
with true <- length(P2P.authorized_and_available_nodes()) > 1,
sampling_nodes when sampling_nodes != [] <- P2PSampling.list_nodes_to_sample(subset) do
sampling_nodes_indexes =
Expand All @@ -440,7 +440,7 @@ defmodule Archethic.BeaconChain.Subset do
end)
|> Enum.map(fn {_, index} -> index end)

StatsCollector.fetch()
StatsCollector.fetch(summary_time)
|> NetworkCoordinates.get_patch_from_latencies()
|> Enum.with_index()
|> Enum.filter(fn {_, index} ->
Expand Down
139 changes: 77 additions & 62 deletions lib/archethic/beacon_chain/subset/stats_collector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do
Uses 2 job caches:
- cache_get: cache the aggregation of local stats
- cache_fetch: cache the I/O
- cache_fetch: cache the I/O to fetch remote stats
It subscribes to 2 events to start and stop both jobs ASAP
"""

@vsn Mix.Project.config()[:version]
@timeout :timer.minutes(1)
@timeout :archethic
|> Application.compile_env(__MODULE__, [])
|> Keyword.get(:timeout, :timer.minutes(1))

use GenServer

alias Archethic.P2P
Expand Down Expand Up @@ -36,19 +41,19 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do
end

@doc """
Get the local stats for the subsets this node is elected
Get the local stats if current node is beacon storage node
"""
@spec get(pos_integer()) :: %{binary() => Nx.Tensor.t()}
def get(timeout \\ @timeout) do
GenServer.call(__MODULE__, :get, timeout)
@spec get(DateTime.t(), pos_integer()) :: %{binary() => Nx.Tensor.t()}
def get(summary_time, timeout \\ @timeout) do
GenServer.call(__MODULE__, {:get, summary_time}, timeout)
end

@doc """
Fetch the stats from all subsets
Fetch the stats of given summary from beacon_nodes
"""
@spec fetch(pos_integer()) :: Nx.Tensor.t()
def fetch(timeout \\ @timeout) do
GenServer.call(__MODULE__, :fetch, timeout)
@spec fetch(DateTime.t(), pos_integer()) :: Nx.Tensor.t()
def fetch(summary_time, timeout \\ @timeout) do
GenServer.call(__MODULE__, {:fetch, summary_time}, timeout)
end

# ------------------------------------------------------------
Expand All @@ -64,73 +69,49 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do
{:ok, %__MODULE__{}}
end

def handle_call(:get, _from, state = %__MODULE__{cache_get: pid}) do
stats =
try do
JobCache.get!(pid, @timeout)
catch
:exit, _ ->
%{}
end
# `:get` request received while no get-cache job is tracked (`cache_get` is nil):
# start both jobs for the requested summary time, reply with the content of the
# get-cache (or an empty map if it cannot be read) and keep both pids in the state.
# start_jobs/1 returns `{nil, nil}` when this node is elected for no subset,
# in which case the state keeps both cache fields set to nil.
def handle_call({:get, summary_time}, _, state = %__MODULE__{cache_get: nil}) do
{cache_fetch_pid, cache_get_pid} = start_jobs(summary_time)
stats = get_stats_from_cache(cache_get_pid, %{})
{:reply, stats, %__MODULE__{state | cache_fetch: cache_fetch_pid, cache_get: cache_get_pid}}
end

def handle_call({:get, _}, _, state = %__MODULE__{cache_get: cache_get_pid}) do
stats = get_stats_from_cache(cache_get_pid, %{})
{:reply, stats, state}
end

def handle_call(:fetch, _from, state = %__MODULE__{cache_fetch: pid}) do
stats =
try do
JobCache.get!(pid, @timeout)
catch
:exit, _ ->
Nx.tensor(0)
end
# `:fetch` request received while no fetch-cache job is tracked (`cache_fetch` is nil):
# start both jobs for the requested summary time, reply with the content of the
# fetch-cache (or `Nx.tensor(0)` if it cannot be read) and keep both pids in the state.
# start_jobs/1 returns `{nil, nil}` when this node is elected for no subset,
# in which case the state keeps both cache fields set to nil.
def handle_call({:fetch, summary_time}, _, state = %__MODULE__{cache_fetch: nil}) do
{cache_fetch_pid, cache_get_pid} = start_jobs(summary_time)
stats = get_stats_from_cache(cache_fetch_pid, Nx.tensor(0))
{:reply, stats, %__MODULE__{state | cache_fetch: cache_fetch_pid, cache_get: cache_get_pid}}
end

def handle_call({:fetch, _}, _, state = %__MODULE__{cache_fetch: cache_fetch_pid}) do
stats = get_stats_from_cache(cache_fetch_pid, Nx.tensor(0))
{:reply, stats, state}
end

# When the summary happens, we fetch the stats
# and keep the result in a cache
def handle_info({:next_summary_time, next_summary_time}, state) do
summary_time = BeaconChain.previous_summary_time(next_summary_time)

# election of current node subsets
new_state =
case get_current_node_subsets(summary_time) do
[] ->
Logger.debug("Current node is elected to store 0 beacon subset")
state

subsets ->
Logger.debug("Current node is elected to store #{length(subsets)} beacon subsets")

{:ok, cache_fetch_pid} =
JobCache.start_link(immediate: true, function: fn -> do_fetch_stats(summary_time) end)

{:ok, cache_get_pid} =
JobCache.start_link(immediate: true, function: fn -> do_get_stats(subsets) end)
def handle_info(
{:next_summary_time, next_summary_time},
state = %__MODULE__{cache_fetch: nil, cache_get: nil}
) do
{cache_fetch_pid, cache_get_pid} =
BeaconChain.previous_summary_time(next_summary_time)
|> start_jobs()

%__MODULE__{state | cache_fetch: cache_fetch_pid, cache_get: cache_get_pid}
end
{:noreply, %__MODULE__{state | cache_fetch: cache_fetch_pid, cache_get: cache_get_pid}}
end

{:noreply, new_state}
# happens if the process receives a get or fetch request before the event
def handle_info({:next_summary_time, _}, state) do
{:noreply, state}
end

# When a self repair happens, nobody will ask us for the stats anymore
# We can clear the caches
def handle_info(
:self_repair_sync,
state = %__MODULE__{
cache_fetch: cache_fetch_pid,
cache_get: cache_get_pid
}
state = %__MODULE__{cache_fetch: cache_fetch_pid, cache_get: cache_get_pid}
) do
if is_pid(cache_fetch_pid) do
JobCache.stop(cache_fetch_pid)
end

if is_pid(cache_get_pid) do
JobCache.stop(cache_get_pid)
end
stop_jobs(cache_fetch_pid, cache_get_pid)

{:noreply, %__MODULE__{state | cache_fetch: nil, cache_get: nil}}
end
Expand All @@ -143,6 +124,40 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do
# | .__/|_| |_| \_/ \__,_|\__\___|
# |_|
# ------------------------------------------------------------
# Starts the two job caches for `summary_time` and returns `{fetch_pid, get_pid}`.
# Returns `{nil, nil}` without starting anything when the current node is
# elected for no beacon subset at that summary time.
defp start_jobs(summary_time) do
  elected_subsets = get_current_node_subsets(summary_time)

  if elected_subsets == [] do
    {nil, nil}
  else
    Logger.debug("Current node is elected to store #{length(elected_subsets)} beacon subsets")

    # The fetch job is started first, then the job aggregating the local stats.
    fetch_job = fn -> do_fetch_stats(summary_time) end
    {:ok, fetch_pid} = JobCache.start_link(immediate: true, function: fetch_job)

    get_job = fn -> do_get_stats(elected_subsets) end
    {:ok, get_pid} = JobCache.start_link(immediate: true, function: get_job)

    {fetch_pid, get_pid}
  end
end

# Stops the fetch job and then the get job; a value that is not a pid
# (e.g. nil when no job was started) is ignored.
defp stop_jobs(cache_fetch_pid, cache_get_pid) do
  stop_job_if_running(cache_fetch_pid)
  stop_job_if_running(cache_get_pid)
end

# Stops a single job cache when given a pid, otherwise does nothing.
defp stop_job_if_running(job) when is_pid(job), do: JobCache.stop(job)
defp stop_job_if_running(_not_started), do: nil

# Reads the result held by a job cache, returning `fallback` when it cannot be obtained.
#
# - `pid`: the job cache process, or nil when no job was started
#   (start_jobs/1 yields `{nil, nil}` if this node is elected for no subset).
# - `fallback`: value returned when there is no job, or when the call to
#   `JobCache.get!/2` exits.
#
# The nil case is handled explicitly instead of relying on the exit raised by
# calling a non-existent process.
defp get_stats_from_cache(nil, fallback), do: fallback

defp get_stats_from_cache(pid, fallback) do
  JobCache.get!(pid, @timeout)
catch
  :exit, _ -> fallback
end

defp do_get_stats(subsets) do
subsets
|> Task.async_stream(
Expand Down
45 changes: 12 additions & 33 deletions lib/archethic/p2p/message/get_network_stats.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,53 +3,34 @@ defmodule Archethic.P2P.Message.GetNetworkStats do
Represents a message to get the network stats from the beacon summary cache
"""

@enforce_keys :subsets
defstruct subsets: []
@enforce_keys [:summary_time]
defstruct [:summary_time]

alias Archethic.BeaconChain.Subset.StatsCollector
alias Archethic.Crypto
alias Archethic.P2P.Message.NetworkStats

@type t :: %__MODULE__{
subsets: list(binary())
summary_time: DateTime.t()
}

@doc """
Serialize the get network stats message into binary
## Examples
iex> %GetNetworkStats{subsets: [<<0>>, <<255>>]} |> GetNetworkStats.serialize()
<<
# Length of subsets
0, 2,
# Subset
0, 255
>>
"""
def serialize(%__MODULE__{subsets: subsets}) do
<<length(subsets)::16, :erlang.list_to_binary(subsets)::binary>>
@spec serialize(t()) :: bitstring()
def serialize(%__MODULE__{summary_time: summary_time}) do
<<DateTime.to_unix(summary_time)::32>>
end

@doc """
Deserialize the binary into the get network stats message
## Examples
iex> <<0, 2, 0, 255>> |> GetNetworkStats.deserialize()
{
%GetNetworkStats{subsets: [<<0>>, <<255>>]},
""
}
"""
def deserialize(<<length::16, subsets_binary::binary-size(length), rest::bitstring>>) do
subsets =
subsets_binary
|> :erlang.binary_to_list()
|> Enum.map(&<<&1>>)
@spec deserialize(bitstring) :: {t(), bitstring()}
def deserialize(<<unix::32, rest::bitstring>>) do
summary_time = DateTime.from_unix!(unix)

{
%__MODULE__{subsets: subsets},
%__MODULE__{summary_time: summary_time},
rest
}
end
Expand All @@ -58,9 +39,7 @@ defmodule Archethic.P2P.Message.GetNetworkStats do
Process the message to get the network stats from the summary cache
"""
@spec process(t(), Crypto.key()) :: NetworkStats.t()
def process(%__MODULE__{}, _node_public_key) do
# we do not use the `subsets` argument anymore.
# the node will always reply with the stats from the subsets it is elected to store
%NetworkStats{stats: StatsCollector.get()}
def process(%__MODULE__{summary_time: summary_time}, _node_public_key) do
%NetworkStats{stats: StatsCollector.get(summary_time)}
end
end
8 changes: 4 additions & 4 deletions test/archethic/beacon_chain/network_coordinates_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ defmodule Archethic.BeaconChain.NetworkCoordinatesTest do
test "should retrieve the stats for a given summary time" do
MockClient
|> expect(:send_message, 3, fn
_, %GetNetworkStats{subsets: _}, _ ->
_, %GetNetworkStats{}, _ ->
{:ok,
%NetworkStats{
stats: %{
Expand Down Expand Up @@ -117,13 +117,13 @@ defmodule Archethic.BeaconChain.NetworkCoordinatesTest do

MockClient
|> expect(:send_message, 3, fn
^wrong_node, %GetNetworkStats{subsets: _}, _ ->
^wrong_node, %GetNetworkStats{}, _ ->
{:ok, wrong_stats}

^ok_node_1, %GetNetworkStats{subsets: _}, _ ->
^ok_node_1, %GetNetworkStats{}, _ ->
{:ok, ok_stats_1}

^ok_node_2, %GetNetworkStats{subsets: _}, _ ->
^ok_node_2, %GetNetworkStats{}, _ ->
{:ok, ok_stats_2}
end)

Expand Down
Loading

0 comments on commit 7b76737

Please sign in to comment.