Skip to content

Commit

Permalink
mostly refactor to use streams in lower lvl functions
Browse files Browse the repository at this point in the history
  • Loading branch information
bchamagne committed Apr 24, 2024
1 parent e05fb3e commit aa5f57a
Show file tree
Hide file tree
Showing 11 changed files with 100 additions and 105 deletions.
29 changes: 15 additions & 14 deletions lib/archethic/beacon_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ defmodule Archethic.BeaconChain do
alias Archethic.P2P.Message.GetBeaconSummaries
alias Archethic.P2P.Message.GetBeaconSummariesAggregate
alias Archethic.P2P.Message.GetCurrentSummaries
alias Archethic.P2P.Message.GetCurrentReplicationsAttestations
alias Archethic.P2P.Message.GetCurrentReplicationsAttestationsResponse
alias Archethic.P2P.Message.GetCurrentReplicationAttestations
alias Archethic.P2P.Message.GetCurrentReplicationAttestationsResponse
alias Archethic.P2P.Message.BeaconSummaryList
alias Archethic.P2P.Message.NotFound
alias Archethic.P2P.Message.TransactionSummaryList
Expand Down Expand Up @@ -213,14 +213,16 @@ defmodule Archethic.BeaconChain do
Returns the current summary's replication attestations that current node have
"""
@spec get_current_summary_replication_attestations(subset :: binary()) ::
list(ReplicationAttestation.t())
Enumerable.t() | list(ReplicationAttestation.t())
def get_current_summary_replication_attestations(subset) do
%Slot{transaction_attestations: replication_attestations} = Subset.get_current_slot(subset)

SummaryCache.stream_current_slots(subset)
|> Stream.flat_map(fn {%Slot{transaction_attestations: replication_attestations}, _} ->
replication_attestations
end)
|> Stream.concat(replication_attestations)
|> ReplicationAttestation.reduce_confirmations()
|> Enum.to_list()
end

@doc """
Expand Down Expand Up @@ -377,20 +379,19 @@ defmodule Archethic.BeaconChain do
Unordered
"""
@spec fetch_current_summary_replication_attestations(datetime :: DateTime.t()) ::
list(ReplicationAttestation.t())
Enumerable.t() | list(ReplicationAttestation.t())
def fetch_current_summary_replication_attestations(datetime = %DateTime{} \\ DateTime.utc_now()) do
get_next_summary_elected_subsets_by_nodes(datetime)
|> Task.async_stream(
fn {node, subsets} ->
fetch_current_summary_replications_attestations_from_node(node, subsets)
fetch_current_summary_replication_attestations_from_node(node, subsets)
end,
ordered: false,
max_concurrency: 256
)
|> Stream.filter(&match?({:ok, _}, &1))
|> Stream.flat_map(fn {:ok, replications_attestations} -> replications_attestations end)
|> then(&ReplicationAttestation.reduce_confirmations/1)
|> Enum.to_list()
|> Stream.flat_map(fn {:ok, replication_attestations} -> replication_attestations end)
|> ReplicationAttestation.reduce_confirmations()
end

defp get_next_summary_elected_subsets_by_nodes(datetime) do
Expand Down Expand Up @@ -440,13 +441,13 @@ defmodule Archethic.BeaconChain do
|> Enum.to_list()
end

defp fetch_current_summary_replications_attestations_from_node(node, subsets) do
case P2P.send_message(node, %GetCurrentReplicationsAttestations{subsets: subsets}) do
defp fetch_current_summary_replication_attestations_from_node(node, subsets) do
case P2P.send_message(node, %GetCurrentReplicationAttestations{subsets: subsets}) do
{:ok,
%GetCurrentReplicationsAttestationsResponse{
replications_attestations: replications_attestations
%GetCurrentReplicationAttestationsResponse{
replication_attestations: replication_attestations
}} ->
replications_attestations
replication_attestations

_ ->
[]
Expand Down
8 changes: 4 additions & 4 deletions lib/archethic/p2p/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ defmodule Archethic.P2P.Message do
GetBeaconSummariesAggregate,
GetBootstrappingNodes,
GetCurrentSummaries,
GetCurrentReplicationsAttestations,
GetCurrentReplicationsAttestationsResponse,
GetCurrentReplicationAttestations,
GetCurrentReplicationAttestationsResponse,
GetLastTransaction,
GetLastTransactionAddress,
GetNextAddresses,
Expand Down Expand Up @@ -120,7 +120,7 @@ defmodule Archethic.P2P.Message do
| GetGenesisAddress.t()
| ValidationError.t()
| GetCurrentSummaries.t()
| GetCurrentReplicationsAttestations.t()
| GetCurrentReplicationAttestations.t()
| GetBeaconSummariesAggregate.t()
| NotifyPreviousChain.t()
| ShardRepair.t()
Expand Down Expand Up @@ -161,7 +161,7 @@ defmodule Archethic.P2P.Message do
| NetworkStats.t()
| SmartContractCallValidation.t()
| DashboardData.t()
| GetCurrentReplicationsAttestationsResponse.t()
| GetCurrentReplicationAttestationsResponse.t()

@floor_upload_speed Application.compile_env!(:archethic, [__MODULE__, :floor_upload_speed])
@content_max_size Application.compile_env!(:archethic, :transaction_data_content_max_size)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Archethic.P2P.Message.GetCurrentReplicationsAttestations do
defmodule Archethic.P2P.Message.GetCurrentReplicationAttestations do
@moduledoc """
A request to get the replication attestations of current summary
The nodes receiving this must be elected to store the given subsets
Expand All @@ -10,30 +10,19 @@ defmodule Archethic.P2P.Message.GetCurrentReplicationsAttestations do
alias Archethic.BeaconChain.ReplicationAttestation
alias Archethic.Crypto
alias Archethic.BeaconChain
alias Archethic.BeaconChain.Subset
alias Archethic.BeaconChain.Slot
alias Archethic.P2P.Message.GetCurrentReplicationsAttestationsResponse
alias Archethic.P2P.Message.GetCurrentReplicationAttestationsResponse

