Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix scheduler bugs #630

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 58 additions & 121 deletions lib/archethic/oracle_chain/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,12 @@ defmodule Archethic.OracleChain.Scheduler do
PubSub.unregister_to_new_transaction_by_address(address)

new_data =
case Map.pop(data, :oracle_watcher) do
case Map.pop(data, :watcher) do
{nil, data} ->
data

{pid, data} ->
Process.exit(pid, :normal)
Process.exit(pid, :kill)
data
end

Expand Down Expand Up @@ -228,54 +228,33 @@ defmodule Archethic.OracleChain.Scheduler do
{:keep_state, new_data, {:next_event, :internal, :schedule}}
end

def handle_event(:info, {:new_transaction, _, :oracle_summary, _timestamp}, :triggered, _data) do
OracleChain.update_summ_gen_addr()
{:keep_state_and_data, {:next_event, :internal, :fetch_data}}
def handle_event(:info, {:new_transaction, _, :oracle_summary, _timestamp}, :triggered, data) do
new_data =
case Map.pop(data, :watcher) do
{nil, data} ->
data

{pid, data} ->
Process.exit(pid, :kill)
data
end

new_data = update_summary_date(new_data)
{:keep_state, new_data, {:next_event, :internal, :fetch_data}}
end

def handle_event(
:info,
{:new_transaction, _address, :oracle_summary, _timestamp},
:scheduled,
data = %{summary_interval: summary_interval}
data
) do
Logger.debug(
"Reschedule polling after reception of an oracle summary transaction in scheduled state instead of triggered state"
)

current_time = DateTime.utc_now() |> DateTime.truncate(:second)
next_summary_date = next_date(summary_interval, current_time)
Logger.info("Next Oracle Summary at #{DateTime.to_string(next_summary_date)}")

# We reset the summary date and indexes before rescheduling
new_data =
data
|> Map.put(:summary_date, next_summary_date)
|> Map.put(:next_address, Crypto.derive_oracle_address(next_summary_date, 1))
|> Map.delete(:oracle_watcher)
|> Map.update!(:indexes, fn indexes ->
# Clean previous indexes
indexes
|> Map.keys()
|> Enum.filter(&(DateTime.diff(&1, next_summary_date) < 0))
|> Enum.reduce(indexes, &Map.delete(&2, &1))
end)
|> Map.update!(:indexes, fn indexes ->
# Prevent overwrite, if the oracle transaction was faster than the summary processing
if Map.has_key?(indexes, next_summary_date) do
indexes
else
Map.put(indexes, next_summary_date, 0)
end
end)

OracleChain.update_summ_gen_addr()

{:next_state, :triggered, new_data,
[
{:next_event, :internal, :fetch_data},
{:next_event, :internal, :schedule}
]}
new_data = update_summary_date(data)
{:next_state, :triggered, new_data, {:next_event, :internal, :fetch_data}}
end

def handle_event(
Expand All @@ -289,7 +268,7 @@ defmodule Archethic.OracleChain.Scheduler do
) do
Logger.debug("Oracle polling in process")

if DateTime.diff(polling_date, summary_date, :second) == 0 do
if DateTime.diff(polling_date, summary_date, :second) >= 0 do
{:next_state, :triggered, data, {:next_event, :internal, :aggregate}}
else
{:next_state, :triggered, data, {:next_event, :internal, :fetch_data}}
Expand Down Expand Up @@ -350,7 +329,7 @@ defmodule Archethic.OracleChain.Scheduler do
end
end)

{:keep_state, Map.put(data, :oracle_watcher, pid)}
{:keep_state, Map.put(data, :watcher, pid)}

{:exists, true} ->
Logger.warning("Transaction already exists - before sending",
Expand All @@ -368,30 +347,21 @@ defmodule Archethic.OracleChain.Scheduler do
:internal,
:aggregate,
:triggered,
data = %{summary_date: summary_date, summary_interval: summary_interval, indexes: indexes}
data = %{summary_date: summary_date, indexes: indexes}
)
when is_map_key(indexes, summary_date) do
Logger.debug("Oracle summary - state: #{inspect(data)}")

