From 91a056021acf411ac73f02990dbdd0f6846b69a9 Mon Sep 17 00:00:00 2001 From: bchamagne Date: Fri, 25 Oct 2024 14:37:22 +0200 Subject: [PATCH 1/8] rename function --- lib/archethic.ex | 2 +- lib/archethic/beacon_chain.ex | 2 +- .../mining/pending_transaction_validation.ex | 2 +- lib/archethic/networking/scheduler.ex | 2 +- lib/archethic/p2p.ex | 17 +++++++++-------- .../p2p/message/get_bootstraping_nodes.ex | 2 +- .../api/graphql/schema/resolver.ex | 2 +- .../prod/1.5.9@add_mining_bls_key.exs | 2 +- 8 files changed, 16 insertions(+), 15 deletions(-) diff --git a/lib/archethic.ex b/lib/archethic.ex index 50a39c45be..b1d7fa9f1d 100644 --- a/lib/archethic.ex +++ b/lib/archethic.ex @@ -157,7 +157,7 @@ defmodule Archethic do P2P.authorized_and_available_nodes() |> Enum.reject(&(&1.first_public_key == welcome_node_key)) |> Enum.sort_by(& &1.first_public_key) - |> P2P.nearest_nodes(welcome_node_patch) + |> P2P.sort_by_nearest_nodes(welcome_node_patch) |> Enum.filter(&P2P.node_connected?/1) this_node = Crypto.first_node_public_key() diff --git a/lib/archethic/beacon_chain.ex b/lib/archethic/beacon_chain.ex index b0f6d6a62d..908e265d5c 100644 --- a/lib/archethic/beacon_chain.ex +++ b/lib/archethic/beacon_chain.ex @@ -404,7 +404,7 @@ defmodule Archethic.BeaconChain do subset |> Election.beacon_storage_nodes(next_summary_date, authorized_nodes) |> Enum.filter(&P2P.node_connected?/1) - |> P2P.nearest_nodes() + |> P2P.sort_by_nearest_nodes() |> Enum.take(3) |> Enum.map(&{&1, subset}) end) diff --git a/lib/archethic/mining/pending_transaction_validation.ex b/lib/archethic/mining/pending_transaction_validation.ex index 2ddb2dc25e..3357e56ec4 100644 --- a/lib/archethic/mining/pending_transaction_validation.ex +++ b/lib/archethic/mining/pending_transaction_validation.ex @@ -978,7 +978,7 @@ defmodule Archethic.Mining.PendingTransactionValidation do previous_address |> Election.chain_storage_nodes(P2P.authorized_and_available_nodes()) - |> P2P.nearest_nodes() + |> P2P.sort_by_nearest_nodes() |> Enum.filter(&P2P.node_connected?/1) |> get_first_public_key(previous_address) end diff --git a/lib/archethic/networking/scheduler.ex b/lib/archethic/networking/scheduler.ex index 43cc7b68c0..eafc8c182e 100644 --- a/lib/archethic/networking/scheduler.ex +++ b/lib/archethic/networking/scheduler.ex @@ -153,7 +153,7 @@ defmodule Archethic.Networking.Scheduler do nodes = P2P.authorized_and_available_nodes() |> Enum.filter(&P2P.node_connected?/1) - |> P2P.nearest_nodes() + |> P2P.sort_by_nearest_nodes() case Utils.await_confirmation(tx_address, nodes) do {:ok, validated_transaction = %Transaction{address: ^tx_address, data: ^transaction_data}} -> diff --git a/lib/archethic/p2p.ex b/lib/archethic/p2p.ex index d92e46064b..bcaf5d27c0 100644 --- a/lib/archethic/p2p.ex +++ b/lib/archethic/p2p.ex @@ -445,11 +445,11 @@ defmodule Archethic.P2P do @doc """ Return the nearest storages nodes from the local node """ - @spec nearest_nodes(list(Node.t())) :: list(Node.t()) - def nearest_nodes(storage_nodes) when is_list(storage_nodes) do + @spec sort_by_nearest_nodes(list(Node.t())) :: list(Node.t()) + def sort_by_nearest_nodes(storage_nodes) when is_list(storage_nodes) do case get_node_info(Crypto.first_node_public_key()) do {:ok, %Node{network_patch: network_patch}} -> - nearest_nodes(storage_nodes, network_patch) + sort_by_nearest_nodes(storage_nodes, network_patch) {:error, :not_found} -> storage_nodes @@ -467,7 +467,7 @@ defmodule Archethic.P2P do ...> %Node{network_patch: "3A2"} ...> ] ...> - ...> P2P.nearest_nodes(list_nodes, "12F") + ...> P2P.sort_by_nearest_nodes(list_nodes, "12F") [ %Node{network_patch: "3A2"}, %Node{network_patch: "AA0"}, @@ -480,15 +480,16 @@ defmodule Archethic.P2P do ...> %Node{network_patch: "3A2"} ...> ] ...> - ...> P2P.nearest_nodes(list_nodes, "C3A") + ...> P2P.sort_by_nearest_nodes(list_nodes, "C3A") [ %Node{network_patch: "F50"}, %Node{network_patch: "AA0"}, %Node{network_patch: "3A2"} ] """ - @spec nearest_nodes(node_list :: Enumerable.t(), network_patch :: binary()) :: Enumerable.t() - def nearest_nodes(storage_nodes, network_patch) when is_binary(network_patch) do + @spec sort_by_nearest_nodes(node_list :: Enumerable.t(), network_patch :: binary()) :: + Enumerable.t() + def sort_by_nearest_nodes(storage_nodes, network_patch) when is_binary(network_patch) do Enum.sort_by(storage_nodes, &network_distance(&1.network_patch, network_patch)) end @@ -761,7 +762,7 @@ defmodule Archethic.P2P do ) do nodes |> Enum.filter(&node_connected?/1) - |> nearest_nodes() + |> sort_by_nearest_nodes() |> unprioritize_node(Crypto.first_node_public_key()) |> do_quorum_read( message, diff --git a/lib/archethic/p2p/message/get_bootstraping_nodes.ex b/lib/archethic/p2p/message/get_bootstraping_nodes.ex index 4449226679..f3bc8296b1 100644 --- a/lib/archethic/p2p/message/get_bootstraping_nodes.ex +++ b/lib/archethic/p2p/message/get_bootstraping_nodes.ex @@ -23,7 +23,7 @@ defmodule Archethic.P2P.Message.GetBootstrappingNodes do closest_nodes = top_nodes - |> P2P.nearest_nodes(patch) + |> P2P.sort_by_nearest_nodes(patch) |> Enum.take(5) %BootstrappingNodes{ diff --git a/lib/archethic_web/api/graphql/schema/resolver.ex b/lib/archethic_web/api/graphql/schema/resolver.ex index 9bfb31d77f..43af41c49f 100644 --- a/lib/archethic_web/api/graphql/schema/resolver.ex +++ b/lib/archethic_web/api/graphql/schema/resolver.ex @@ -341,7 +341,7 @@ defmodule ArchethicWeb.API.GraphQL.Schema.Resolver do def nearest_endpoints(ip) do P2P.authorized_and_available_nodes() - |> P2P.nearest_nodes(P2P.get_geo_patch(ip)) + |> P2P.sort_by_nearest_nodes(P2P.get_geo_patch(ip)) |> Enum.map(&%{ip: :inet.ntoa(&1.ip), port: &1.http_port}) end diff --git a/priv/migration_tasks/prod/1.5.9@add_mining_bls_key.exs b/priv/migration_tasks/prod/1.5.9@add_mining_bls_key.exs index dbcf025434..f4ae9bc6fd 100644 --- a/priv/migration_tasks/prod/1.5.9@add_mining_bls_key.exs +++ b/priv/migration_tasks/prod/1.5.9@add_mining_bls_key.exs @@ -44,7 +44,7 @@ defmodule Migration_1_5_9 do nodes = P2P.authorized_and_available_nodes() |> Enum.filter(&P2P.node_connected?/1) - |> P2P.nearest_nodes() + |> P2P.sort_by_nearest_nodes() case Utils.await_confirmation(tx.address, nodes) do {:ok, _} -> From d36617d6ba3dddef9aa8e80093ec46626483ee54 Mon Sep 17 00:00:00 2001 From: bchamagne Date: Fri, 25 Oct 2024 15:40:48 +0200 Subject: [PATCH 2/8] refactor the quorum using a reduce --- lib/archethic/p2p.ex | 140 +++++++++++-------------------------------- 1 file changed, 35 insertions(+), 105 deletions(-) diff --git a/lib/archethic/p2p.ex b/lib/archethic/p2p.ex index bcaf5d27c0..f13e337411 100644 --- a/lib/archethic/p2p.ex +++ b/lib/archethic/p2p.ex @@ -764,122 +764,52 @@ defmodule Archethic.P2P do |> Enum.filter(&node_connected?/1) |> sort_by_nearest_nodes() |> unprioritize_node(Crypto.first_node_public_key()) - |> do_quorum_read( - message, - conflict_resolver, - acceptance_resolver, - timeout, - consistency_level, - 0, - nil - ) - end - - defp do_quorum_read([], _, _, _, _, _, 0, nil), do: {:error, :network_issue} - defp do_quorum_read([], _, _, _, _, _, _, nil), do: {:error, :acceptance_failed} - defp do_quorum_read([], _, _, _, _, _, _, previous_result), do: {:ok, previous_result} - - defp do_quorum_read( - nodes, - message, - conflict_resolver, - acceptance_resolver, - timeout, - consistency_level, - acceptance_failures_count, - previous_result - ) do - # We determine how many nodes to fetch for the quorum from the consistency level - {group, rest} = Enum.split(nodes, consistency_level) - - timeout = if timeout == 0, do: Message.get_timeout(message), else: timeout - - results = - Task.Supervisor.async_stream_nolink( - TaskSupervisor, - group, - &send_message(&1, message, timeout), - ordered: false, - on_timeout: :kill_task, - timeout: timeout + 2000 - ) - |> Stream.filter(&match?({:ok, {:ok, _}}, &1)) - |> Stream.map(fn {:ok, {:ok, res}} -> res end) - |> Enum.to_list() - - # If no nodes answered we try another group - case length(results) do - 0 -> - do_quorum_read( - rest, - message, - conflict_resolver, - acceptance_resolver, - timeout, - consistency_level, - acceptance_failures_count, - previous_result + |> Enum.chunk_every(consistency_level) + |> Enum.reduce_while(nil, fn nodes, previous_result -> + results = + Task.Supervisor.async_stream_nolink( + TaskSupervisor, + nodes, + &send_message(&1, message, timeout), + ordered: false, + on_timeout: :kill_task, + timeout: timeout + 2000 ) - - 1 -> - quorum_result = - if previous_result do - do_quorum([previous_result | results], conflict_resolver) + |> Stream.filter(&match?({:ok, {:ok, _}}, &1)) + |> Stream.map(fn {:ok, {:ok, res}} -> res end) + |> Enum.to_list() + + if Enum.empty?(results) do + {:cont, previous_result} + else + result = + if previous_result == nil do + reduce_results(results, conflict_resolver) else - List.first(results) + reduce_results([previous_result | results], conflict_resolver) end - with true <- acceptance_resolver.(quorum_result), - nil <- previous_result do - do_quorum_read( - rest, - message, - conflict_resolver, - acceptance_resolver, - timeout, - consistency_level, - acceptance_failures_count, - quorum_result - ) + if acceptance_resolver.(result) do + {:halt, result} else - false -> - do_quorum_read( - rest, - message, - conflict_resolver, - acceptance_resolver, - timeout, - consistency_level, - acceptance_failures_count + 1, - previous_result - ) - - _ -> - {:ok, quorum_result} + {:cont, result} end + end + end) + |> then(fn + nil -> + {:error, :network_issue} - _ -> - results = if previous_result != nil, do: [previous_result | results], else: results - quorum_result = do_quorum(results, conflict_resolver) - - if acceptance_resolver.(quorum_result) do - {:ok, quorum_result} + result -> + if acceptance_resolver.(result) do + {:ok, result} else - do_quorum_read( - rest, - message, - conflict_resolver, - acceptance_resolver, - timeout, - consistency_level, - acceptance_failures_count + 1, - previous_result - ) + {:error, :acceptance_failed} end - end + end) end - defp do_quorum(results, conflict_resolver) do + defp reduce_results(results, conflict_resolver) do distinct_elems = Enum.dedup(results) # If the results are the same, then we reached consistency From 94c16354d6d43a97215f1170187781086db5520a Mon Sep 17 00:00:00 2001 From: bchamagne Date: Fri, 25 Oct 2024 16:24:38 +0200 Subject: [PATCH 3/8] return the results zipped with the node + refactor --- lib/archethic/p2p.ex | 120 ++++++++++++++++---------- playground.diff | 168 ++++++++++++++++++++++++++++++++++++ test/archethic/p2p_test.exs | 29 +++++++ 3 files changed, 271 insertions(+), 46 deletions(-) create mode 100644 playground.diff diff --git a/lib/archethic/p2p.ex b/lib/archethic/p2p.ex index f13e337411..78d2f78d7c 100644 --- a/lib/archethic/p2p.ex +++ b/lib/archethic/p2p.ex @@ -740,7 +740,8 @@ defmodule Archethic.P2P do conflict_resolver :: (list(Message.t()) -> Message.t()), timeout :: non_neg_integer(), acceptance_resolver :: (Message.t() -> boolean()), - consistency_level :: pos_integer() + consistency_level :: pos_integer(), + repair_fun :: (list(Message.t()) -> :ok) ) :: {:ok, Message.t()} | {:error, :network_issue} | {:error, :acceptance_failed} def quorum_read( @@ -749,7 +750,8 @@ defmodule Archethic.P2P do conflict_resolver \\ fn results -> List.first(results) end, timeout \\ 0, acceptance_resolver \\ fn _ -> true end, - consistency_level \\ 3 + consistency_level \\ 3, + repair_fun \\ fn _ -> :ok end ) def quorum_read( @@ -758,67 +760,93 @@ defmodule Archethic.P2P do conflict_resolver, timeout, acceptance_resolver, - consistency_level + consistency_level, + repair_fun ) do nodes |> Enum.filter(&node_connected?/1) |> sort_by_nearest_nodes() |> unprioritize_node(Crypto.first_node_public_key()) |> Enum.chunk_every(consistency_level) - |> Enum.reduce_while(nil, fn nodes, previous_result -> - results = - Task.Supervisor.async_stream_nolink( - TaskSupervisor, - nodes, - &send_message(&1, message, timeout), - ordered: false, - on_timeout: :kill_task, - timeout: timeout + 2000 - ) - |> Stream.filter(&match?({:ok, {:ok, _}}, &1)) - |> Stream.map(fn {:ok, {:ok, res}} -> res end) - |> Enum.to_list() - - if Enum.empty?(results) do - {:cont, previous_result} + |> Enum.reduce_while({nil, []}, fn nodes, {previous_result_acc, all_results_acc} -> + results_by_node = send_message_and_filter_results(nodes, message, timeout) + + with :ok <- enough_results?(previous_result_acc, results_by_node), + result <- resolve_conflicts(previous_result_acc, results_by_node, conflict_resolver), + :ok <- result_accepted?(result, acceptance_resolver) do + {:halt, {result, all_results_acc ++ results_by_node}} else - result = - if previous_result == nil do - reduce_results(results, conflict_resolver) - else - reduce_results([previous_result | results], conflict_resolver) - end - - if acceptance_resolver.(result) do - {:halt, result} - else - {:cont, result} - end + {:error, previous_result} -> + {:cont, {previous_result, all_results_acc ++ results_by_node}} end end) - |> then(fn - nil -> - {:error, :network_issue} + |> then(fn {result, all_results} -> + Task.Supervisor.start_child(TaskSupervisor, fn -> repair_fun.(all_results) end) + + cond do + is_nil(result) -> + {:error, :network_issue} - result -> - if acceptance_resolver.(result) do + acceptance_resolver.(result) -> {:ok, result} - else + + true -> {:error, :acceptance_failed} - end + end end) end - defp reduce_results(results, conflict_resolver) do - distinct_elems = Enum.dedup(results) + defp send_message_and_filter_results(nodes, message, timeout) do + Task.Supervisor.async_stream_nolink( + TaskSupervisor, + nodes, + &{&1.first_public_key, send_message(&1, message, timeout)}, + ordered: false, + on_timeout: :kill_task, + timeout: timeout + 2000 + ) + |> Stream.filter(&match?({:ok, {_, {:ok, _}}}, &1)) + |> Stream.map(fn {:ok, {node_public_key, {:ok, res}}} -> {node_public_key, res} end) + |> Enum.to_list() + end + + # returns ok if we have 2 results + # we should probably use the consistency level + defp enough_results?(nil, results_by_node) do + case results_by_node do + [] -> {:error, nil} + [{_node, result}] -> {:error, result} + _ -> :ok + end + end + + defp enough_results?(previous_result, results_by_node) do + case results_by_node do + [] -> {:error, previous_result} + _ -> :ok + end + end - # If the results are the same, then we reached consistency - if length(distinct_elems) == 1 do - List.first(distinct_elems) + defp resolve_conflicts(nil, results_by_node, conflict_resolver) do + results_by_node + |> Enum.map(&elem(&1, 1)) + |> Enum.dedup() + |> conflict_resolver.() + end + + defp resolve_conflicts(previous_result, results_by_node, conflict_resolver) do + results_by_node + |> Enum.map(&elem(&1, 1)) + |> Kernel.++([previous_result]) + |> Enum.dedup() + |> conflict_resolver.() + end + + defp result_accepted?(result, acceptance_resolver) do + if acceptance_resolver.(result) do + :ok else - # If the results differ, we can apply a conflict resolver to merge the result into - # a consistent response - conflict_resolver.(distinct_elems) + {:error, result} end end diff --git a/playground.diff b/playground.diff new file mode 100644 index 0000000000..62684918a9 --- /dev/null +++ b/playground.diff @@ -0,0 +1,168 @@ +commit 7de9ac53236b308d3cd964df9914f78b580add97 +Author: bchamagne +Date: Wed May 29 11:33:41 2024 +0200 + + Fix the execution's errors when cache? is false + +diff --git a/lib/archethic/contracts.ex b/lib/archethic/contracts.ex +index da72f355..33f4e1ca 100644 +--- a/lib/archethic/contracts.ex ++++ b/lib/archethic/contracts.ex +@@ -560,26 +560,31 @@ defmodule Archethic.Contracts do + end + + defp cache_interpreter_execute(fun, key, opts) do +- if Keyword.get(opts, :cache?) do +- timeout = Keyword.get(opts, :timeout, 5_000) +- +- func = fn -> +- try do +- fun.() +- rescue +- err -> +- # error from the code (ex: 1 + "abc") +- {:error, err, __STACKTRACE__} +- end ++ func = fn -> ++ try do ++ fun.() ++ rescue ++ err -> ++ # error or throw from the user's code (ex: 1 + "abc") ++ {:error, err, __STACKTRACE__} + end ++ end + +- # We set the maximum timeout for a transaction to be processed before the kill the cache +- case Utils.JobCache.get!(key, function: func, timeout: timeout, ttl: 60_000) do +- {:error, err, stacktrace} -> {:error, raise_to_failure(err, stacktrace)} +- result -> result ++ result = ++ if Keyword.get(opts, :cache?) do ++ # We set the maximum timeout for a transaction to be processed before the kill the cache ++ Utils.JobCache.get!(key, ++ function: func, ++ timeout: Keyword.get(opts, :timeout, 5_000), ++ ttl: 60_000 ++ ) ++ else ++ func.() + end +- else +- fun.() ++ ++ case result do ++ {:error, err, stacktrace} -> {:error, raise_to_failure(err, stacktrace)} ++ result -> result + end + rescue + _ -> +diff --git a/test/archethic/contracts_test.exs b/test/archethic/contracts_test.exs +index 50414cf7..6c128a55 100644 +--- a/test/archethic/contracts_test.exs ++++ b/test/archethic/contracts_test.exs +@@ -293,7 +293,7 @@ defmodule Archethic.ContractsTest do + + trigger_tx = TransactionFactory.create_valid_transaction() + +- assert {:error, %Failure{}} = ++ assert {:error, %Failure{user_friendly_error: "bad argument in arithmetic expression - L3"}} = + Contracts.execute_condition( + {:transaction, nil, nil}, + Contract.from_transaction!(contract_tx), +@@ -302,6 +302,55 @@ defmodule Archethic.ContractsTest do + DateTime.utc_now(), + [] + ) ++ ++ assert {:error, %Failure{user_friendly_error: "bad argument in arithmetic expression - L3"}} = ++ Contracts.execute_condition( ++ {:transaction, nil, nil}, ++ Contract.from_transaction!(contract_tx), ++ trigger_tx, ++ nil, ++ DateTime.utc_now(), ++ [], ++ cache?: false ++ ) ++ end ++ ++ test "should return Error if condition throws" do ++ code = """ ++ @version 1 ++ condition triggered_by: transaction do ++ throw code: 12, message: "oh no" ++ end ++ ++ actions triggered_by: transaction do ++ Contract.set_content "hello" ++ end ++ """ ++ ++ contract_tx = ContractFactory.create_valid_contract_tx(code) ++ ++ trigger_tx = TransactionFactory.create_valid_transaction() ++ ++ assert {:error, %Failure{user_friendly_error: "oh no - L3"}} = ++ Contracts.execute_condition( ++ {:transaction, nil, nil}, ++ Contract.from_transaction!(contract_tx), ++ trigger_tx, ++ nil, ++ DateTime.utc_now(), ++ [] ++ ) ++ ++ assert {:error, %Failure{user_friendly_error: "oh no - L3"}} = ++ Contracts.execute_condition( ++ {:transaction, nil, nil}, ++ Contract.from_transaction!(contract_tx), ++ trigger_tx, ++ nil, ++ DateTime.utc_now(), ++ [], ++ cache?: false ++ ) + end + + test "should be able to use a custom function call as parameter in condition block" do +@@ -603,6 +652,39 @@ defmodule Archethic.ContractsTest do + ) + end + ++ test "should return the proper error when there is a throw" do ++ code = """ ++ @version 1 ++ condition triggered_by: transaction, as: [] ++ actions triggered_by: transaction do ++ throw code: 1, message: "nope" ++ end ++ """ ++ ++ contract_tx = ContractFactory.create_valid_contract_tx(code) ++ ++ incoming_tx = TransactionFactory.create_valid_transaction() ++ ++ assert {:error, %Failure{user_friendly_error: "nope - L4"}} = ++ Contracts.execute_trigger( ++ {:transaction, nil, nil}, ++ Contract.from_transaction!(contract_tx), ++ incoming_tx, ++ nil, ++ [] ++ ) ++ ++ assert {:error, %Failure{user_friendly_error: "nope - L4"}} = ++ Contracts.execute_trigger( ++ {:transaction, nil, nil}, ++ Contract.from_transaction!(contract_tx), ++ incoming_tx, ++ nil, ++ [], ++ cache?: false ++ ) ++ end ++ + test "should fail if the state is too big" do + code = ~S""" + @version 1 diff --git a/test/archethic/p2p_test.exs b/test/archethic/p2p_test.exs index 5e4b7fa310..05504aaed4 100644 --- a/test/archethic/p2p_test.exs +++ b/test/archethic/p2p_test.exs @@ -161,6 +161,35 @@ defmodule Archethic.P2PTest do fn _ -> false end ) end + + test "should call the repair function for every results", %{nodes: nodes} do + MockClient + |> stub(:send_message, fn + _, %GetTransaction{}, _timeout -> + {:ok, %Transaction{}} + end) + + me = self() + + assert {:error, :acceptance_failed} = + P2P.quorum_read( + nodes, + %GetTransaction{address: ""}, + fn results -> List.first(results) end, + 0, + fn _ -> false end, + 3, + fn all_results -> + assert match?([{_, _} | _], all_results) + Process.send(me, {:repairing, length(all_results)}, []) + Process.sleep(10_000) + :ok + end + ) + + expected_size = length(nodes) + assert_receive({:repairing, ^expected_size}, 100) + end end describe "authorized_and_available_nodes/1" do From 9f985365630306126d1bbca93dc11b18f5c346ca Mon Sep 17 00:00:00 2001 From: bchamagne Date: Mon, 28 Oct 2024 09:16:53 +0100 Subject: [PATCH 4/8] quorum_read use a keyword list --- lib/archethic/beacon_chain.ex | 2 +- .../mining/smart_contract_validation.ex | 6 +-- lib/archethic/p2p.ex | 40 ++++++++----------- lib/archethic/transaction_chain.ex | 38 +++++++++--------- test/archethic/p2p_test.exs | 31 +++++++------- 5 files changed, 55 insertions(+), 62 deletions(-) diff --git a/lib/archethic/beacon_chain.ex b/lib/archethic/beacon_chain.ex index 908e265d5c..b832dce31d 100644 --- a/lib/archethic/beacon_chain.ex +++ b/lib/archethic/beacon_chain.ex @@ -516,7 +516,7 @@ defmodule Archethic.BeaconChain do case P2P.quorum_read( nodes, %GetBeaconSummariesAggregate{date: summary_time}, - conflict_resolver + conflict_resolver: conflict_resolver ) do {:ok, aggregate = %SummaryAggregate{}} -> {:ok, aggregate} diff --git a/lib/archethic/mining/smart_contract_validation.ex b/lib/archethic/mining/smart_contract_validation.ex index c3392be1e2..d39e075ebe 100644 --- a/lib/archethic/mining/smart_contract_validation.ex +++ b/lib/archethic/mining/smart_contract_validation.ex @@ -83,7 +83,7 @@ defmodule Archethic.Mining.SmartContractValidation do |> Election.chain_storage_nodes(P2P.authorized_and_available_nodes()) |> Election.get_synchronized_nodes_before(previous_summary_time) - conflicts_resolver = fn results -> + conflict_resolver = fn results -> %SmartContractCallValidation{last_chain_sync_date: highest_date} = Enum.max_by(results, & &1.last_chain_sync_date, DateTime) @@ -108,8 +108,8 @@ defmodule Archethic.Mining.SmartContractValidation do transaction: transaction, timestamp: validation_time }, - conflicts_resolver, - @timeout + conflict_resolver: conflict_resolver, + timeout: @timeout ) do {:ok, %SmartContractCallValidation{status: :ok, fee: fee}} -> {:ok, fee} diff --git a/lib/archethic/p2p.ex b/lib/archethic/p2p.ex index 78d2f78d7c..050e8e5771 100644 --- a/lib/archethic/p2p.ex +++ b/lib/archethic/p2p.ex @@ -163,7 +163,7 @@ defmodule Archethic.P2P do case quorum_read( nodes, %ListNodes{authorized_and_available?: authorized_and_available?}, - conflict_resolver + conflict_resolver: conflict_resolver ) do {:ok, %NodeList{nodes: nodes}} -> {:ok, nodes} @@ -733,36 +733,30 @@ defmodule Archethic.P2P do @doc """ Send a message to a list of nodes and perform a read quorum + + Opts: + timeout :: non_neg_integer(), + conflict_resolver :: (list(Message.t()) -> Message.t()), + acceptance_resolver :: (Message.t() -> boolean()), + consistency_level :: pos_integer(), + repair_fun :: (list(Message.t()) -> :ok) """ @spec quorum_read( node_list :: list(Node.t()), message :: Message.t(), - conflict_resolver :: (list(Message.t()) -> Message.t()), - timeout :: non_neg_integer(), - acceptance_resolver :: (Message.t() -> boolean()), - consistency_level :: pos_integer(), - repair_fun :: (list(Message.t()) -> :ok) - ) :: - {:ok, Message.t()} | {:error, :network_issue} | {:error, :acceptance_failed} - def quorum_read( - nodes, - message, - conflict_resolver \\ fn results -> List.first(results) end, - timeout \\ 0, - acceptance_resolver \\ fn _ -> true end, - consistency_level \\ 3, - repair_fun \\ fn _ -> :ok end - ) - + opts :: Keyword.t() + ) :: {:ok, Message.t()} | {:error, :network_issue} | {:error, :acceptance_failed} def quorum_read( nodes, message, - conflict_resolver, - timeout, - acceptance_resolver, - consistency_level, - repair_fun + opts \\ [] ) do + timeout = Keyword.get(opts, :timeout, 0) + conflict_resolver = Keyword.get(opts, :conflict_resolver, &List.first(&1)) + acceptance_resolver = Keyword.get(opts, :acceptance_resolver, fn _ -> true end) + consistency_level = Keyword.get(opts, :consistency_level, 3) + repair_fun = Keyword.get(opts, :repair_fun, fn _ -> :ok end) + nodes |> Enum.filter(&node_connected?/1) |> sort_by_nearest_nodes() diff --git a/lib/archethic/transaction_chain.ex b/lib/archethic/transaction_chain.ex index 2fbb472923..62b940c1c1 100644 --- a/lib/archethic/transaction_chain.ex +++ b/lib/archethic/transaction_chain.ex @@ -305,10 +305,10 @@ defmodule Archethic.TransactionChain do case P2P.quorum_read( nodes, %GetLastTransactionAddress{address: address, timestamp: timestamp}, - conflict_resolver, - timeout, - acceptance_resolver, - consistency_level + conflict_resolver: conflict_resolver, + timeout: timeout, + acceptance_resolver: acceptance_resolver, + consistency_level: consistency_level ) do {:ok, %LastTransactionAddress{address: last_address}} -> {:ok, last_address} @@ -343,7 +343,7 @@ defmodule Archethic.TransactionChain do case P2P.quorum_read( nodes, %GetNextAddresses{address: address, limit: limit}, - conflict_resolver + conflict_resolver: conflict_resolver ) do {:ok, %AddressList{addresses: addresses}} -> {:ok, addresses} {:error, :network_issue} = e -> e @@ -422,9 +422,9 @@ defmodule Archethic.TransactionChain do case P2P.quorum_read( nodes, %GetTransaction{address: address}, - conflict_resolver, - timeout, - acceptance_resolver + conflict_resolver: conflict_resolver, + timeout: timeout, + acceptance_resolver: acceptance_resolver ) do {:ok, %NotFound{}} -> {:error, :transaction_not_exists} @@ -612,8 +612,8 @@ defmodule Archethic.TransactionChain do paging_state: paging_state, order: order }, - conflict_resolver, - timeout + conflict_resolver: conflict_resolver, + timeout: timeout ) do {:ok, %TransactionList{ @@ -696,7 +696,7 @@ defmodule Archethic.TransactionChain do case P2P.quorum_read( nodes, %GetTransactionInputs{address: address, offset: offset, limit: limit}, - conflict_resolver + conflict_resolver: conflict_resolver ) do {:ok, %TransactionInputList{inputs: versioned_inputs, more?: more?, offset: offset}} -> {versioned_inputs, more?, offset} @@ -775,7 +775,7 @@ defmodule Archethic.TransactionChain do case P2P.quorum_read( nodes, %GetUnspentOutputs{address: address, offset: offset, limit: limit}, - conflict_resolver + conflict_resolver: conflict_resolver ) do {:ok, %UnspentOutputList{ @@ -808,7 +808,7 @@ defmodule Archethic.TransactionChain do case P2P.quorum_read( nodes, %GetTransactionChainLength{address: address}, - conflict_resolver + conflict_resolver: conflict_resolver ) do {:ok, %TransactionChainLength{length: length}} -> {:ok, length} @@ -849,9 +849,9 @@ defmodule Archethic.TransactionChain do case P2P.quorum_read( nodes, %GetGenesisAddress{address: address}, - conflict_resolver, - timeout, - acceptance_resolver + conflict_resolver: conflict_resolver, + timeout: timeout, + acceptance_resolver: acceptance_resolver ) do {:ok, %GenesisAddress{address: genesis_address}} -> {:ok, genesis_address} @@ -903,7 +903,7 @@ defmodule Archethic.TransactionChain do case P2P.quorum_read( nodes, %GetFirstTransactionAddress{address: address}, - conflict_resolver + conflict_resolver: conflict_resolver ) do {:ok, %NotFound{}} -> {:error, :does_not_exist} @@ -1138,7 +1138,9 @@ defmodule Archethic.TransactionChain do end end - case P2P.quorum_read(nodes, %GetTransactionSummary{address: address}, conflict_resolver) do + case P2P.quorum_read(nodes, %GetTransactionSummary{address: address}, + conflict_resolver: conflict_resolver + ) do {:ok, %TransactionSummaryMessage{transaction_summary: %TransactionSummary{address: ^address}}} -> true diff --git a/test/archethic/p2p_test.exs b/test/archethic/p2p_test.exs index 05504aaed4..ce409e6b12 100644 --- a/test/archethic/p2p_test.exs +++ b/test/archethic/p2p_test.exs @@ -30,7 +30,7 @@ defmodule Archethic.P2PTest do assert %Node{ip: {127, 0, 0, 1}} = P2P.get_node_info() end - describe "quorum_read/4" do + describe "quorum_read/3" do setup do pub1 = Crypto.generate_deterministic_keypair("node1") |> elem(0) pub2 = Crypto.generate_deterministic_keypair("node2") |> elem(0) @@ -120,15 +120,17 @@ defmodule Archethic.P2PTest do end) assert {:ok, %Transaction{}} = - P2P.quorum_read(nodes, %GetTransaction{address: ""}, fn results -> - case Enum.find(results, &match?(%Transaction{}, &1)) do - nil -> - %NotFound{} - - tx -> - tx + P2P.quorum_read(nodes, %GetTransaction{address: ""}, + conflict_resolver: fn results -> + case Enum.find(results, &match?(%Transaction{}, &1)) do + nil -> + %NotFound{} + + tx -> + tx + end end - end) + ) end test "should try all nodes and return error when no response match acceptance resolver", @@ -156,9 +158,7 @@ defmodule Archethic.P2PTest do P2P.quorum_read( nodes, %GetTransaction{address: ""}, - fn results -> List.last(results) end, - 0, - fn _ -> false end + acceptance_resolver: fn _ -> false end ) end @@ -175,11 +175,8 @@ defmodule Archethic.P2PTest do P2P.quorum_read( nodes, %GetTransaction{address: ""}, - fn results -> List.first(results) end, - 0, - fn _ -> false end, - 3, - fn all_results -> + acceptance_resolver: fn _ -> false end, + repair_fun: fn all_results -> assert match?([{_, _} | _], all_results) Process.send(me, {:repairing, length(all_results)}, []) Process.sleep(10_000) From e204415c91d896033aca488506d86bb1b0406201 Mon Sep 17 00:00:00 2001 From: bchamagne Date: Mon, 28 Oct 2024 09:24:30 +0100 Subject: [PATCH 5/8] oops --- playground.diff | 168 ------------------------------------------------ 1 file changed, 168 deletions(-) delete mode 100644 playground.diff diff --git a/playground.diff b/playground.diff deleted file mode 100644 index 62684918a9..0000000000 --- a/playground.diff +++ /dev/null @@ -1,168 +0,0 @@ -commit 7de9ac53236b308d3cd964df9914f78b580add97 -Author: bchamagne -Date: Wed May 29 11:33:41 2024 +0200 - - Fix the execution's errors when cache? is false - -diff --git a/lib/archethic/contracts.ex b/lib/archethic/contracts.ex -index da72f355..33f4e1ca 100644 ---- a/lib/archethic/contracts.ex -+++ b/lib/archethic/contracts.ex -@@ -560,26 +560,31 @@ defmodule Archethic.Contracts do - end - - defp cache_interpreter_execute(fun, key, opts) do -- if Keyword.get(opts, :cache?) do -- timeout = Keyword.get(opts, :timeout, 5_000) -- -- func = fn -> -- try do -- fun.() -- rescue -- err -> -- # error from the code (ex: 1 + "abc") -- {:error, err, __STACKTRACE__} -- end -+ func = fn -> -+ try do -+ fun.() -+ rescue -+ err -> -+ # error or throw from the user's code (ex: 1 + "abc") -+ {:error, err, __STACKTRACE__} - end -+ end - -- # We set the maximum timeout for a transaction to be processed before the kill the cache -- case Utils.JobCache.get!(key, function: func, timeout: timeout, ttl: 60_000) do -- {:error, err, stacktrace} -> {:error, raise_to_failure(err, stacktrace)} -- result -> result -+ result = -+ if Keyword.get(opts, :cache?) do -+ # We set the maximum timeout for a transaction to be processed before the kill the cache -+ Utils.JobCache.get!(key, -+ function: func, -+ timeout: Keyword.get(opts, :timeout, 5_000), -+ ttl: 60_000 -+ ) -+ else -+ func.() - end -- else -- fun.() -+ -+ case result do -+ {:error, err, stacktrace} -> {:error, raise_to_failure(err, stacktrace)} -+ result -> result - end - rescue - _ -> -diff --git a/test/archethic/contracts_test.exs b/test/archethic/contracts_test.exs -index 50414cf7..6c128a55 100644 ---- a/test/archethic/contracts_test.exs -+++ b/test/archethic/contracts_test.exs -@@ -293,7 +293,7 @@ defmodule Archethic.ContractsTest do - - trigger_tx = TransactionFactory.create_valid_transaction() - -- assert {:error, %Failure{}} = -+ assert {:error, %Failure{user_friendly_error: "bad argument in arithmetic expression - L3"}} = - Contracts.execute_condition( - {:transaction, nil, nil}, - Contract.from_transaction!(contract_tx), -@@ -302,6 +302,55 @@ defmodule Archethic.ContractsTest do - DateTime.utc_now(), - [] - ) -+ -+ assert {:error, %Failure{user_friendly_error: "bad argument in arithmetic expression - L3"}} = -+ Contracts.execute_condition( -+ {:transaction, nil, nil}, -+ Contract.from_transaction!(contract_tx), -+ trigger_tx, -+ nil, -+ DateTime.utc_now(), -+ [], -+ cache?: false -+ ) -+ end -+ -+ test "should return Error if condition throws" do -+ code = """ -+ @version 1 -+ condition triggered_by: transaction do -+ throw code: 12, message: "oh no" -+ end -+ -+ actions triggered_by: transaction do -+ Contract.set_content "hello" -+ end -+ """ -+ -+ contract_tx = ContractFactory.create_valid_contract_tx(code) -+ -+ trigger_tx = TransactionFactory.create_valid_transaction() -+ -+ assert {:error, %Failure{user_friendly_error: "oh no - L3"}} = -+ Contracts.execute_condition( -+ {:transaction, nil, nil}, -+ Contract.from_transaction!(contract_tx), -+ trigger_tx, -+ nil, -+ DateTime.utc_now(), -+ [] -+ ) -+ -+ assert {:error, %Failure{user_friendly_error: "oh no - L3"}} = -+ Contracts.execute_condition( -+ {:transaction, nil, nil}, -+ Contract.from_transaction!(contract_tx), -+ trigger_tx, -+ nil, -+ DateTime.utc_now(), -+ [], -+ cache?: false -+ ) - end - - test "should be able to use a custom function call as parameter in condition block" do -@@ -603,6 +652,39 @@ defmodule Archethic.ContractsTest do - ) - end - -+ test "should return the proper error when there is a throw" do -+ code = """ -+ @version 1 -+ condition triggered_by: transaction, as: [] -+ actions triggered_by: transaction do -+ throw code: 1, message: "nope" -+ end -+ """ -+ -+ contract_tx = ContractFactory.create_valid_contract_tx(code) -+ -+ incoming_tx = TransactionFactory.create_valid_transaction() -+ -+ assert {:error, %Failure{user_friendly_error: "nope - L4"}} = -+ Contracts.execute_trigger( -+ {:transaction, nil, nil}, -+ Contract.from_transaction!(contract_tx), -+ incoming_tx, -+ nil, -+ [] -+ ) -+ -+ assert {:error, %Failure{user_friendly_error: "nope - L4"}} = -+ Contracts.execute_trigger( -+ {:transaction, nil, nil}, -+ Contract.from_transaction!(contract_tx), -+ incoming_tx, -+ nil, -+ [], -+ cache?: false -+ ) -+ end -+ - test "should fail if the state is too big" do - code = ~S""" - @version 1 From f246c29cbd025340e88871d162350c5682ae3c58 Mon Sep 17 00:00:00 2001 From: bchamagne Date: Mon, 28 Oct 2024 15:22:58 +0100 Subject: [PATCH 6/8] refactor for code quality --- lib/archethic/p2p.ex | 51 +++++++++++++++++++------------------ test/archethic/p2p_test.exs | 33 ++++++++++++++++++++---- 2 files changed, 54 insertions(+), 30 deletions(-) diff --git a/lib/archethic/p2p.ex b/lib/archethic/p2p.ex index 050e8e5771..8ea7a37b0d 100644 --- a/lib/archethic/p2p.ex +++ b/lib/archethic/p2p.ex @@ -764,18 +764,20 @@ defmodule Archethic.P2P do |> Enum.chunk_every(consistency_level) |> Enum.reduce_while({nil, []}, fn nodes, {previous_result_acc, all_results_acc} -> results_by_node = send_message_and_filter_results(nodes, message, timeout) + all_results_acc = all_results_acc ++ results_by_node + {_nodes, results} = Enum.unzip(results_by_node) - with :ok <- enough_results?(previous_result_acc, results_by_node), - result <- resolve_conflicts(previous_result_acc, results_by_node, conflict_resolver), + with {:ok, results} <- enough_results(previous_result_acc, results), + result <- resolve_conflicts(results, conflict_resolver), :ok <- result_accepted?(result, acceptance_resolver) do - {:halt, {result, all_results_acc ++ results_by_node}} + {:halt, {result, all_results_acc}} else {:error, previous_result} -> - {:cont, {previous_result, all_results_acc ++ results_by_node}} + {:cont, {previous_result, all_results_acc}} end end) |> then(fn {result, all_results} -> - Task.Supervisor.start_child(TaskSupervisor, fn -> repair_fun.(all_results) end) + maybe_async_repair(all_results, repair_fun) cond do is_nil(result) -> @@ -800,40 +802,33 @@ defmodule Archethic.P2P do timeout: timeout + 2000 ) |> Stream.filter(&match?({:ok, {_, {:ok, _}}}, &1)) - |> Stream.map(fn {:ok, {node_public_key, {:ok, res}}} -> {node_public_key, res} end) - |> Enum.to_list() + |> Enum.map(fn {:ok, {node_public_key, {:ok, res}}} -> {node_public_key, res} end) end # returns ok if we have 2 results # we should probably use the consistency level - defp enough_results?(nil, results_by_node) do - case results_by_node do + defp enough_results(nil, results) do + case results do [] -> {:error, nil} [{_node, result}] -> {:error, result} - _ -> :ok + _ -> {:ok, results} end end - defp enough_results?(previous_result, results_by_node) do - case results_by_node do + defp enough_results(previous_result, results) do + case results do [] -> {:error, previous_result} - _ -> :ok + _ -> {:ok, [previous_result | results]} end end - defp resolve_conflicts(nil, results_by_node, conflict_resolver) do - results_by_node - |> Enum.map(&elem(&1, 1)) + defp resolve_conflicts(results, conflict_resolver) do + results |> Enum.dedup() - |> conflict_resolver.() - end - - defp resolve_conflicts(previous_result, results_by_node, conflict_resolver) do - results_by_node - |> Enum.map(&elem(&1, 1)) - |> Kernel.++([previous_result]) - |> Enum.dedup() - |> conflict_resolver.() + |> then(fn + [result] -> result + results -> conflict_resolver.(results) + end) end defp result_accepted?(result, acceptance_resolver) do @@ -844,6 +839,12 @@ defmodule Archethic.P2P do end end + defp maybe_async_repair(results, repair_fun) do + unless Enum.empty?(results) do + Task.Supervisor.start_child(TaskSupervisor, fn -> repair_fun.(results) end) + end + end + @doc """ Update the node's network patch """ diff --git a/test/archethic/p2p_test.exs b/test/archethic/p2p_test.exs index ce409e6b12..1677e0ffff 100644 --- a/test/archethic/p2p_test.exs +++ b/test/archethic/p2p_test.exs @@ -162,7 +162,7 @@ defmodule Archethic.P2PTest do ) end - test "should call the repair function for every results", %{nodes: nodes} do + test "should call the repair function if provided", %{nodes: nodes} do MockClient |> stub(:send_message, fn _, %GetTransaction{}, _timeout -> @@ -176,10 +176,9 @@ defmodule Archethic.P2PTest do nodes, %GetTransaction{address: ""}, acceptance_resolver: fn _ -> false end, - repair_fun: fn all_results -> - assert match?([{_, _} | _], all_results) - Process.send(me, {:repairing, length(all_results)}, []) - Process.sleep(10_000) + repair_fun: fn results_by_node -> + assert match?([{_, _} | _], results_by_node) + send(me, {:repairing, length(results_by_node)}) :ok end ) @@ -187,6 +186,30 @@ defmodule Archethic.P2PTest do expected_size = length(nodes) assert_receive({:repairing, ^expected_size}, 100) end + + test "should call the repair function asynchronously", %{nodes: nodes} do + MockClient + |> stub(:send_message, fn + _, %GetTransaction{}, _timeout -> + {:ok, %Transaction{}} + end) + + me = self() + + assert {:error, :acceptance_failed} = + P2P.quorum_read( + nodes, + %GetTransaction{address: ""}, + acceptance_resolver: fn _ -> false end, + repair_fun: fn _ -> + Process.sleep(10_000) + send(me, :repairing_done) + :ok + end + ) + + refute_received(:repairing_done) + end end describe "authorized_and_available_nodes/1" do From f74c1d969b2467184c3495b457312757fdd76a8d Mon Sep 17 00:00:00 2001 From: bchamagne Date: Mon, 28 Oct 2024 17:32:23 +0100 Subject: [PATCH 7/8] provide the accepted_result to the repair_fun --- lib/archethic/p2p.ex | 13 +++++++------ test/archethic/p2p_test.exs | 32 ++++++++++++++++++++++++++++++-- 2 files changed, 37 insertions(+), 8 deletions(-) diff --git a/lib/archethic/p2p.ex b/lib/archethic/p2p.ex index 8ea7a37b0d..3a8392169e 100644 --- a/lib/archethic/p2p.ex +++ b/lib/archethic/p2p.ex @@ -739,7 +739,7 @@ defmodule Archethic.P2P do conflict_resolver :: (list(Message.t()) -> Message.t()), acceptance_resolver :: (Message.t() -> boolean()), consistency_level :: pos_integer(), - repair_fun :: (list(Message.t()) -> :ok) + repair_fun :: (nil | Message.t(), list(Message.t()) -> :ok) """ @spec quorum_read( node_list :: list(Node.t()), @@ -755,7 +755,7 @@ defmodule Archethic.P2P do conflict_resolver = Keyword.get(opts, :conflict_resolver, &List.first(&1)) acceptance_resolver = Keyword.get(opts, :acceptance_resolver, fn _ -> true end) consistency_level = Keyword.get(opts, :consistency_level, 3) - repair_fun = Keyword.get(opts, :repair_fun, fn _ -> :ok end) + repair_fun = Keyword.get(opts, :repair_fun, fn _, _ -> :ok end) nodes |> Enum.filter(&node_connected?/1) @@ -777,16 +777,17 @@ defmodule Archethic.P2P do end end) |> then(fn {result, all_results} -> - maybe_async_repair(all_results, repair_fun) - cond do is_nil(result) -> + maybe_async_repair(nil, all_results, repair_fun) {:error, :network_issue} acceptance_resolver.(result) -> + maybe_async_repair(result, all_results, repair_fun) {:ok, result} true -> + maybe_async_repair(nil, all_results, repair_fun) {:error, :acceptance_failed} end end) @@ -839,9 +840,9 @@ defmodule Archethic.P2P do end end - defp maybe_async_repair(results, repair_fun) do + defp maybe_async_repair(result, results, repair_fun) do unless Enum.empty?(results) do - Task.Supervisor.start_child(TaskSupervisor, fn -> repair_fun.(results) end) + Task.Supervisor.start_child(TaskSupervisor, fn -> repair_fun.(result, results) end) end end diff --git a/test/archethic/p2p_test.exs b/test/archethic/p2p_test.exs index 1677e0ffff..f05979f554 100644 --- a/test/archethic/p2p_test.exs +++ b/test/archethic/p2p_test.exs @@ -162,7 +162,9 @@ defmodule Archethic.P2PTest do ) end - test "should call the repair function if provided", %{nodes: nodes} do + test "repair function should receive a nil accepted_result when no accepted response", %{ + nodes: nodes + } do MockClient |> stub(:send_message, fn _, %GetTransaction{}, _timeout -> @@ -176,7 +178,8 @@ defmodule Archethic.P2PTest do nodes, %GetTransaction{address: ""}, acceptance_resolver: fn _ -> false end, - repair_fun: fn results_by_node -> + repair_fun: fn accepted_result, results_by_node -> + assert is_nil(accepted_result) assert match?([{_, _} | _], results_by_node) send(me, {:repairing, length(results_by_node)}) :ok @@ -187,6 +190,31 @@ defmodule Archethic.P2PTest do assert_receive({:repairing, ^expected_size}, 100) end + test "repair function should receive the accepted_result", %{nodes: nodes} do + MockClient + |> stub(:send_message, fn + _, %GetTransaction{}, _timeout -> + {:ok, %Transaction{}} + end) + + me = self() + + assert {:ok, %Transaction{}} = + P2P.quorum_read( + nodes, + %GetTransaction{address: ""}, + repair_fun: fn accepted_result, results_by_node -> + assert %Transaction{} = accepted_result + assert match?([{_, _} | _], results_by_node) + send(me, {:repairing, length(results_by_node)}) + :ok + end + ) + + # 3 is consistency_level + assert_receive({:repairing, 3}, 100) + end + test "should call the repair function asynchronously", %{nodes: nodes} do MockClient |> stub(:send_message, fn From 2b29e5125b66c98e3ba1038abca5d6647f79dc97 Mon Sep 17 00:00:00 2001 From: bchamagne Date: Tue, 29 Oct 2024 14:19:06 +0100 Subject: [PATCH 8/8] more lint after feedback, add a test --- lib/archethic/p2p.ex | 39 ++--- test/archethic/p2p_test.exs | 134 +++++++++--------- .../replication/transaction_context_test.exs | 24 +--- test/support/template.ex | 22 +++ 4 files changed, 106 insertions(+), 113 deletions(-) diff --git a/lib/archethic/p2p.ex b/lib/archethic/p2p.ex index 3a8392169e..c8865d42a3 100644 --- a/lib/archethic/p2p.ex +++ b/lib/archethic/p2p.ex @@ -764,12 +764,12 @@ defmodule Archethic.P2P do |> Enum.chunk_every(consistency_level) |> Enum.reduce_while({nil, []}, fn nodes, {previous_result_acc, all_results_acc} -> results_by_node = send_message_and_filter_results(nodes, message, timeout) - all_results_acc = all_results_acc ++ results_by_node + all_results_acc = results_by_node ++ all_results_acc {_nodes, results} = Enum.unzip(results_by_node) with {:ok, results} <- enough_results(previous_result_acc, results), result <- resolve_conflicts(results, conflict_resolver), - :ok <- result_accepted?(result, acceptance_resolver) do + :ok <- result_accepted(result, acceptance_resolver) do {:halt, {result, all_results_acc}} else {:error, previous_result} -> @@ -808,31 +808,20 @@ defmodule Archethic.P2P do # returns ok if we have 2 results # we should probably use the consistency level - defp enough_results(nil, results) do - case results do - [] -> {:error, nil} - [{_node, result}] -> {:error, result} - _ -> {:ok, results} - end - end - - defp enough_results(previous_result, results) do - case results do - [] -> {:error, previous_result} - _ -> {:ok, [previous_result | results]} - end - end + defp enough_results(nil, []), do: {:error, nil} + defp enough_results(nil, [result]), do: {:error, result} + defp enough_results(nil, results), do: {:ok, results} + defp enough_results(previous_result, []), do: {:error, previous_result} + defp enough_results(previous_result, results), do: {:ok, [previous_result | results]} defp resolve_conflicts(results, conflict_resolver) do - results - |> Enum.dedup() - |> then(fn + case Enum.uniq(results) do [result] -> result results -> conflict_resolver.(results) - end) + end end - defp result_accepted?(result, acceptance_resolver) do + defp result_accepted(result, acceptance_resolver) do if acceptance_resolver.(result) do :ok else @@ -840,10 +829,12 @@ defmodule Archethic.P2P do end end + defp maybe_async_repair(_result, [], _repair_fun), do: :ok + defp maybe_async_repair(result, results, repair_fun) do - unless Enum.empty?(results) do - Task.Supervisor.start_child(TaskSupervisor, fn -> repair_fun.(result, results) end) - end + Task.Supervisor.start_child(TaskSupervisor, fn -> + repair_fun.(result, results) + end) end @doc """ diff --git a/test/archethic/p2p_test.exs b/test/archethic/p2p_test.exs index f05979f554..efd5341845 100644 --- a/test/archethic/p2p_test.exs +++ b/test/archethic/p2p_test.exs @@ -1,5 +1,6 @@ defmodule Archethic.P2PTest do use ArchethicCase + import ArchethicCase alias Archethic.Crypto @@ -31,66 +32,9 @@ defmodule Archethic.P2PTest do end describe "quorum_read/3" do - setup do - pub1 = Crypto.generate_deterministic_keypair("node1") |> elem(0) - pub2 = Crypto.generate_deterministic_keypair("node2") |> elem(0) - pub3 = Crypto.generate_deterministic_keypair("node3") |> elem(0) - pub4 = Crypto.generate_deterministic_keypair("node4") |> elem(0) - pub5 = Crypto.generate_deterministic_keypair("node5") |> elem(0) - - nodes = [ - %Node{ - ip: {127, 0, 0, 1}, - port: 3002, - first_public_key: pub1, - last_public_key: pub1, - available?: true, - network_patch: "AAA", - geo_patch: "AAA" - }, - %Node{ - ip: {127, 0, 0, 1}, - port: 3003, - first_public_key: pub2, - last_public_key: pub2, - available?: true, - network_patch: "AAA", - geo_patch: "AAA" - }, - %Node{ - ip: {127, 0, 0, 1}, - port: 3004, - first_public_key: pub3, - last_public_key: pub3, - available?: true, - network_patch: "AAA", - geo_patch: "AAA" - }, - %Node{ - ip: {127, 0, 0, 1}, - port: 3005, - first_public_key: pub4, - last_public_key: pub4, - available?: true, - network_patch: "AAA", - geo_patch: "AAA" - }, - %Node{ - ip: {127, 0, 0, 1}, - port: 3006, - first_public_key: pub5, - last_public_key: pub5, - available?: true, - network_patch: "AAA", - geo_patch: "AAA" - } - ] - - Enum.each(nodes, &P2P.add_and_connect_node/1) - {:ok, %{nodes: nodes}} - end + test "should return the first result when the same results are returned" do + nodes = add_and_connect_nodes(5) - test "should return the first result when the same results are returned", %{nodes: nodes} do MockClient |> expect( :send_message, @@ -103,7 +47,9 @@ defmodule Archethic.P2PTest do assert {:ok, %Transaction{}} = P2P.quorum_read(nodes, %GetTransaction{address: ""}) end - test "should run resolver conflicts when the results are different", %{nodes: nodes} do + test "should run resolver conflicts when the results are different" do + nodes = add_and_connect_nodes(5) + MockClient |> stub(:send_message, fn %Node{port: 3004}, %GetTransaction{}, _timeout -> @@ -133,10 +79,9 @@ defmodule Archethic.P2PTest do ) end - test "should try all nodes and return error when no response match acceptance resolver", - %{ - nodes: nodes - } do + test "should try all nodes and return error when no response match acceptance resolver" do + nodes = add_and_connect_nodes(5) + MockClient |> expect( :send_message, @@ -162,9 +107,56 @@ defmodule Archethic.P2PTest do ) end - test "repair function should receive a nil accepted_result when no accepted response", %{ - nodes: nodes - } do + test "should accept a single result for the entire set" do + nodes = [node | _] = add_and_connect_nodes(5) + + MockClient + |> stub(:send_message, fn + ^node, %GetTransaction{}, _timeout -> + {:ok, %Transaction{}} + + _, %GetTransaction{}, _timeout -> + {:error, :timeout} + end) + + assert {:ok, %Transaction{}} = + P2P.quorum_read( + nodes, + %GetTransaction{address: ""} + ) + end + + test "should not accept a single result if not gone through the entire set" do + nodes = [node, _, _, _, node5] = add_and_connect_nodes(5) + + me = self() + + MockClient + |> stub(:send_message, fn + ^node, %GetTransaction{}, _timeout -> + {:ok, %Transaction{}} + + ^node5, %GetTransaction{}, _timeout -> + send(me, :node5_requested) + + {:ok, %NotFound{}} + + _, %GetTransaction{}, _timeout -> + {:error, :timeout} + end) + + assert {:ok, %Transaction{}} = + P2P.quorum_read( + nodes, + %GetTransaction{address: ""} + ) + + assert_receive :node5_requested, 100 + end + + test "repair function should receive a nil accepted_result when no accepted response" do + nodes = add_and_connect_nodes(5) + MockClient |> stub(:send_message, fn _, %GetTransaction{}, _timeout -> @@ -190,7 +182,9 @@ defmodule Archethic.P2PTest do assert_receive({:repairing, ^expected_size}, 100) end - test "repair function should receive the accepted_result", %{nodes: nodes} do + test "repair function should receive the accepted_result" do + nodes = add_and_connect_nodes(5) + MockClient |> stub(:send_message, fn _, %GetTransaction{}, _timeout -> @@ -215,7 +209,9 @@ defmodule Archethic.P2PTest do assert_receive({:repairing, 3}, 100) end - test "should call the repair function asynchronously", %{nodes: nodes} do + test "should call the repair function asynchronously" do + nodes = add_and_connect_nodes(5) + MockClient |> stub(:send_message, fn _, %GetTransaction{}, _timeout -> diff --git a/test/archethic/replication/transaction_context_test.exs b/test/archethic/replication/transaction_context_test.exs index 6e0a89b7a0..393adbf5c4 100644 --- a/test/archethic/replication/transaction_context_test.exs +++ b/test/archethic/replication/transaction_context_test.exs @@ -34,7 +34,7 @@ defmodule Archethic.Replication.TransactionContextTest do {:ok, %Transaction{address: address}} end) - connect_to_n_nodes(5) + add_and_connect_nodes(5) assert %Transaction{} = TransactionContext.fetch_transaction(address) end @@ -48,7 +48,7 @@ defmodule Archethic.Replication.TransactionContextTest do {:ok, %NotFound{}} end) - connect_to_n_nodes(5) + add_and_connect_nodes(5) # no assert, we use expect(5) in the mock TransactionContext.fetch_transaction(address, acceptance_resolver: :accept_transaction) @@ -65,7 +65,7 @@ defmodule Archethic.Replication.TransactionContextTest do {:ok, %GenesisAddress{address: genesis, timestamp: DateTime.utc_now()}} end) - connect_to_n_nodes(5) + add_and_connect_nodes(5) assert genesis == TransactionContext.fetch_genesis_address(address) end @@ -79,7 +79,7 @@ defmodule Archethic.Replication.TransactionContextTest do {:ok, %GenesisAddress{address: address, timestamp: DateTime.utc_now()}} end) - connect_to_n_nodes(5) + add_and_connect_nodes(5) # no assert, we use expect(5) in the mock TransactionContext.fetch_genesis_address(address, @@ -244,20 +244,4 @@ defmodule Archethic.Replication.TransactionContextTest do assert [^v_utxo] = TransactionContext.fetch_transaction_unspent_outputs(genesis_address) end - - def connect_to_n_nodes(n) do - Enum.each(1..n, fn i -> - P2P.add_and_connect_node(%Node{ - ip: {127, 0, 0, 1}, - port: 3000 + i, - first_public_key: random_public_key(), - last_public_key: random_public_key(), - available?: true, - geo_patch: "AAA", - network_patch: "AAA", - authorized?: true, - authorization_date: DateTime.utc_now() - }) - end) - end end diff --git a/test/support/template.ex b/test/support/template.ex index ee66b86a3f..a7d0a5bf69 100644 --- a/test/support/template.ex +++ b/test/support/template.ex @@ -19,6 +19,9 @@ defmodule ArchethicCase do alias Archethic.Governance.Pools.MemTable, as: PoolsMemTable alias Archethic.OracleChain.MemTable, as: OracleMemTable + + alias Archethic.P2P + alias Archethic.P2P.Node alias Archethic.P2P.MemTable, as: P2PMemTable alias Archethic.ContractFactory @@ -279,6 +282,25 @@ defmodule ArchethicCase do end) end + def add_and_connect_nodes(n) do + Enum.map(1..n, fn i -> + node = %Node{ + ip: {127, 0, 0, 1}, + port: 3000 + i, + first_public_key: random_public_key(), + last_public_key: random_public_key(), + available?: true, + geo_patch: "AAA", + network_patch: "AAA", + authorized?: true, + authorization_date: DateTime.utc_now() + } + + P2P.add_and_connect_node(node) + node + end) + end + def random_address() do <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> end