Skip to content

Commit

Permalink
Handle slot and summary in task
Browse files Browse the repository at this point in the history
  • Loading branch information
Neylix committed Feb 3, 2025
1 parent f5f18e3 commit 72b0e91
Showing 1 changed file with 42 additions and 37 deletions.
79 changes: 42 additions & 37 deletions lib/archethic/beacon_chain/subset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,10 @@ defmodule Archethic.BeaconChain.Subset do
)
when time == slot_time do
if P2P.authorized_and_available_node?(node_public_key, time, true) do
handle_slot(current_slot, node_public_key)

if summary_time?(time) and beacon_summary_node?(subset, time, node_public_key) do
handle_summary(time, subset)
end
Task.Supervisor.start_child(Archethic.task_supervisors(), fn ->
handle_slot(current_slot, node_public_key)
handle_summary(time, subset, node_public_key)
end)
end

{:noreply, next_state(state, time)}
Expand Down Expand Up @@ -323,45 +322,51 @@ defmodule Archethic.BeaconChain.Subset do
|> P2P.broadcast_message(%NewBeaconSlot{slot: slot})
end

defp handle_summary(time, subset) do
beacon_slots =
SummaryCache.stream_slots(time, subset)
|> Stream.map(fn {slot, _} -> slot end)
|> Enum.to_list()
defp handle_summary(time, subset, node_public_key) do
if summary_time?(time) and beacon_summary_node?(subset, time, node_public_key) do
beacon_slots =
time
|> SummaryCache.stream_slots(subset)
|> Stream.map(fn {slot, _} -> slot end)
|> Enum.to_list()

if Enum.empty?(beacon_slots) do
:ok
else
Logger.debug("Create beacon summary", beacon_subset: Base.encode16(subset))
network_patches_timeout = NetworkCoordinates.timeout()
if not Enum.empty?(beacon_slots) do
create_beacon_summary(subset, time, beacon_slots)
end
end
end

patch_task =
Task.Supervisor.async_nolink(Archethic.task_supervisors(), fn ->
get_network_patches(time, subset, network_patches_timeout)
end)
defp create_beacon_summary(subset, time, beacon_slots) do
Logger.debug("Create beacon summary", beacon_subset: Base.encode16(subset))
network_patches_timeout = NetworkCoordinates.timeout()

summary =
%Summary{subset: subset, summary_time: time}
|> Summary.aggregate_slots(
beacon_slots,
P2PSampling.list_nodes_to_sample(subset)
)
patch_task =
Task.Supervisor.async_nolink(Archethic.task_supervisors(), fn ->
get_network_patches(time, subset, network_patches_timeout)
end)

network_patches =
case Task.yield(patch_task, network_patches_timeout) || Task.shutdown(patch_task) do
{:ok, network_patches} ->
network_patches
summary =
%Summary{subset: subset, summary_time: time}
|> Summary.aggregate_slots(
beacon_slots,
P2PSampling.list_nodes_to_sample(subset)
)

_ ->
Logger.warning("Build network patches takes more than #{network_patches_timeout} ms",
beacon_subset: Base.encode16(subset)
)
network_patches =
case Task.yield(patch_task, network_patches_timeout) || Task.shutdown(patch_task) do
{:ok, network_patches} ->
network_patches

[]
end
_ ->
Logger.warning(
"Build network patches takes more than #{network_patches_timeout} ms",
beacon_subset: Base.encode16(subset)
)

:ok = BeaconChain.write_beacon_summary(%{summary | network_patches: network_patches})
end
[]
end

:ok = BeaconChain.write_beacon_summary(%{summary | network_patches: network_patches})
end

defp get_network_patches(summary_time, subset, timeout) do
Expand Down

0 comments on commit 72b0e91

Please sign in to comment.