index = Map.fetch!(indexes, summary_date)
validation_nodes = get_validation_nodes(summary_date, index + 1)

# Stop previous oracle retries when the summary is triggered
case Map.get(data, :oracle_watcher) do
nil ->
:ignore

pid ->
Process.exit(pid, :normal)
end

tx_address =
summary_date
|> Crypto.derive_oracle_keypair(index + 1)
|> elem(0)
|> Crypto.derive_address()

summary_watcher_pid =
watcher_pid =
with {:exists, false} <- {:exists, DB.transaction_exists?(tx_address)},
{:trigger, true} <- {:trigger, trigger_node?(validation_nodes)} do
Logger.debug("Oracle transaction summary sending",
Expand Down Expand Up @@ -431,33 +401,7 @@ defmodule Archethic.OracleChain.Scheduler do
nil
end

current_time = DateTime.utc_now() |> DateTime.truncate(:second)
next_summary_date = next_date(summary_interval, current_time)
Logger.info("Next Oracle Summary at #{DateTime.to_string(next_summary_date)}")

new_data =
data
|> Map.put(:summary_date, next_summary_date)
|> Map.put(:summary_watcher, summary_watcher_pid)
|> Map.put(:next_address, Crypto.derive_oracle_address(next_summary_date, 1))
|> Map.delete(:oracle_watcher)
|> Map.update!(:indexes, fn indexes ->
# Clean previous indexes
indexes
|> Map.keys()
|> Enum.filter(&(DateTime.diff(&1, next_summary_date) < 0))
|> Enum.reduce(indexes, &Map.delete(&2, &1))
end)
|> Map.update!(:indexes, fn indexes ->
# Prevent overwrite, if the oracle transaction was faster than the summary processing
if Map.has_key?(indexes, next_summary_date) do
indexes
else
Map.put(indexes, next_summary_date, 0)
end
end)

{:keep_state, new_data}
{:keep_state, Map.put(data, :watcher, watcher_pid)}
end

def handle_event(
Expand Down Expand Up @@ -486,64 +430,29 @@ defmodule Archethic.OracleChain.Scheduler do
:info,
{:EXIT, pid, {:shutdown, :hard_timeout}},
:triggered,
data = %{oracle_watcher: watcher_pid}
data = %{watcher: watcher_pid}
)
when pid == watcher_pid do
{:keep_state, Map.delete(data, :oracle_watcher), {:next_event, :internal, :schedule}}
end

def handle_event(
:info,
{:EXIT, pid, _},
:triggered,
_data = %{oracle_watcher: watcher_pid}
)
when pid == watcher_pid do
:keep_state_and_data
end

def handle_event(
:info,
{:EXIT, pid, _},
:triggered,
_data = %{summary_watcher: watcher_pid}
)
when pid == watcher_pid do
:keep_state_and_data
end

def handle_event(
:info,
{:EXIT, pid, _},
:scheduled,
_data = %{oracle_watcher: watcher_pid}
)
when pid == watcher_pid do
:keep_state_and_data
{:keep_state, Map.delete(data, :watcher), {:next_event, :internal, :schedule}}
end

def handle_event(
:info,
{:EXIT, pid, _},
_state,
data = %{summary_watcher: watcher_pid}
data = %{watcher: watcher_pid}
)
when pid == watcher_pid do
{:keep_state, Map.delete(data, :summary_watcher)}
when watcher_pid == pid do
{:keep_state, Map.delete(data, :watcher)}
end

def handle_event(
:info,
{:EXIT, _pid, _},
_state,
data
_data
) do
new_data =
data
|> Map.delete(:oracle_watcher)
|> Map.delete(:summary_watcher)

{:keep_state, new_data}
:keep_state_and_data
end

def handle_event(
Expand Down Expand Up @@ -675,6 +584,34 @@ defmodule Archethic.OracleChain.Scheduler do

def handle_event(_event_type, _event, :idle, _data), do: :keep_state_and_data

defp update_summary_date(data = %{summary_interval: summary_interval}) do
OracleChain.update_summ_gen_addr()

current_time = DateTime.utc_now() |> DateTime.truncate(:second)
next_summary_date = next_date(summary_interval, current_time)
Logger.info("Next Oracle Summary at #{DateTime.to_string(next_summary_date)}")

data
|> Map.put(:summary_date, next_summary_date)
|> Map.put(:next_address, Crypto.derive_oracle_address(next_summary_date, 1))
|> Map.delete(:watcher)
|> Map.update!(:indexes, fn indexes ->
# Clean previous indexes
indexes
|> Map.keys()
|> Enum.filter(&(DateTime.diff(&1, next_summary_date) < 0))
|> Enum.reduce(indexes, &Map.delete(&2, &1))
end)
|> Map.update!(:indexes, fn indexes ->
# Prevent overwrite, if the oracle transaction was faster than the summary processing
if Map.has_key?(indexes, next_summary_date) do
indexes
else
Map.put(indexes, next_summary_date, 0)
end
end)
end

defp schedule_new_polling(next_polling_date, current_time = %DateTime{}) do
Logger.info("Next oracle polling at #{DateTime.to_string(next_polling_date)}")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ defmodule Archethic.OracleChain.Services.UCOPrice.Providers.Coingecko do
customize_hostname_check: [
match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
]
]
],
connect_timeout: 1000,
timeout: 2000
]

