diff --git a/lib/archethic/beacon_chain/summary_timer.ex b/lib/archethic/beacon_chain/summary_timer.ex index bccba1a11..3888dae83 100644 --- a/lib/archethic/beacon_chain/summary_timer.ex +++ b/lib/archethic/beacon_chain/summary_timer.ex @@ -20,14 +20,22 @@ defmodule ArchEthic.BeaconChain.SummaryTimer do end @doc """ - Give the next beacon chain slot using the `SlotTimer` interval + Give the next beacon chain slot using the `SummaryTimer` interval """ - def next_summary(date_from = %DateTime{}) do - get_interval() - |> CronParser.parse!(true) - |> CronScheduler.get_next_run_date!(DateTime.to_naive(date_from)) - |> DateTime.from_naive!("Etc/UTC") + cron_expression = CronParser.parse!(get_interval(), true) + naive_date_from = DateTime.to_naive(date_from) + + if Crontab.DateChecker.matches_date?(cron_expression, naive_date_from) do + cron_expression + |> CronScheduler.get_next_run_dates(naive_date_from) + |> Enum.at(1) + |> DateTime.from_naive!("Etc/UTC") + else + cron_expression + |> CronScheduler.get_next_run_date!(naive_date_from) + |> DateTime.from_naive!("Etc/UTC") + end end @doc """ diff --git a/lib/archethic/self_repair/sync.ex b/lib/archethic/self_repair/sync.ex index 7a848b11b..245866a4b 100644 --- a/lib/archethic/self_repair/sync.ex +++ b/lib/archethic/self_repair/sync.ex @@ -105,12 +105,23 @@ defmodule ArchEthic.SelfRepair.Sync do |> BeaconChain.next_summary_dates() |> Flow.from_enumerable() |> Flow.flat_map(&subsets_by_times/1) - |> Flow.partition(key: {:elem, 0}) - |> Flow.reduce( - fn -> %BeaconSummaryAggregate{} end, - &aggregate_summaries_by_date(&1, &2, authorized_nodes) - ) - |> Flow.emit(:state) + |> Flow.partition(key: {:elem, 0}, window: flow_window()) + |> Flow.reduce(fn -> [] end, fn {time, subset}, acc -> + summary = get_beacon_summary(time, subset, authorized_nodes) + + if BeaconSummary.empty?(summary) do + acc + else + [summary | acc] + end + end) + |> Flow.on_trigger(fn acc, _, {:fixed, time, :done} -> + agg = %BeaconSummaryAggregate{ + summary_time: DateTime.from_unix!(time, :millisecond) + } + + {[Enum.reduce(acc, agg, &BeaconSummaryAggregate.add_summary(&2, &1))], acc} + end) |> Stream.reject(&BeaconSummaryAggregate.empty?/1) |> Enum.sort_by(& &1.summary_time) |> Enum.each(&BeaconSummaryHandler.process_summary_aggregate(&1, patch)) @@ -123,15 +134,17 @@ defmodule ArchEthic.SelfRepair.Sync do Enum.map(subsets, fn subset -> {DateTime.truncate(time, :second), subset} end) end - # defp flow_window do - # Flow.Window.fixed(, :second, fn {date, _} -> - # DateTime.to_unix(date, :millisecond) - # end) - # end + defp flow_window do + Flow.Window.fixed(summary_interval(:second), :second, fn {date, _} -> + DateTime.to_unix(date, :millisecond) + end) + end - # defp dates_interval_seconds(last_sync_date) do - # DateTime.diff(last_sync_date, BeaconChain.next_summary_date(last_sync_date)) - # end + defp summary_interval(unit) do + next_summary = BeaconChain.next_summary_date(DateTime.utc_now()) + next_summary2 = BeaconChain.next_summary_date(next_summary) + DateTime.diff(next_summary2, next_summary, unit) + end defp get_beacon_summary(time, subset, node_list) do filter_nodes = Enum.filter(node_list, &(DateTime.compare(&1.authorization_date, time) == :lt)) @@ -139,17 +152,4 @@ defmodule ArchEthic.SelfRepair.Sync do nodes = Election.beacon_storage_nodes(subset, time, filter_nodes) BeaconSummaryHandler.get_full_beacon_summary(time, subset, nodes) end - - defp aggregate_summaries_by_date({time, subset}, acc, authorized_nodes) do - summary = get_beacon_summary(time, subset, authorized_nodes) - - if BeaconSummary.empty?(summary) do - acc - else - acc - |> BeaconSummaryAggregate.initialize(summary) - |> BeaconSummaryAggregate.add_transaction_summaries(summary) - |> BeaconSummaryAggregate.add_p2p_availabilities(summary) - end - end end diff --git a/lib/archethic/self_repair/sync/beacon_aggregate.ex b/lib/archethic/self_repair/sync/beacon_aggregate.ex index 2e53e4984..f77526908 100644 --- a/lib/archethic/self_repair/sync/beacon_aggregate.ex +++ b/lib/archethic/self_repair/sync/beacon_aggregate.ex @@ -7,7 +7,6 @@ defmodule ArchEthic.SelfRepair.Sync.BeaconSummaryAggregate do defstruct [:summary_time, transaction_summaries: [], p2p_availabilities: %{}] - alias ArchEthic.BeaconChain.ReplicationAttestation alias ArchEthic.BeaconChain.Summary, as: BeaconSummary alias ArchEthic.TransactionChain.TransactionSummary @@ -23,42 +22,32 @@ defmodule ArchEthic.SelfRepair.Sync.BeaconSummaryAggregate do } @doc """ - Initialize the aggregate by defining the summary time for example - - If the aggregate is already initialized, the call will be skipped + Aggregate a new BeaconChain's summary """ - @spec initialize(t(), BeaconSummary.t()) :: t() - def initialize( - aggregate = %__MODULE__{summary_time: nil}, - %BeaconSummary{summary_time: summary_time} + @spec add_summary(t(), BeaconSummary.t()) :: t() + def add_summary( + agg = %__MODULE__{}, + %BeaconSummary{ + subset: subset, + transaction_attestations: attestations, + node_availabilities: node_availabilities, + node_average_availabilities: node_average_availabilities + } ) do - %{aggregate | summary_time: summary_time} - end + transaction_summaries = + attestations + |> Enum.map(& &1.transaction_summary) + |> Enum.concat(agg.transaction_summaries) + |> Enum.uniq_by(& &1.address) + |> Enum.sort_by(& &1.timestamp, {:asc, DateTime}) - def initialize( - aggregate = %__MODULE__{}, - _summary - ) do - aggregate - end + p2p_availabilities = + Map.put(agg.p2p_availabilities, subset, %{ + node_availabilities: node_availabilities, + node_average_availabilities: node_average_availabilities + }) - @doc """ - Add a transaction summaries to the aggregate by providing uniqueness and sorting - """ - @spec add_transaction_summaries(t(), BeaconSummary.t()) :: t() - def add_transaction_summaries( - aggregate = %__MODULE__{}, - %BeaconSummary{transaction_attestations: transaction_attestations} - ) do - transaction_attestations - |> Enum.reduce(aggregate, fn %ReplicationAttestation{transaction_summary: transaction_summary}, - acc -> - Map.update!(acc, :transaction_summaries, fn transaction_summaries -> - [transaction_summary | transaction_summaries] - |> Enum.uniq_by(& &1.address) - |> Enum.sort_by(& &1.timestamp, {:asc, DateTime}) - end) - end) + %{agg | transaction_summaries: transaction_summaries, p2p_availabilities: p2p_availabilities} end @doc """ diff --git a/lib/archethic/self_repair/sync/beacon_summary_handler.ex b/lib/archethic/self_repair/sync/beacon_summary_handler.ex index cecbc6115..9cb68fa9c 100644 --- a/lib/archethic/self_repair/sync/beacon_summary_handler.ex +++ b/lib/archethic/self_repair/sync/beacon_summary_handler.ex @@ -45,6 +45,7 @@ defmodule ArchEthic.SelfRepair.Sync.BeaconSummaryHandler do ) |> Enum.filter(&match?({:ok, {:ok, %BeaconSummary{}}}, &1)) |> Enum.map(fn {:ok, {:ok, summary}} -> summary end) + |> Enum.reject(&BeaconSummary.empty?/1) |> Enum.reduce( %{ transaction_attestations: [], diff --git a/lib/mix/tasks/clean_db.ex b/lib/mix/tasks/clean_db.ex index 876b65971..f7d27ef4f 100644 --- a/lib/mix/tasks/clean_db.ex +++ b/lib/mix/tasks/clean_db.ex @@ -4,14 +4,12 @@ defmodule Mix.Tasks.ArchEthic.CleanDb do use Mix.Task def run(_arg) do - files_to_remove = - ["_build", "dev", "lib", "archethic", "data_*"] - |> Path.join() - |> Path.wildcard() - - IO.puts("#{files_to_remove} will be removed") - - Enum.each(files_to_remove, &File.rm_rf!/1) + "_build/dev/lib/archethic/data*" + |> Path.wildcard() + |> Enum.each(fn path -> + IO.puts("#{path} will be removed") + File.rm_rf!(path) + end) IO.puts("Database dropped") end diff --git a/test/archethic/beacon_chain/summary_timer_test.exs b/test/archethic/beacon_chain/summary_timer_test.exs index 6efbbe4de..147c5ce3f 100644 --- a/test/archethic/beacon_chain/summary_timer_test.exs +++ b/test/archethic/beacon_chain/summary_timer_test.exs @@ -4,11 +4,20 @@ defmodule ArchEthic.BeaconChain.SummaryTimerTest do alias ArchEthic.BeaconChain.SummaryTimer - test "next_summary/2 should get the next summary time from a given date" do - {:ok, _pid} = SummaryTimer.start_link([interval: "0 * * * * * *"], []) - now = ~U[2021-01-02 03:00:19Z] - next_summary_time = SummaryTimer.next_summary(now) - assert 1 == abs(now.minute - next_summary_time.minute) + describe "next_summary/2" do + test "should get the next summary time from a given date" do + {:ok, _pid} = SummaryTimer.start_link([interval: "0 * * * * * *"], []) + now = ~U[2021-01-02 03:00:19.501Z] + next_summary_time = SummaryTimer.next_summary(now) + assert 1 == abs(now.minute - next_summary_time.minute) + end + + test "should get the 2nd next summary time when the date is an summary interval date" do + {:ok, _pid} = SummaryTimer.start_link([interval: "0 * * * * * *"], []) + next_date = SummaryTimer.next_summary(DateTime.utc_now()) + next_summary_time = SummaryTimer.next_summary(next_date) + assert DateTime.compare(next_summary_time, next_date) == :gt + end end property "previous_summaries/1 should retrieve the previous summary times from a date" do