From 1ad610b8ddd7bca3780522f64050f4f0ef4064f7 Mon Sep 17 00:00:00 2001 From: Neylix Date: Fri, 10 Jan 2025 12:36:09 +0100 Subject: [PATCH] Optimize self repair recipient consolidation --- lib/archethic/self_repair/sync.ex | 50 ++++++++++++++++++------------- 1 file changed, 30 insertions(+), 20 deletions(-) diff --git a/lib/archethic/self_repair/sync.ex b/lib/archethic/self_repair/sync.ex index 7110f16f9..3cb8bec68 100644 --- a/lib/archethic/self_repair/sync.ex +++ b/lib/archethic/self_repair/sync.ex @@ -510,28 +510,12 @@ defmodule Archethic.SelfRepair.Sync do %Transaction{validation_stamp: %ValidationStamp{recipients: recipients = [_ | _]}}, download_nodes ) do + resolved_addresses = + Enum.chunk_every(movements_addresses, 2) |> Enum.map(&List.to_tuple/1) |> Map.new() + consolidated_movements_addresses = recipients - |> Task.async_stream( - fn recipient -> - genesis_nodes = Election.chain_storage_nodes(recipient, download_nodes) - - case TransactionChain.fetch_genesis_address(recipient, genesis_nodes, - acceptance_resolver: :accept_different_genesis - ) do - {:ok, genesis_address} -> - [recipient, genesis_address] - - {:error, :acceptance_failed} -> - [recipient, recipient] - - {:error, reason} -> - raise SelfRepair.Error, - function: "consolidate_recipients", - message: "Failed to fetch genesis address with error #{inspect(reason)}", - address: recipient - end - end, + |> Task.async_stream(&do_consolidate_recipient(&1, resolved_addresses, download_nodes), max_concurrency: length(recipients) ) |> Stream.flat_map(fn {:ok, addresses} -> addresses end) @@ -547,6 +531,32 @@ defmodule Archethic.SelfRepair.Sync do defp consolidate_recipients(attestation, _tx, _), do: attestation + defp do_consolidate_recipient(recipient, resolved_addresses, download_nodes) do + case Map.fetch(resolved_addresses, recipient) do + {:ok, genesis} -> + [recipient, genesis] + + _ -> + genesis_nodes = Election.chain_storage_nodes(recipient, download_nodes) + + case TransactionChain.fetch_genesis_address(recipient, genesis_nodes, + acceptance_resolver: :accept_different_genesis + ) do + {:ok, genesis_address} -> + [recipient, genesis_address] + + {:error, :acceptance_failed} -> + [recipient, recipient] + + {:error, reason} -> + raise SelfRepair.Error, + function: "consolidate_recipients", + message: "Failed to fetch genesis address with error #{inspect(reason)}", + address: recipient + end + end + end + defp sync_node(end_of_node_synchronizations) do end_of_node_synchronizations |> Enum.each(fn public_key -> P2P.set_node_globally_synced(public_key) end)