with {:ok, {{_, 200, 'OK'}, _headers, body}} <-
Expand Down
51 changes: 39 additions & 12 deletions lib/archethic/reward/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ defmodule Archethic.Reward.Scheduler do
data

{pid, data} ->
Process.exit(pid, :normal)
Process.exit(pid, :kill)
data
end

Expand Down Expand Up @@ -215,22 +215,49 @@ defmodule Archethic.Reward.Scheduler do
:info,
{:new_transaction, address, :mint_rewards, _timestamp},
:scheduled,
data = %{next_address: next_address}
data = %{next_address: next_address, index: index}
) do
Logger.debug(
"Reschedule rewards after reception of mint rewards transaction in scheduled state instead of triggered state"
)

# We prevent non scheduled transactions to change
new_data =
next_index =
if next_address == address do
data
|> Map.update!(:index, &(&1 + 1))
index + 1
else
data
index
end

{:keep_state, new_data, {:next_event, :internal, :schedule}}
next_address = Reward.next_address(next_index)

new_data =
data
|> Map.put(:index, next_index)
|> Map.put(:next_address, next_address)

validation_nodes = Election.storage_nodes(next_address, P2P.authorized_and_available_nodes())

if trigger_node?(validation_nodes) do
Logger.debug("Initialize node rewards tx after mint rewards")
send_node_rewards(next_index)
{:next_state, :triggered, new_data}
else
Logger.debug("Start node responsivness for node rewards tx after mint rewards replication")

{:ok, pid} =
DetectNodeResponsiveness.start_link(next_address, length(validation_nodes), fn count ->
if trigger_node?(validation_nodes, count) do
Logger.debug("Node reward creation...attempt #{count}",
transaction_address: Base.encode16(next_address)
)

send_node_rewards(next_index)
end
end)

{:next_state, :triggered, Map.put(new_data, :watcher, pid)}
end
end

def handle_event(
Expand All @@ -246,7 +273,7 @@ defmodule Archethic.Reward.Scheduler do
data

{pid, data} ->
Process.exit(pid, :normal)
Process.exit(pid, :kill)
data
end

Expand Down Expand Up @@ -291,17 +318,17 @@ defmodule Archethic.Reward.Scheduler do
def handle_event(
:info,
{:EXIT, pid, _},
:triggered,
_data = %{watcher: watcher_pid}
_state,
data = %{watcher: watcher_pid}
)
when watcher_pid == pid do
:keep_state_and_data
{:keep_state, Map.delete(data, :watcher)}
end

def handle_event(
:info,
{:EXIT, _pid, _},
:scheduled,
_state,
_data
) do
:keep_state_and_data
Expand Down
Loading