@type t :: %__MODULE__{subsets: list(binary())}

@spec process(message :: __MODULE__.t(), sender_public_key :: Crypto.key()) ::
GetCurrentReplicationsAttestationsResponse.t()
GetCurrentReplicationAttestationsResponse.t()
def process(%__MODULE__{subsets: subsets}, _) do
replications_attestations =
Enum.flat_map(subsets, fn subset ->
%Slot{transaction_attestations: replications_attestations} =
Subset.get_current_slot(subset)

[
replications_attestations,
BeaconChain.get_current_summary_replication_attestations(subset)
]
end)
|> ReplicationAttestation.reduce_confirmations()
|> Enum.to_list()

%GetCurrentReplicationsAttestationsResponse{
replications_attestations: replications_attestations
%GetCurrentReplicationAttestationsResponse{
replication_attestations:
subsets
|> Stream.flat_map(&BeaconChain.get_current_summary_replication_attestations/1)
|> ReplicationAttestation.reduce_confirmations()
|> Enum.to_list()
}
end

Expand Down
Original file line number Diff line number Diff line change
@@ -1,40 +1,39 @@
defmodule Archethic.P2P.Message.GetCurrentReplicationsAttestationsResponse do
defmodule Archethic.P2P.Message.GetCurrentReplicationAttestationsResponse do
@moduledoc """
The response message of GetCurrentReplicationsAttestations
The response message of GetCurrentReplicationAttestations
"""

@enforce_keys [:replications_attestations]
defstruct [:replications_attestations]
@enforce_keys [:replication_attestations]
defstruct [:replication_attestations]

alias Archethic.BeaconChain.ReplicationAttestation
alias Archethic.Utils.VarInt

@type t() :: %__MODULE__{
replications_attestations: list(ReplicationAttestation.t())
replication_attestations: list(ReplicationAttestation.t())
}

@spec serialize(message :: t()) :: bitstring()
def serialize(%__MODULE__{replications_attestations: replications_attestations}) do
replications_attestations_bin =
replications_attestations
def serialize(%__MODULE__{replication_attestations: replication_attestations}) do
replication_attestations_bin =
replication_attestations
|> Enum.map(&ReplicationAttestation.serialize/1)
|> :erlang.list_to_bitstring()

encoded_replications_attestations_len =
length(replications_attestations) |> VarInt.from_value()
encoded_replication_attestations_len = length(replication_attestations) |> VarInt.from_value()

<<encoded_replications_attestations_len::binary, replications_attestations_bin::bitstring>>
<<encoded_replication_attestations_len::binary, replication_attestations_bin::bitstring>>
end

@spec deserialize(bin :: bitstring()) :: {t(), bitstring()}
def deserialize(<<rest::bitstring>>) do
{count, rest} = rest |> VarInt.get_value()

{replications_attestations, <<rest::bitstring>>} = deserialize_list(rest, count, [])
{replication_attestations, <<rest::bitstring>>} = deserialize_list(rest, count, [])

{
%__MODULE__{
replications_attestations: replications_attestations
replication_attestations: replication_attestations
},
rest
}
Expand Down
8 changes: 4 additions & 4 deletions lib/archethic/p2p/message/message_id.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ defmodule Archethic.P2P.MessageId do
RegisterBeaconUpdates,
GetGenesisAddress,
GetCurrentSummaries,
GetCurrentReplicationsAttestations,
GetCurrentReplicationsAttestationsResponse,
GetCurrentReplicationAttestations,
GetCurrentReplicationAttestationsResponse,
GetBeaconSummariesAggregate,
NotifyPreviousChain,
GetNextAddresses,
Expand Down Expand Up @@ -127,7 +127,7 @@ defmodule Archethic.P2P.MessageId do
GetNetworkStats => 39,
GetDashboardData => 40,
UnlockChain => 41,
GetCurrentReplicationsAttestations => 42,
GetCurrentReplicationAttestations => 42,

# Responses
DashboardData => 225,
Expand All @@ -148,7 +148,7 @@ defmodule Archethic.P2P.MessageId do
Summary => 240,
LastTransactionAddress => 241,
FirstPublicKey => 242,
GetCurrentReplicationsAttestationsResponse => 243,
GetCurrentReplicationAttestationsResponse => 243,
TransactionInputList => 244,
TransactionChainLength => 245,
BootstrappingNodes => 246,
Expand Down
3 changes: 2 additions & 1 deletion lib/archethic/self_repair.ex
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ defmodule Archethic.SelfRepair do
@spec synchronize_current_summary() :: integer()
def synchronize_current_summary() do
BeaconChain.fetch_current_summary_replication_attestations()
|> Sync.process_replications_attestations(P2P.authorized_and_available_nodes())
|> Enum.to_list()
|> Sync.process_replication_attestations(P2P.authorized_and_available_nodes())
end
end
10 changes: 5 additions & 5 deletions lib/archethic/self_repair/sync.ex
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ defmodule Archethic.SelfRepair.Sync do
) do
start_time = System.monotonic_time()

