Skip to content

Commit

Permalink
Optimize self repair recipient consolidation
Browse files Browse the repository at this point in the history
  • Loading branch information
Neylix committed Jan 13, 2025
1 parent befb96c commit 1ad610b
Showing 1 changed file with 30 additions and 20 deletions.
50 changes: 30 additions & 20 deletions lib/archethic/self_repair/sync.ex
Original file line number Diff line number Diff line change
Expand Up @@ -510,28 +510,12 @@ defmodule Archethic.SelfRepair.Sync do
%Transaction{validation_stamp: %ValidationStamp{recipients: recipients = [_ | _]}},
download_nodes
) do
resolved_addresses =
Enum.chunk_every(movements_addresses, 2) |> Enum.map(&List.to_tuple/1) |> Map.new()

consolidated_movements_addresses =
recipients
|> Task.async_stream(
fn recipient ->
genesis_nodes = Election.chain_storage_nodes(recipient, download_nodes)

case TransactionChain.fetch_genesis_address(recipient, genesis_nodes,
acceptance_resolver: :accept_different_genesis
) do
{:ok, genesis_address} ->
[recipient, genesis_address]

{:error, :acceptance_failed} ->
[recipient, recipient]

{:error, reason} ->
raise SelfRepair.Error,
function: "consolidate_recipients",
message: "Failed to fetch genesis address with error #{inspect(reason)}",
address: recipient
end
end,
|> Task.async_stream(&do_consolidate_recipient(&1, resolved_addresses, download_nodes),
max_concurrency: length(recipients)
)
|> Stream.flat_map(fn {:ok, addresses} -> addresses end)
Expand All @@ -547,6 +531,32 @@ defmodule Archethic.SelfRepair.Sync do

defp consolidate_recipients(attestation, _tx, _), do: attestation

defp do_consolidate_recipient(recipient, resolved_addresses, download_nodes) do
case Map.fetch(resolved_addresses, recipient) do
{:ok, genesis} ->
[recipient, genesis]

_ ->
genesis_nodes = Election.chain_storage_nodes(recipient, download_nodes)

case TransactionChain.fetch_genesis_address(recipient, genesis_nodes,
acceptance_resolver: :accept_different_genesis
) do
{:ok, genesis_address} ->
[recipient, genesis_address]

{:error, :acceptance_failed} ->
[recipient, recipient]

{:error, reason} ->
raise SelfRepair.Error,
function: "consolidate_recipients",
message: "Failed to fetch genesis address with error #{inspect(reason)}",
address: recipient
end
end
end

defp sync_node(end_of_node_synchronizations) do
end_of_node_synchronizations
|> Enum.each(fn public_key -> P2P.set_node_globally_synced(public_key) end)
Expand Down

0 comments on commit 1ad610b

Please sign in to comment.