From b99da223b8856104ad3245d4b19543982dbe9eff Mon Sep 17 00:00:00 2001 From: Chralu Date: Thu, 6 Feb 2025 12:27:33 +0100 Subject: [PATCH] feat(P2PView): :sparkles: Store P2PView historic data. --- lib/archethic/db/embedded_impl/p2p_view2.ex | 328 ++++++++++++++++++ lib/archethic/p2p/mem_table.ex | 1 + .../db/embedded_impl/p2p_view_test.exs | 222 ++++++++++++ 3 files changed, 551 insertions(+) create mode 100644 lib/archethic/db/embedded_impl/p2p_view2.ex create mode 100644 test/archethic/db/embedded_impl/p2p_view_test.exs diff --git a/lib/archethic/db/embedded_impl/p2p_view2.ex b/lib/archethic/db/embedded_impl/p2p_view2.ex new file mode 100644 index 000000000..bccae6094 --- /dev/null +++ b/lib/archethic/db/embedded_impl/p2p_view2.ex @@ -0,0 +1,328 @@ +defmodule Archethic.DB.EmbeddedImpl.P2PViewTwo do + defstruct [ + :geo_patch, + :available?, + :avg_availability + ] + + @type t :: %{ + geo_patch: binary(), + available?: boolean(), + avg_availability: float() + } + + @archethic_db_p2pview :archethic_db_p2pview + + require Logger + use GenServer + + @spec start_link(Keyword.t()) :: GenServer.on_start() + def start_link(opts \\ []) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end + + def init(_) do + setup_ets_table() + {:ok, %{}} + end + + defp setup_ets_table, do: :ets.new(@archethic_db_p2pview, [:ordered_set, :named_table]) + + @spec set_summary(timestamp :: DateTime.t(), p2p_view :: list(t()), are_new_nodes? :: boolean()) :: + :ok + def set_summary(timestamp, p2p_view, are_new_nodes?) when is_list(p2p_view) do + unix_timestamp = DateTime.to_unix(timestamp) + + bin_p2p_view = serialize(p2p_view, are_new_nodes?) + + GenServer.call(__MODULE__, {:set_summary, unix_timestamp, bin_p2p_view}) + end + + @spec get_summary(timestamp :: DateTime.t()) :: list(t()) + def get_summary(timestamp) do + DateTime.to_unix(timestamp) + |> read_nodes() + |> deserialize() + end + + @spec update_node( + changes :: [], + start_timestamp :: DateTime.t(), + index_at_timestamp :: (DateTime.t() -> integer()) + ) :: :ok + def update_node(changes, start_timestamp, index_at_timestamp) do + unix_start_timestamp = DateTime.to_unix(start_timestamp) + + GenServer.call( + __MODULE__, + {:update_node, changes, unix_start_timestamp, index_at_timestamp} + ) + end + + @spec add_node( + node :: t(), + start_timestamp :: DateTime.t(), + index_at_timestamp :: (DateTime.t() -> integer()) + ) :: :ok + def add_node(node, start_timestamp, index_at_timestamp) do + unix_start_timestamp = DateTime.to_unix(start_timestamp) + node_bin = serialize_node(node, true) + GenServer.call(__MODULE__, {:add_node, node_bin, unix_start_timestamp, index_at_timestamp}) + end + + # GenServer Callbacks + def handle_call({:set_summary, unix_timestamp, serialized_nodes}, _from, state) do + write_nodes(serialized_nodes, unix_timestamp) + {:reply, :ok, state} + end + + def handle_call({:update_node, changes, unix_start_timestamp, index_at_timestamp}, _from, state) do + do_update_node(changes, unix_start_timestamp, index_at_timestamp) + {:reply, :ok, state} + end + + def handle_call({:add_node, node_bin, unix_start_timestamp, index_at_timestamp}, _from, state) do + do_add_node(node_bin, unix_start_timestamp, index_at_timestamp) + {:reply, :ok, state} + end + + @spec do_update_node( + changes :: [], + unix_timestamp :: integer() | atom(), + index_at_timestamp :: (DateTime.t() -> integer()) + ) :: :ok + defp do_update_node(_, :"$end_of_table", _), do: :ok + defp do_update_node([], _, _), do: :ok + + defp do_update_node(changes, unix_timestamp, index_at_timestamp) do + node_index = index_at_timestamp.(unix_timestamp) + + bin_p2p_view = read_nodes(unix_timestamp) + + bin_node = get_bin_node(bin_p2p_view, node_index) + + changes_to_apply = + changes + |> Enum.filter(&should_apply_change?(&1, bin_node)) + + bin_p2p_view + |> update_bin_node(node_index, changes_to_apply) + |> write_nodes(unix_timestamp) + + do_update_node( + changes_to_apply, + :ets.next(@archethic_db_p2pview, unix_timestamp), + index_at_timestamp + ) + end + + @spec should_apply_change?(data :: {key :: atom(), _ :: any()}, bin_node :: binary()) :: + boolean() + defp should_apply_change?({key, _}, bin_node) do + <> = get_bin_node_property(bin_node, key) + changed? != 1 + end + + @spec do_add_node( + node_bin :: binary(), + unix_timestamp :: integer() | atom(), + index_at_timestamp :: (DateTime.t() -> integer()) + ) :: :ok + defp do_add_node(_, :"$end_of_table", _), do: :ok + + defp do_add_node(node_bin, unix_timestamp, index_at_timestamp) do + node_index = index_at_timestamp.(unix_timestamp) + + read_nodes(unix_timestamp) + |> insert_bin_node(node_index, node_bin) + |> write_nodes(unix_timestamp) + + do_add_node( + node_bin, + :ets.next(@archethic_db_p2pview, unix_timestamp), + index_at_timestamp + ) + end + + # TODO decliner avec enregistrement sur fichier + @spec read_nodes(unix_timestamp :: integer()) :: binary() + defp read_nodes(unix_timestamp) do + index = :ets.prev(@archethic_db_p2pview, unix_timestamp + 1) + + {_, data} = + :ets.lookup(@archethic_db_p2pview, index) + |> Enum.at(0) + + data + end + + # TODO decliner avec enregistrement sur fichier + @spec write_nodes(nodes :: binary(), unix_timestamp :: integer()) :: :ok + defp write_nodes(nodes, unix_timestamp) do + :ets.insert( + @archethic_db_p2pview, + {unix_timestamp, nodes} + ) + + :ok + end + + @bin_node_byte_size 8 + + @spec serialize(p2p_view :: list(t()), are_new_nodes? :: boolean(), acc :: binary()) :: + binary() + defp serialize(p2p_view, are_new_nodes?, acc \\ <<>>) + + defp serialize([], _, acc), do: acc + + defp serialize([node | rest], are_new_nodes?, acc) do + node_bin = serialize_node(node, are_new_nodes?) + + serialize( + rest, + are_new_nodes?, + acc <> node_bin + ) + end + + defp serialize_node( + %__MODULE__{ + geo_patch: geo_patch, + available?: available?, + avg_availability: avg_availability + }, + is_new_node? + ) do + [{:geo_patch, geo_patch}, {:available?, available?}, {:avg_availability, avg_availability}] + |> Enum.reduce( + <<>>, + &(&2 <> serialize_node_property_changed(is_new_node?) <> serialize_node_property(&1)) + ) + end + + defp serialize_node_property({:geo_patch, value}), do: <> + defp serialize_node_property({:available?, value}), do: serialize_boolean(value) + defp serialize_node_property({:avg_availability, value}), do: <> + defp serialize_node_property_changed(value), do: serialize_boolean(value) + + defp serialize_boolean(true), do: <<1::8>> + defp serialize_boolean(false), do: <<0::8>> + + @spec deserialize(rest :: binary(), acc :: list(t())) :: list(t()) + defp deserialize(rest, acc \\ []) + + defp deserialize(<<>>, acc) do + acc |> Enum.reverse() + end + + defp deserialize( + <>, + acc + ) do + node = deserialize_node(node_bin) + + deserialize(rest, [node | acc]) + end + + @spec deserialize_node(bin_node :: binary()) :: t() + defp deserialize_node( + <<_::8, geo_patch::binary-size(3), _::8, available?, _::8, avg_availability::8>> + ) do + %__MODULE__{ + geo_patch: geo_patch, + available?: available? == 1, + avg_availability: avg_availability / 100 + } + end + + # Helper functions for bin_node manipulation in binary form + + @spec get_bin_node(bin_p2p_view :: binary(), index :: integer()) :: binary() + defp get_bin_node( + bin_p2p_view, + index + ) do + :erlang.binary_part(bin_p2p_view, @bin_node_byte_size * index, @bin_node_byte_size) + end + + @spec get_bin_node_property(bin_node :: binary(), property :: atom()) :: binary() + defp get_bin_node_property( + <>, + :geo_patch + ) do + <> + end + + defp get_bin_node_property( + <<_::32, available_changed?::8, available?::8, _::binary>>, + :available + ) do + <> + end + + defp get_bin_node_property( + <<_::48, avg_availability_changed?::8, avg_availability::8>>, + :avg_availability + ) do + <> + end + + @spec insert_bin_node(bin_p2p_view :: binary(), index :: integer(), bin_node :: binary()) :: + binary() + defp insert_bin_node( + bin_p2p_view, + index, + bin_node + ) do + prefix_size = @bin_node_byte_size * index + + :erlang.binary_part(bin_p2p_view, 0, prefix_size) <> + bin_node <> + :erlang.binary_part( + bin_p2p_view, + prefix_size, + :erlang.byte_size(bin_p2p_view) - prefix_size + ) + end + + @spec update_bin_node(bin_p2p_view :: binary(), index :: integer(), changes :: []) :: + binary() + defp update_bin_node( + bin_p2p_view, + index, + changes + ) do + updated_node = + get_bin_node(bin_p2p_view, index) + |> apply_changes_to_node(changes) + + prefix_size = @bin_node_byte_size * index + suffix_size = :erlang.byte_size(bin_p2p_view) - @bin_node_byte_size * (index + 1) + + :erlang.binary_part(bin_p2p_view, 0, prefix_size) <> + updated_node <> + :erlang.binary_part( + bin_p2p_view, + prefix_size, + suffix_size + ) + end + + @spec apply_changes_to_node(bin_node :: binary(), changes :: []) :: binary() + defp apply_changes_to_node( + bin_node, + changes + ) do + [:geo_patch, :available, :avg_availability] + |> Enum.reduce(<<>>, fn key, acc -> + acc <> + case changes[key] do + nil -> + get_bin_node_property(bin_node, key) + + value -> + serialize_node_property_changed(true) <> serialize_node_property({key, value}) + end + end) + end +end diff --git a/lib/archethic/p2p/mem_table.ex b/lib/archethic/p2p/mem_table.ex index 4b61706b7..cea91d3ec 100644 --- a/lib/archethic/p2p/mem_table.ex +++ b/lib/archethic/p2p/mem_table.ex @@ -229,6 +229,7 @@ defmodule Archethic.P2P.MemTable do @doc """ List the P2P nodes """ + # TODO add date en parametre. retourner tout les noeuds ou enrollment_date < date @spec list_nodes() :: list(Node.t()) def list_nodes do :ets.foldl( diff --git a/test/archethic/db/embedded_impl/p2p_view_test.exs b/test/archethic/db/embedded_impl/p2p_view_test.exs new file mode 100644 index 000000000..21a887aac --- /dev/null +++ b/test/archethic/db/embedded_impl/p2p_view_test.exs @@ -0,0 +1,222 @@ +defmodule Archethic.DB.EmbeddedImpl.P2PViewTest do + use ArchethicCase + + alias Archethic.DB.EmbeddedImpl.P2PViewTwo + + # import ArchethicCase + + # Doit stocker sur disque les années précédentes + + setup do + {:ok, _pid} = P2PViewTwo.start_link() + :ok + end + + @spec init_node(id :: non_neg_integer()) :: P2PViewTwo.t() + defp init_node(id) do + %P2PViewTwo{ + geo_patch: "AAA", + available?: true, + avg_availability: id / 10 + } + end + + defp given_nodes(summaries) do + summaries + |> Enum.with_index() + |> Enum.each(fn + {nil, _} -> + nil + + {nodes, i} -> + P2PViewTwo.set_summary( + DateTime.from_unix!(i), + nodes, + false + ) + end) + end + + @doc """ + Gets P2PView nodes from unix timestamp. + """ + def get_summary(timestamp) do + P2PViewTwo.get_summary(DateTime.from_unix!(timestamp)) + end + + describe "set_summary/1" do + test "should create summary for a timestamp" do + nodes_view = + 1..10 + |> Enum.map(fn i -> + init_node(i) + end) + + P2PViewTwo.set_summary( + DateTime.truncate(DateTime.utc_now(), :second), + nodes_view, + true + ) + + assert nodes_view == P2PViewTwo.get_summary(DateTime.utc_now()) + end + end + + describe "add_node/3" do + test "should update first record and the following ones" do + node1 = init_node(1) + node2 = init_node(2) + node3 = init_node(3) + + # Given + # - one node on timestamps 0, 1 + # - two nodes on timestamps 3, 4 + given_nodes([ + [node1], + [node1], + nil, + [node1, node2], + [node1, node2] + ]) + + # When adding a new node from timestamp 1 + assert :ok == + P2PViewTwo.add_node( + node3, + DateTime.from_unix!(1), + fn + 1 -> 1 + _ -> 2 + end + ) + + assert [node1] == get_summary(0) + assert [node1, node3] == get_summary(1) + assert [node1, node2, node3] == get_summary(3) + assert [node1, node2, node3] == get_summary(4) + end + + test "should create first record and update the following ones" do + node1 = init_node(1) + node2 = init_node(2) + node3 = init_node(3) + + # Given + # - one node on timestamp 0 + # - two nodes on timestamps 2, 3 + given_nodes([ + [node1], + nil, + [node1, node2], + [node1, node2] + ]) + + # When adding a new node from timestamp 1 + assert :ok == + P2PViewTwo.add_node( + node3, + DateTime.from_unix!(1), + fn + 1 -> 0 + _ -> 1 + end + ) + + assert [node1] == get_summary(0) + assert [node3, node1] == get_summary(1) + assert [node1, node3, node2] == get_summary(2) + assert [node1, node3, node2] == get_summary(3) + end + end + + describe "update_node/2" do + test "should update first record and the following ones" do + node1 = init_node(1) + node2 = init_node(2) + + # Given + # - one node on timestamp 0 + # - two nodes on timestamps 1, 3, 5 + given_nodes([ + [node1], + nil, + [node1, node2], + [node1, node2] + ]) + + # When updating node 2 from timestamp 3 + assert :ok == + P2PViewTwo.update_node( + [avg_availability: 0.5], + DateTime.from_unix!(3), + fn _ -> 1 end + ) + + updated_node_2 = %{node2 | avg_availability: 0.5} + + assert [node1] == get_summary(0) + assert [node1, updated_node_2] == get_summary(3) + assert [node1, updated_node_2] == get_summary(5) + end + end + + test "should create first record and update the following ones" do + node1 = init_node(1) + node2 = init_node(2) + + # Given + # - one node on timestamp 0 + # - two nodes on timestamps 1, 3 + given_nodes([ + [node1], + [node1, node2], + nil, + [node1, node2] + ]) + + # When updating node 2 from timestamp 2 + assert :ok == + P2PViewTwo.update_node( + [avg_availability: 0.5], + DateTime.from_unix!(2), + fn _ -> 1 end + ) + + updated_node_2 = %{node2 | avg_availability: 0.5} + + assert [node1] == get_summary(0) + assert [node1, node2] == get_summary(1) + assert [node1, updated_node_2] == get_summary(2) + assert [node1, updated_node_2] == get_summary(3) + end + + test "should update nodes properties until another mutation is met" do + node1 = init_node(1) + + # Given node1 on timestamps 0, 2, 3 + given_nodes([ + [node1], + [node1], + [node1] + ]) + + # Given a mutation on timestamp 1 + P2PViewTwo.update_node( + [avg_availability: 0.5], + DateTime.from_unix!(1), + fn _ -> 0 end + ) + + # When updating node 1 from timestamp 0 + assert :ok == + P2PViewTwo.update_node( + [geo_patch: "BBB", avg_availability: 0.3], + DateTime.from_unix!(0), + fn _ -> 0 end + ) + + assert [%{node1 | avg_availability: 0.3, geo_patch: "BBB"}] == get_summary(0) + assert [%{node1 | avg_availability: 0.5, geo_patch: "BBB"}] == get_summary(1) + assert [%{node1 | avg_availability: 0.5, geo_patch: "BBB"}] == get_summary(2) + end +end