Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use the same timeout on all tasks related to network patches #1376

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions lib/archethic/beacon_chain/network_coordinates.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,25 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do
alias Archethic.P2P.Message.GetNetworkStats
alias Archethic.P2P.Message.NetworkStats

alias Archethic.SelfRepair
alias Archethic.Utils

alias Archethic.TaskSupervisor

@doc """
Return the timeout, in milliseconds, used to determine the network patches.

It is computed as 90% of the time remaining until the next self-repair
(the remaining 10% is kept as a safety margin).
It is equivalent to 4m30s in production and 4.5s in dev.

It must be called only when creating the beacon summary.
"""
@spec timeout() :: integer()
def timeout() do
  SelfRepair.next_repair_time()
  # Remaining time, in seconds, until the next self-repair
  |> DateTime.diff(DateTime.utc_now())
  # We keep 90% of the remaining time to determine the timeout
  |> Kernel.*(0.9)
  # Convert seconds to milliseconds
  |> Kernel.*(1000)
  |> round()
end

@doc """
Compute the network patch based on the matrix latencies

Expand Down Expand Up @@ -224,8 +239,8 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do
This requests all the beacon nodes their aggregated network stats.
A NxN latency matrix is then computed based on the network stats origins and targets
"""
@spec fetch_network_stats(DateTime.t()) :: Nx.Tensor.t()
def fetch_network_stats(summary_time = %DateTime{}) do
@spec fetch_network_stats(DateTime.t(), pos_integer()) :: Nx.Tensor.t()
def fetch_network_stats(summary_time = %DateTime{}, timeout) do
authorized_nodes = P2P.authorized_and_available_nodes(summary_time, true)

sorted_node_list = P2P.list_nodes() |> Enum.sort_by(& &1.first_public_key)
Expand All @@ -234,7 +249,7 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do

matrix = Nx.broadcast(0, {nb_nodes, nb_nodes})

stream_network_stats(summary_time, beacon_nodes)
stream_network_stats(summary_time, beacon_nodes, timeout)
# Aggregate stats per node to identify the sampling nodes
|> aggregate_stats_per_subset()
|> update_matrix_from_stats(matrix, sorted_node_list)
Expand All @@ -250,14 +265,14 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do
|> MapSet.to_list()
end

defp stream_network_stats(summary_time, beacon_nodes) do
defp stream_network_stats(summary_time, beacon_nodes, timeout) do
Task.Supervisor.async_stream_nolink(
TaskSupervisor,
beacon_nodes,
fn node ->
P2P.send_message(node, %GetNetworkStats{summary_time: summary_time}, 5_000)
P2P.send_message(node, %GetNetworkStats{summary_time: summary_time}, timeout)
end,
timeout: 6_000,
timeout: timeout + 1_000,
ordered: false,
on_timeout: :kill_task,
max_concurrency: 256
Expand Down
20 changes: 6 additions & 14 deletions lib/archethic/beacon_chain/subset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ defmodule Archethic.BeaconChain.Subset do
alias Archethic.P2P.Message.ReplicationAttestationMessage

alias Archethic.PubSub

alias Archethic.SelfRepair

alias Archethic.TaskSupervisor
alias Archethic.TransactionChain.TransactionSummary

Expand Down Expand Up @@ -387,9 +384,12 @@ defmodule Archethic.BeaconChain.Subset do
:ok
else
Logger.debug("Create beacon summary", beacon_subset: Base.encode16(subset))
network_patches_timeout = NetworkCoordinates.timeout()

patch_task =
Task.Supervisor.async_nolink(TaskSupervisor, fn -> get_network_patches(time, subset) end)
Task.Supervisor.async_nolink(TaskSupervisor, fn ->
get_network_patches(time, subset, network_patches_timeout)
end)

summary =
%Summary{subset: subset, summary_time: time}
Expand All @@ -398,14 +398,6 @@ defmodule Archethic.BeaconChain.Subset do
P2PSampling.list_nodes_to_sample(subset)
)

network_patches_timeout =
SelfRepair.next_repair_time()
|> DateTime.diff(DateTime.utc_now())
# We take 10% of the next repair time to determine the timeout
|> Kernel.*(0.9)
|> Kernel.*(1000)
|> round()

network_patches =
case Task.yield(patch_task, network_patches_timeout) || Task.shutdown(patch_task) do
{:ok, network_patches} ->
Expand All @@ -423,7 +415,7 @@ defmodule Archethic.BeaconChain.Subset do
end
end

defp get_network_patches(summary_time, subset) do
defp get_network_patches(summary_time, subset, timeout) do
with true <- length(P2P.authorized_and_available_nodes()) > 1,
sampling_nodes when sampling_nodes != [] <- P2PSampling.list_nodes_to_sample(subset) do
sampling_nodes_indexes =
Expand All @@ -435,7 +427,7 @@ defmodule Archethic.BeaconChain.Subset do
end)
|> Enum.map(fn {_, index} -> index end)

