diff --git a/lib/archethic/oracle_chain/scheduler.ex b/lib/archethic/oracle_chain/scheduler.ex index b807088a8..8842ad36b 100644 --- a/lib/archethic/oracle_chain/scheduler.ex +++ b/lib/archethic/oracle_chain/scheduler.ex @@ -178,12 +178,12 @@ defmodule Archethic.OracleChain.Scheduler do PubSub.unregister_to_new_transaction_by_address(address) new_data = - case Map.pop(data, :oracle_watcher) do + case Map.pop(data, :watcher) do {nil, data} -> data {pid, data} -> - Process.exit(pid, :normal) + Process.exit(pid, :kill) data end @@ -228,54 +228,33 @@ defmodule Archethic.OracleChain.Scheduler do {:keep_state, new_data, {:next_event, :internal, :schedule}} end - def handle_event(:info, {:new_transaction, _, :oracle_summary, _timestamp}, :triggered, _data) do - OracleChain.update_summ_gen_addr() - {:keep_state_and_data, {:next_event, :internal, :fetch_data}} + def handle_event(:info, {:new_transaction, _, :oracle_summary, _timestamp}, :triggered, data) do + new_data = + case Map.pop(data, :watcher) do + {nil, data} -> + data + + {pid, data} -> + Process.exit(pid, :kill) + data + end + + new_data = update_summary_date(new_data) + {:keep_state, new_data, {:next_event, :internal, :fetch_data}} end def handle_event( :info, {:new_transaction, _address, :oracle_summary, _timestamp}, :scheduled, - data = %{summary_interval: summary_interval} + data ) do Logger.debug( "Reschedule polling after reception of an oracle summary transaction in scheduled state instead of triggered state" ) - current_time = DateTime.utc_now() |> DateTime.truncate(:second) - next_summary_date = next_date(summary_interval, current_time) - Logger.info("Next Oracle Summary at #{DateTime.to_string(next_summary_date)}") - - # We reset the summary date and indexes before rescheduling - new_data = - data - |> Map.put(:summary_date, next_summary_date) - |> Map.put(:next_address, Crypto.derive_oracle_address(next_summary_date, 1)) - |> Map.delete(:oracle_watcher) - |> Map.update!(:indexes, fn indexes -> - # Clean previous indexes - indexes - |> Map.keys() - |> Enum.filter(&(DateTime.diff(&1, next_summary_date) < 0)) - |> Enum.reduce(indexes, &Map.delete(&2, &1)) - end) - |> Map.update!(:indexes, fn indexes -> - # Prevent overwrite, if the oracle transaction was faster than the summary processing - if Map.has_key?(indexes, next_summary_date) do - indexes - else - Map.put(indexes, next_summary_date, 0) - end - end) - - OracleChain.update_summ_gen_addr() - - {:next_state, :triggered, new_data, - [ - {:next_event, :internal, :fetch_data}, - {:next_event, :internal, :schedule} - ]} + new_data = update_summary_date(data) + {:next_state, :triggered, new_data, {:next_event, :internal, :fetch_data}} end def handle_event( @@ -289,7 +268,7 @@ defmodule Archethic.OracleChain.Scheduler do ) do Logger.debug("Oracle polling in process") - if DateTime.diff(polling_date, summary_date, :second) == 0 do + if DateTime.diff(polling_date, summary_date, :second) >= 0 do {:next_state, :triggered, data, {:next_event, :internal, :aggregate}} else {:next_state, :triggered, data, {:next_event, :internal, :fetch_data}} @@ -350,7 +329,7 @@ defmodule Archethic.OracleChain.Scheduler do end end) - {:keep_state, Map.put(data, :oracle_watcher, pid)} + {:keep_state, Map.put(data, :watcher, pid)} {:exists, true} -> Logger.warning("Transaction already exists - before sending", @@ -368,7 +347,7 @@ defmodule Archethic.OracleChain.Scheduler do :internal, :aggregate, :triggered, - data = %{summary_date: summary_date, summary_interval: summary_interval, indexes: indexes} + data = %{summary_date: summary_date, indexes: indexes} ) when is_map_key(indexes, summary_date) do Logger.debug("Oracle summary - state: #{inspect(data)}") @@ -376,22 +355,13 @@ defmodule Archethic.OracleChain.Scheduler do index = Map.fetch!(indexes, summary_date) validation_nodes = get_validation_nodes(summary_date, index + 1) - # Stop previous oracle retries when the summary is triggered - case Map.get(data, :oracle_watcher) do - nil -> - :ignore - - pid -> - Process.exit(pid, :normal) - end - tx_address = summary_date |> Crypto.derive_oracle_keypair(index + 1) |> elem(0) |> Crypto.derive_address() - summary_watcher_pid = + watcher_pid = with {:exists, false} <- {:exists, DB.transaction_exists?(tx_address)}, {:trigger, true} <- {:trigger, trigger_node?(validation_nodes)} do Logger.debug("Oracle transaction summary sending", @@ -431,33 +401,7 @@ defmodule Archethic.OracleChain.Scheduler do nil end - current_time = DateTime.utc_now() |> DateTime.truncate(:second) - next_summary_date = next_date(summary_interval, current_time) - Logger.info("Next Oracle Summary at #{DateTime.to_string(next_summary_date)}") - - new_data = - data - |> Map.put(:summary_date, next_summary_date) - |> Map.put(:summary_watcher, summary_watcher_pid) - |> Map.put(:next_address, Crypto.derive_oracle_address(next_summary_date, 1)) - |> Map.delete(:oracle_watcher) - |> Map.update!(:indexes, fn indexes -> - # Clean previous indexes - indexes - |> Map.keys() - |> Enum.filter(&(DateTime.diff(&1, next_summary_date) < 0)) - |> Enum.reduce(indexes, &Map.delete(&2, &1)) - end) - |> Map.update!(:indexes, fn indexes -> - # Prevent overwrite, if the oracle transaction was faster than the summary processing - if Map.has_key?(indexes, next_summary_date) do - indexes - else - Map.put(indexes, next_summary_date, 0) - end - end) - - {:keep_state, new_data} + {:keep_state, Map.put(data, :watcher, watcher_pid)} end def handle_event( @@ -486,64 +430,29 @@ defmodule Archethic.OracleChain.Scheduler do :info, {:EXIT, pid, {:shutdown, :hard_timeout}}, :triggered, - data = %{oracle_watcher: watcher_pid} + data = %{watcher: watcher_pid} ) when pid == watcher_pid do - {:keep_state, Map.delete(data, :oracle_watcher), {:next_event, :internal, :schedule}} - end - - def handle_event( - :info, - {:EXIT, pid, _}, - :triggered, - _data = %{oracle_watcher: watcher_pid} - ) - when pid == watcher_pid do - :keep_state_and_data - end - - def handle_event( - :info, - {:EXIT, pid, _}, - :triggered, - _data = %{summary_watcher: watcher_pid} - ) - when pid == watcher_pid do - :keep_state_and_data - end - - def handle_event( - :info, - {:EXIT, pid, _}, - :scheduled, - _data = %{oracle_watcher: watcher_pid} - ) - when pid == watcher_pid do - :keep_state_and_data + {:keep_state, Map.delete(data, :watcher), {:next_event, :internal, :schedule}} end def handle_event( :info, {:EXIT, pid, _}, _state, - data = %{summary_watcher: watcher_pid} + data = %{watcher: watcher_pid} ) - when pid == watcher_pid do - {:keep_state, Map.delete(data, :summary_watcher)} + when watcher_pid == pid do + {:keep_state, Map.delete(data, :watcher)} end def handle_event( :info, {:EXIT, _pid, _}, _state, - data + _data ) do - new_data = - data - |> Map.delete(:oracle_watcher) - |> Map.delete(:summary_watcher) - - {:keep_state, new_data} + :keep_state_and_data end def handle_event( @@ -675,6 +584,34 @@ defmodule Archethic.OracleChain.Scheduler do def handle_event(_event_type, _event, :idle, _data), do: :keep_state_and_data + defp update_summary_date(data = %{summary_interval: summary_interval}) do + OracleChain.update_summ_gen_addr() + + current_time = DateTime.utc_now() |> DateTime.truncate(:second) + next_summary_date = next_date(summary_interval, current_time) + Logger.info("Next Oracle Summary at #{DateTime.to_string(next_summary_date)}") + + data + |> Map.put(:summary_date, next_summary_date) + |> Map.put(:next_address, Crypto.derive_oracle_address(next_summary_date, 1)) + |> Map.delete(:watcher) + |> Map.update!(:indexes, fn indexes -> + # Clean previous indexes + indexes + |> Map.keys() + |> Enum.filter(&(DateTime.diff(&1, next_summary_date) < 0)) + |> Enum.reduce(indexes, &Map.delete(&2, &1)) + end) + |> Map.update!(:indexes, fn indexes -> + # Prevent overwrite, if the oracle transaction was faster than the summary processing + if Map.has_key?(indexes, next_summary_date) do + indexes + else + Map.put(indexes, next_summary_date, 0) + end + end) + end + defp schedule_new_polling(next_polling_date, current_time = %DateTime{}) do Logger.info("Next oracle polling at #{DateTime.to_string(next_polling_date)}") diff --git a/lib/archethic/oracle_chain/services/uco_price/providers/coingecko.ex b/lib/archethic/oracle_chain/services/uco_price/providers/coingecko.ex index c5703ccbc..974181455 100644 --- a/lib/archethic/oracle_chain/services/uco_price/providers/coingecko.ex +++ b/lib/archethic/oracle_chain/services/uco_price/providers/coingecko.ex @@ -25,7 +25,9 @@ defmodule Archethic.OracleChain.Services.UCOPrice.Providers.Coingecko do customize_hostname_check: [ match_fun: :public_key.pkix_verify_hostname_match_fun(:https) ] - ] + ], + connect_timeout: 1000, + timeout: 2000 ] with {:ok, {{_, 200, 'OK'}, _headers, body}} <- diff --git a/lib/archethic/reward/scheduler.ex b/lib/archethic/reward/scheduler.ex index 4bc50930b..97bc52ae8 100644 --- a/lib/archethic/reward/scheduler.ex +++ b/lib/archethic/reward/scheduler.ex @@ -183,7 +183,7 @@ defmodule Archethic.Reward.Scheduler do data {pid, data} -> - Process.exit(pid, :normal) + Process.exit(pid, :kill) data end @@ -215,22 +215,49 @@ defmodule Archethic.Reward.Scheduler do :info, {:new_transaction, address, :mint_rewards, _timestamp}, :scheduled, - data = %{next_address: next_address} + data = %{next_address: next_address, index: index} ) do Logger.debug( "Reschedule rewards after reception of mint rewards transaction in scheduled state instead of triggered state" ) # We prevent non scheduled transactions to change - new_data = + next_index = if next_address == address do - data - |> Map.update!(:index, &(&1 + 1)) + index + 1 else - data + index end - {:keep_state, new_data, {:next_event, :internal, :schedule}} + next_address = Reward.next_address(next_index) + + new_data = + data + |> Map.put(:index, next_index) + |> Map.put(:next_address, next_address) + + validation_nodes = Election.storage_nodes(next_address, P2P.authorized_and_available_nodes()) + + if trigger_node?(validation_nodes) do + Logger.debug("Initialize node rewards tx after mint rewards") + send_node_rewards(next_index) + {:next_state, :triggered, new_data} + else + Logger.debug("Start node responsivness for node rewards tx after mint rewards replication") + + {:ok, pid} = + DetectNodeResponsiveness.start_link(next_address, length(validation_nodes), fn count -> + if trigger_node?(validation_nodes, count) do + Logger.debug("Node reward creation...attempt #{count}", + transaction_address: Base.encode16(next_address) + ) + + send_node_rewards(next_index) + end + end) + + {:next_state, :triggered, Map.put(new_data, :watcher, pid)} + end end def handle_event( @@ -246,7 +273,7 @@ defmodule Archethic.Reward.Scheduler do data {pid, data} -> - Process.exit(pid, :normal) + Process.exit(pid, :kill) data end @@ -291,17 +318,17 @@ defmodule Archethic.Reward.Scheduler do def handle_event( :info, {:EXIT, pid, _}, - :triggered, - _data = %{watcher: watcher_pid} + _state, + data = %{watcher: watcher_pid} ) when watcher_pid == pid do - :keep_state_and_data + {:keep_state, Map.delete(data, :watcher)} end def handle_event( :info, {:EXIT, _pid, _}, - :scheduled, + _state, _data ) do :keep_state_and_data diff --git a/lib/archethic/shared_secrets/node_renewal_scheduler.ex b/lib/archethic/shared_secrets/node_renewal_scheduler.ex index 730648d2a..f41e88d2b 100644 --- a/lib/archethic/shared_secrets/node_renewal_scheduler.ex +++ b/lib/archethic/shared_secrets/node_renewal_scheduler.ex @@ -219,7 +219,7 @@ defmodule Archethic.SharedSecrets.NodeRenewalScheduler do end end) - {:keep_state, Map.put(data, :node_detection, pid)} + {:keep_state, Map.put(data, :watcher, pid)} end end @@ -227,26 +227,26 @@ defmodule Archethic.SharedSecrets.NodeRenewalScheduler do :info, {:EXIT, pid, {:shutdown, :hard_timeout}}, :triggered, - data = %{node_detection: watcher_pid} + data = %{watcher: watcher_pid} ) when pid == watcher_pid do - {:keep_state, Map.delete(data, :node_detection), {:next_event, :internal, :schedule}} + {:keep_state, Map.delete(data, :watcher), {:next_event, :internal, :schedule}} end def handle_event( :info, {:EXIT, pid, _}, - :triggered, - _data = %{node_detection: watcher_pid} + _state, + data = %{watcher: watcher_pid} ) - when pid == watcher_pid do - :keep_state_and_data + when watcher_pid == pid do + {:keep_state, Map.delete(data, :watcher)} end def handle_event( :info, - {:EXIT, _pid, _reason}, - :scheduled, + {:EXIT, _pid, _}, + _state, _data ) do :keep_state_and_data @@ -265,7 +265,7 @@ defmodule Archethic.SharedSecrets.NodeRenewalScheduler do data {pid, data} -> - Process.exit(pid, :normal) + Process.exit(pid, :kill) data end diff --git a/test/archethic/reward/scheduler_test.exs b/test/archethic/reward/scheduler_test.exs index e0c28dff1..86a5b67f9 100644 --- a/test/archethic/reward/scheduler_test.exs +++ b/test/archethic/reward/scheduler_test.exs @@ -6,7 +6,6 @@ defmodule Archethic.Reward.SchedulerTest do P2P, P2P.Node, P2P.Message.StartMining, - P2P.Message.Ok, Reward.Scheduler, TransactionChain.Transaction } @@ -15,6 +14,16 @@ defmodule Archethic.Reward.SchedulerTest do describe "Trigger mint Reward" do test "should initiate the reward scheduler and trigger mint reward" do + P2P.add_and_connect_node(%Node{ + first_public_key: Crypto.first_node_public_key(), + last_public_key: Crypto.last_node_public_key(), + geo_patch: "AAA", + available?: true, + authorized?: true, + authorization_date: DateTime.utc_now(), + average_availability: 1.0 + }) + MockDB |> stub(:get_latest_burned_fees, fn -> 0 end) @@ -22,10 +31,6 @@ defmodule Archethic.Reward.SchedulerTest do assert {:idle, %{interval: "*/1 * * * * *"}} = :sys.get_state(pid) - send(pid, :node_up) - - assert {:idle, %{interval: "*/1 * * * * *"}} = :sys.get_state(pid) - send( pid, {:node_update, @@ -40,14 +45,15 @@ defmodule Archethic.Reward.SchedulerTest do :erlang.trace(pid, true, [:receive]) - assert_receive {:trace, ^pid, :receive, :mint_rewards}, 3_000 + assert_receive {:trace, ^pid, :receive, :mint_rewards}, 1200 + Process.exit(pid, :kill) end end describe "Scheduler" do setup do P2P.add_and_connect_node(%Node{ - first_public_key: Crypto.last_node_public_key(), + first_public_key: Crypto.first_node_public_key(), last_public_key: Crypto.last_node_public_key(), geo_patch: "AAA", available?: true, @@ -56,17 +62,6 @@ defmodule Archethic.Reward.SchedulerTest do average_availability: 1.0 }) - create_p2p_context() - - MockClient - |> expect(:send_message, fn - _, %StartMining{}, _ -> - {:ok, %Ok{}} - - _, _, _ -> - {:ok, %Ok{}} - end) - :ok end @@ -82,16 +77,14 @@ defmodule Archethic.Reward.SchedulerTest do MockClient |> stub(:send_message, fn - _, %StartMining{transaction: %Transaction{address: address, type: :mint_rewards}}, _ -> - send(pid, {:new_transaction, address, :mint_rewards, DateTime.utc_now()}) - send(me, :mint_rewards) - - _, %StartMining{transaction: %{type: :node_rewards}}, _ -> - send(me, :node_rewards) + _, %StartMining{transaction: %Transaction{address: address, type: type}}, _ -> + send(pid, {:new_transaction, address, type, DateTime.utc_now()}) + send(me, type) end) assert_receive :mint_rewards, 1_500 assert_receive :node_rewards, 1_500 + Process.exit(pid, :kill) end test "should not send transaction when burning fees = 0 and should send node rewards" do @@ -100,17 +93,20 @@ defmodule Archethic.Reward.SchedulerTest do me = self() + assert {:ok, pid} = Scheduler.start_link([interval: "*/1 * * * * *"], []) + MockClient - |> stub(:send_message, fn _, %StartMining{transaction: %{type: type}}, _ -> - send(me, type) + |> stub(:send_message, fn + _, %StartMining{transaction: %Transaction{address: address, type: type}}, _ -> + send(pid, {:new_transaction, address, type, DateTime.utc_now()}) + send(me, type) end) - assert {:ok, pid} = Scheduler.start_link([interval: "*/1 * * * * *"], []) - send(pid, :node_up) refute_receive :mint_rewards, 1_200 assert_receive :node_rewards, 1_500 + Process.exit(pid, :kill) end end @@ -198,56 +194,5 @@ defmodule Archethic.Reward.SchedulerTest do :persistent_term.put(:archethic_up, nil) end - - defp create_p2p_context do - pb_key1 = Crypto.derive_keypair("key11", 0) |> elem(0) - pb_key3 = Crypto.derive_keypair("key33", 0) |> elem(0) - - welcome_node = %Node{ - first_public_key: pb_key1, - last_public_key: pb_key1, - available?: true, - geo_patch: "BBB", - network_patch: "BBB", - authorized?: true, - reward_address: Crypto.derive_address(pb_key1), - authorization_date: DateTime.utc_now() |> DateTime.add(-10), - enrollment_date: DateTime.utc_now() - } - - coordinator_node = %Node{ - first_public_key: Crypto.first_node_public_key(), - last_public_key: Crypto.last_node_public_key(), - authorized?: true, - available?: true, - authorization_date: DateTime.utc_now() |> DateTime.add(-10), - geo_patch: "AAA", - network_patch: "AAA", - reward_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>, - enrollment_date: DateTime.utc_now() - } - - storage_nodes = [ - %Node{ - ip: {127, 0, 0, 1}, - port: 3000, - http_port: 4000, - first_public_key: pb_key3, - last_public_key: pb_key3, - geo_patch: "BBB", - network_patch: "BBB", - reward_address: Crypto.derive_address(pb_key3), - available?: true, - authorized?: true, - authorization_date: DateTime.utc_now() |> DateTime.add(-10), - enrollment_date: DateTime.utc_now() - } - ] - - Enum.each(storage_nodes, &P2P.add_and_connect_node(&1)) - - P2P.add_and_connect_node(welcome_node) - P2P.add_and_connect_node(coordinator_node) - end end end