nb_transactions = process_replications_attestations(attestations, download_nodes)
nb_transactions = process_replication_attestations(attestations, download_nodes)

:telemetry.execute(
[:archethic, :self_repair, :process_aggregate],
Expand Down Expand Up @@ -348,14 +348,14 @@ defmodule Archethic.SelfRepair.Sync do
@doc """
Downloads and stores the transactions missed, returns the count of transactions synchronized
"""
@spec process_replications_attestations(
replications_attestations :: list(ReplicationAttestation.t()),
@spec process_replication_attestations(
replication_attestations :: list(ReplicationAttestation.t()),
download_nodes :: list(Node.t())
) :: integer()
def process_replications_attestations(replications_attestations, download_nodes) do
def process_replication_attestations(replication_attestations, download_nodes) do
nodes_including_self = [P2P.get_node_info() | download_nodes] |> P2P.distinct_nodes()

replications_attestations
replication_attestations
|> adjust_attestations(download_nodes)
|> Stream.filter(&TransactionHandler.download_transaction?(&1, nodes_including_self))
|> Enum.sort_by(& &1.transaction_summary.timestamp, {:asc, DateTime})
Expand Down
47 changes: 26 additions & 21 deletions test/archethic/beacon_chain_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ defmodule Archethic.BeaconChainTest do

alias Archethic.P2P
alias Archethic.P2P.Message.GetBeaconSummaries
alias Archethic.P2P.Message.GetCurrentReplicationsAttestations
alias Archethic.P2P.Message.GetCurrentReplicationsAttestationsResponse
alias Archethic.P2P.Message.GetCurrentReplicationAttestations
alias Archethic.P2P.Message.GetCurrentReplicationAttestationsResponse
alias Archethic.P2P.Message.GetTransactionSummary
alias Archethic.P2P.Message.BeaconSummaryList
alias Archethic.P2P.Message.GetCurrentSummaries
Expand Down Expand Up @@ -666,7 +666,9 @@ defmodule Archethic.BeaconChainTest do

describe "fetch_current_summary_replication_attestations/0" do
test "should return empty if there is nothing yet" do
assert [] = BeaconChain.fetch_current_summary_replication_attestations()
assert [] =
BeaconChain.fetch_current_summary_replication_attestations()
|> Enum.to_list()
end

test "should return the attestations" do
Expand Down Expand Up @@ -699,18 +701,19 @@ defmodule Archethic.BeaconChainTest do

for node <- nodes, do: P2P.add_and_connect_node(node)

replications_attestations = [random_replication_attestation(now)]
replication_attestations = [random_replication_attestation(now)]

MockClient
|> expect(:send_message, length(nodes), fn _, %GetCurrentReplicationsAttestations{}, _ ->
|> expect(:send_message, length(nodes), fn _, %GetCurrentReplicationAttestations{}, _ ->
{:ok,
%GetCurrentReplicationsAttestationsResponse{
replications_attestations: replications_attestations
%GetCurrentReplicationAttestationsResponse{
replication_attestations: replication_attestations
}}
end)

assert ^replications_attestations =
assert ^replication_attestations =
BeaconChain.fetch_current_summary_replication_attestations()
|> Enum.to_list()
end

test "should merge attestations when different" do
Expand Down Expand Up @@ -746,30 +749,32 @@ defmodule Archethic.BeaconChainTest do
replication_attestation1 = random_replication_attestation(now)
replication_attestation2 = random_replication_attestation(now)
replication_attestation3 = random_replication_attestation(now)
node1_replications_attestations = [replication_attestation1, replication_attestation2]
node2_replications_attestations = [replication_attestation1, replication_attestation3]
node1_replication_attestations = [replication_attestation1, replication_attestation2]
node2_replication_attestations = [replication_attestation1, replication_attestation3]

MockClient
|> expect(:send_message, 2, fn
^node1, %GetCurrentReplicationsAttestations{}, _ ->
^node1, %GetCurrentReplicationAttestations{}, _ ->
{:ok,
%GetCurrentReplicationsAttestationsResponse{
replications_attestations: node1_replications_attestations
%GetCurrentReplicationAttestationsResponse{
replication_attestations: node1_replication_attestations
}}

^node2, %GetCurrentReplicationsAttestations{}, _ ->
^node2, %GetCurrentReplicationAttestations{}, _ ->
{:ok,
%GetCurrentReplicationsAttestationsResponse{
replications_attestations: node2_replications_attestations
%GetCurrentReplicationAttestationsResponse{
replication_attestations: node2_replication_attestations
}}
end)

replications_attestations = BeaconChain.fetch_current_summary_replication_attestations()
replication_attestations =
BeaconChain.fetch_current_summary_replication_attestations()
|> Enum.to_list()

assert 3 == length(replications_attestations)
assert Enum.any?(replications_attestations, &(&1 == replication_attestation1))
assert Enum.any?(replications_attestations, &(&1 == replication_attestation2))
assert Enum.any?(replications_attestations, &(&1 == replication_attestation3))
assert 3 == length(replication_attestations)
assert Enum.any?(replication_attestations, &(&1 == replication_attestation1))
assert Enum.any?(replication_attestations, &(&1 == replication_attestation2))
assert Enum.any?(replication_attestations, &(&1 == replication_attestation3))
end
end

Expand Down
10 changes: 5 additions & 5 deletions test/archethic/bootstrap_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ defmodule Archethic.BootstrapTest do
TransactionChainLength,
TransactionInputList,
TransactionList,
GetCurrentReplicationsAttestations,
GetCurrentReplicationsAttestationsResponse
GetCurrentReplicationAttestations,
GetCurrentReplicationAttestationsResponse
}

alias Archethic.Replication
Expand Down Expand Up @@ -129,10 +129,10 @@ defmodule Archethic.BootstrapTest do
_, %GetGenesisAddress{}, _ ->
{:ok, %NotFound{}}

_, %GetCurrentReplicationsAttestations{}, _ ->
_, %GetCurrentReplicationAttestations{}, _ ->
{:ok,
%GetCurrentReplicationsAttestationsResponse{
replications_attestations: []
%GetCurrentReplicationAttestationsResponse{
replication_attestations: []
}}
end)

Expand Down
Loading

0 comments on commit aa5f57a

Please sign in to comment.