StatsCollector.fetch(summary_time)
StatsCollector.fetch(summary_time, timeout)
|> NetworkCoordinates.get_patch_from_latencies()
|> Enum.with_index()
|> Enum.filter(fn {_, index} ->
Expand Down
23 changes: 10 additions & 13 deletions lib/archethic/beacon_chain/subset/stats_collector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do
"""

@vsn 1
@timeout :archethic
|> Application.compile_env(__MODULE__, [])
|> Keyword.get(:timeout, :timer.minutes(1))

use GenServer

Expand Down Expand Up @@ -43,12 +40,12 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do
Get the local stats if current node is beacon storage node
"""
@spec get(DateTime.t(), pos_integer()) :: %{binary() => Nx.Tensor.t()}
def get(summary_time, timeout \\ @timeout) do
def get(summary_time, timeout) do
JobCache.get!(
{:get, summary_time},
function: fn ->
get_current_node_subsets(summary_time)
|> do_get_stats()
|> do_get_stats(timeout)
end,
timeout: timeout
)
Expand All @@ -60,10 +57,10 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do
Fetch the stats of given summary from beacon_nodes
"""
@spec fetch(DateTime.t(), pos_integer()) :: Nx.Tensor.t()
def fetch(summary_time, timeout \\ @timeout) do
def fetch(summary_time, timeout) do
JobCache.get!(
{:fetch, summary_time},
function: fn -> do_fetch_stats(summary_time) end,
function: fn -> do_fetch_stats(summary_time, timeout) end,
timeout: timeout
)
catch
Expand Down Expand Up @@ -122,10 +119,10 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do
function: fn ->
case action do
:get ->
do_get_stats(subsets)
do_get_stats(subsets, NetworkCoordinates.timeout())

:fetch ->
do_fetch_stats(summary_time)
do_fetch_stats(summary_time, NetworkCoordinates.timeout())
end
end
)
Expand All @@ -136,15 +133,15 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do
JobCache.stop(key)
end

defp do_get_stats(subsets) do
defp do_get_stats(subsets, timeout) do
subsets
|> Task.async_stream(
fn subset ->
stats = BeaconChain.get_network_stats(subset)

{subset, stats}
end,
timeout: 10_000,
timeout: timeout,
on_timeout: :kill_task,
ordered: false,
max_concurrency: 256
Expand All @@ -164,9 +161,9 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do
end)
end

defp do_fetch_stats(summary_time) do
defp do_fetch_stats(summary_time, timeout) do
start_time = System.monotonic_time()
stats = NetworkCoordinates.fetch_network_stats(summary_time)
stats = NetworkCoordinates.fetch_network_stats(summary_time, timeout)

:telemetry.execute(
[:archethic, :beacon_chain, :network_coordinates, :collect_stats],
Expand Down
3 changes: 2 additions & 1 deletion lib/archethic/p2p/message/get_network_stats.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Archethic.P2P.Message.GetNetworkStats do
@enforce_keys [:summary_time]
defstruct [:summary_time]

alias Archethic.BeaconChain.NetworkCoordinates
alias Archethic.BeaconChain.Subset.StatsCollector
alias Archethic.Crypto
alias Archethic.P2P.Message.NetworkStats
Expand Down Expand Up @@ -40,6 +41,6 @@ defmodule Archethic.P2P.Message.GetNetworkStats do
"""
@spec process(t(), Crypto.key()) :: NetworkStats.t()
def process(%__MODULE__{summary_time: summary_time}, _node_public_key) do
%NetworkStats{stats: StatsCollector.get(summary_time)}
%NetworkStats{stats: StatsCollector.get(summary_time, NetworkCoordinates.timeout())}
end
end
9 changes: 7 additions & 2 deletions test/archethic/beacon_chain/network_coordinates_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ defmodule Archethic.BeaconChain.NetworkCoordinatesTest do
alias Archethic.P2P.Node

doctest NetworkCoordinates
@timeout 1_000

import Mox

Expand Down Expand Up @@ -77,7 +78,9 @@ defmodule Archethic.BeaconChain.NetworkCoordinatesTest do
[100, 110, 90, 0, 0, 0],
[100, 105, 90, 0, 0, 0],
[90, 105, 90, 0, 0, 0]
] == NetworkCoordinates.fetch_network_stats(DateTime.utc_now()) |> Nx.to_list()
] ==
NetworkCoordinates.fetch_network_stats(DateTime.utc_now(), @timeout)
|> Nx.to_list()
end

