Skip to content

Commit

Permalink
Resolve missing beacon network stats
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelmanzanera committed Apr 3, 2023
1 parent df06bfc commit ecc8ae7
Showing 1 changed file with 31 additions and 53 deletions.
84 changes: 31 additions & 53 deletions lib/archethic/beacon_chain/network_coordinates.ex
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,11 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do
matrix
|> Nx.as_type(:f64)
|> Nx.rename([:line, :column])
|> IO.inspect()

center_mass = compute_distance_from_center_mass(formated_matrix)
gram_matrix = get_gram_matrix(formated_matrix, center_mass)
{x, y} = get_coordinates(gram_matrix)
get_patch_digits(x, y) |> IO.inspect(label: "after matrix compute")
get_patch_digits(x, y)
else
[]
end
Expand Down Expand Up @@ -222,46 +221,24 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do

matrix = Nx.broadcast(0, {nb_nodes, nb_nodes})

%{bounded: bounded_subsets, unbounded: unbounded_subsets} =
get_subsets_bounding(summary_time, authorized_nodes)

bounded_netstats =
bounded_subsets
|> Task.async_stream(&{&1, aggregate_network_stats(&1)})
|> Enum.reduce(%{}, fn
{:ok, {subset, stats}}, acc when map_size(stats) > 0 ->
Map.put(acc, subset, stats)

_, acc ->
acc
end)

unbounded_matrix =
unbounded_subsets
# Aggregate subsets by node
|> Enum.reduce(%{}, fn {subset, beacon_nodes}, acc ->
Enum.reduce(beacon_nodes, acc, fn node, acc ->
Map.update(acc, node, [subset], &[subset | &1])
end)
summary_time
|> get_subsets_nodes(authorized_nodes)
# Aggregate subsets by node
|> Enum.reduce(%{}, fn {subset, beacon_nodes}, acc ->
Enum.reduce(beacon_nodes, acc, fn node, acc ->
Map.update(acc, node, [subset], &[subset | &1])
end)
|> stream_subsets_stats()
# Aggregate stats per node to identify the sampling nodes
|> aggregate_stats_per_subset()
|> update_matrix_from_stats(matrix, sorted_node_list)

full_matrix = update_matrix_from_stats(bounded_netstats, unbounded_matrix, sorted_node_list)
full_matrix
end)
|> stream_subsets_stats()
# Aggregate stats per node to identify the sampling nodes
|> aggregate_stats_per_subset()
|> update_matrix_from_stats(matrix, sorted_node_list)
end

defp get_subsets_bounding(summary_time, authorized_nodes) do
Enum.reduce(BeaconChain.list_subsets(), %{bounded: [], unbounded: []}, fn subset, acc ->
defp get_subsets_nodes(summary_time, authorized_nodes) do
Enum.map(BeaconChain.list_subsets(), fn subset ->
beacon_nodes = Election.beacon_storage_nodes(subset, summary_time, authorized_nodes)

if Utils.key_in_node_list?(beacon_nodes, Crypto.first_node_public_key()) do
Map.update!(acc, :bounded, &[subset | &1])
else
Map.update!(acc, :unbounded, &[{subset, beacon_nodes} | &1])
end
{subset, beacon_nodes}
end)
end

Expand Down Expand Up @@ -302,27 +279,28 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do
|> Enum.reduce(%{}, fn {subset, stats_by_node}, acc ->
aggregated_stats_by_node =
Enum.reduce(stats_by_node, %{}, fn {node, stats}, acc ->
aggregated_stats =
stats
|> Enum.zip()
|> Enum.map(fn stats ->
latency =
stats
|> Tuple.to_list()
|> Enum.map(& &1.latency)
|> Utils.mean()
|> trunc()

%{latency: latency}
end)

Map.put(acc, node, aggregated_stats)
Map.put(acc, node, aggregate_stats(stats))
end)

Map.put(acc, subset, aggregated_stats_by_node)
end)
end

defp aggregate_stats(stats) do
stats
|> Enum.zip()
|> Enum.map(fn stats ->
latency =
stats
|> Tuple.to_list()
|> Enum.map(& &1.latency)
|> Utils.mean()
|> trunc()

%{latency: latency}
end)
end

defp update_matrix_from_stats(stats_by_subset, matrix, sorted_node_list) do
Enum.reduce(stats_by_subset, matrix, fn {subset, stats}, acc ->
sampling_nodes = P2PSampling.list_nodes_to_sample(subset)
Expand Down

0 comments on commit ecc8ae7

Please sign in to comment.