test "should filter stats that are different from expected nodes for a subset" do
Expand Down Expand Up @@ -134,7 +137,9 @@ defmodule Archethic.BeaconChain.NetworkCoordinatesTest do
[150, 150, 150, 0, 0, 0],
[150, 150, 150, 0, 0, 0],
[150, 150, 150, 0, 0, 0]
] == NetworkCoordinates.fetch_network_stats(DateTime.utc_now()) |> Nx.to_list()
] ==
NetworkCoordinates.fetch_network_stats(DateTime.utc_now(), @timeout)
|> Nx.to_list()
end
end
end
32 changes: 20 additions & 12 deletions test/archethic/beacon_chain/subset/stats_collector_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@ defmodule Archethic.BeaconChain.Subset.StatsCollectorTest do
import ArchethicCase
import Mock

setup do
# global process to reset
Registry.select(Archethic.Utils.JobCacheRegistry, [{{:_, :"$1", :_}, [], [:"$1"]}])
|> Enum.each(fn pid -> Process.exit(pid, :kill) end)
@timeout 1_000

setup do
{:ok, pid} = StatsCollector.start_link([])
{:ok, _} = SummaryTimer.start_link([interval: "0 * * * * * *"], [])

Expand All @@ -34,7 +32,14 @@ defmodule Archethic.BeaconChain.Subset.StatsCollectorTest do
authorization_date: DateTime.utc_now() |> DateTime.add(-1, :day)
})

on_exit(fn -> Process.exit(pid, :kill) end)
on_exit(fn ->
# global process to reset
Registry.select(Archethic.Utils.JobCacheRegistry, [{{:_, :"$1", :_}, [], [:"$1"]}])
|> Enum.each(fn pid -> Process.exit(pid, :kill) end)

# sleep a little because the JobCacheRegistry is informed asynchronously
Process.sleep(50)
end)

{:ok, %{pid: pid}}
end
Expand All @@ -48,7 +53,8 @@ defmodule Archethic.BeaconChain.Subset.StatsCollectorTest do

with_mocks([
{BeaconChain, [:passthrough], get_network_stats: fn _ -> %{} end},
{NetworkCoordinates, [], fetch_network_stats: fn _summary_time -> Nx.tensor(0) end}
{NetworkCoordinates, [],
timeout: fn -> @timeout end, fetch_network_stats: fn _summary_time, _ -> Nx.tensor(0) end}
]) do
assert 0 = Registry.count(Archethic.Utils.JobCacheRegistry)

Expand Down Expand Up @@ -118,7 +124,7 @@ defmodule Archethic.BeaconChain.Subset.StatsCollectorTest do
%{latency: 10}
]
}
} = StatsCollector.get(summary_time)
} = StatsCollector.get(summary_time, @timeout)
end
end

Expand Down Expand Up @@ -172,7 +178,7 @@ defmodule Archethic.BeaconChain.Subset.StatsCollectorTest do
%{latency: 10}
]
}
} = StatsCollector.get(DateTime.utc_now())
} = StatsCollector.get(DateTime.utc_now(), @timeout)
end

assert 1 = Registry.count(Archethic.Utils.JobCacheRegistry)
Expand Down Expand Up @@ -205,7 +211,8 @@ defmodule Archethic.BeaconChain.Subset.StatsCollectorTest do
with_mocks([
{BeaconChain, [:passthrough], get_network_stats: fn _ -> %{} end},
{NetworkCoordinates, [],
fetch_network_stats: fn _summary_time ->
timeout: fn -> @timeout end,
fetch_network_stats: fn _summary_time, _ ->
tensor
end},
{Election, [],
Expand All @@ -218,7 +225,7 @@ defmodule Archethic.BeaconChain.Subset.StatsCollectorTest do
summary_time = DateTime.utc_now()
PubSub.notify_next_summary_time(summary_time)

assert ^tensor = StatsCollector.fetch(summary_time)
assert ^tensor = StatsCollector.fetch(summary_time, @timeout)
end
end

Expand All @@ -232,7 +239,8 @@ defmodule Archethic.BeaconChain.Subset.StatsCollectorTest do
with_mocks([
{BeaconChain, [:passthrough], get_network_stats: fn _ -> %{} end},
{NetworkCoordinates, [],
fetch_network_stats: fn _summary_time ->
timeout: fn -> @timeout end,
fetch_network_stats: fn _summary_time, _ ->
Nx.tensor(1)
end},
{Election, [],
Expand All @@ -244,7 +252,7 @@ defmodule Archethic.BeaconChain.Subset.StatsCollectorTest do
]) do
# can't compare tensor directly so we compare serialization
expected = Nx.tensor(1) |> Nx.to_binary()
assert ^expected = StatsCollector.fetch(DateTime.utc_now()) |> Nx.to_binary()
assert ^expected = StatsCollector.fetch(DateTime.utc_now(), @timeout) |> Nx.to_binary()
end

assert 1 = Registry.count(Archethic.Utils.JobCacheRegistry)
Expand Down
Loading