Skip to content

Commit

Permalink
feat(P2PView): ✨ Store P2PView historic data.
Browse files Browse the repository at this point in the history
  • Loading branch information
Chralu committed Feb 11, 2025
1 parent 311ea40 commit b99da22
Show file tree
Hide file tree
Showing 3 changed files with 551 additions and 0 deletions.
328 changes: 328 additions & 0 deletions lib/archethic/db/embedded_impl/p2p_view2.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,328 @@
defmodule Archethic.DB.EmbeddedImpl.P2PViewTwo do

Check warning on line 1 in lib/archethic/db/embedded_impl/p2p_view2.ex

View workflow job for this annotation

GitHub Actions / Build and test

Modules should have a @moduledoc tag.
defstruct [
:geo_patch,
:available?,
:avg_availability
]

@type t :: %{
geo_patch: binary(),
available?: boolean(),
avg_availability: float()
}

@archethic_db_p2pview :archethic_db_p2pview

require Logger
use GenServer

@spec start_link(Keyword.t()) :: GenServer.on_start()
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end

def init(_) do
setup_ets_table()
{:ok, %{}}
end

defp setup_ets_table, do: :ets.new(@archethic_db_p2pview, [:ordered_set, :named_table])

@spec set_summary(timestamp :: DateTime.t(), p2p_view :: list(t()), are_new_nodes? :: boolean()) ::
:ok
def set_summary(timestamp, p2p_view, are_new_nodes?) when is_list(p2p_view) do
unix_timestamp = DateTime.to_unix(timestamp)

bin_p2p_view = serialize(p2p_view, are_new_nodes?)

GenServer.call(__MODULE__, {:set_summary, unix_timestamp, bin_p2p_view})
end

@spec get_summary(timestamp :: DateTime.t()) :: list(t())
def get_summary(timestamp) do
DateTime.to_unix(timestamp)
|> read_nodes()
|> deserialize()
end

@spec update_node(
changes :: [],
start_timestamp :: DateTime.t(),
index_at_timestamp :: (DateTime.t() -> integer())
) :: :ok
def update_node(changes, start_timestamp, index_at_timestamp) do
unix_start_timestamp = DateTime.to_unix(start_timestamp)

GenServer.call(
__MODULE__,
{:update_node, changes, unix_start_timestamp, index_at_timestamp}
)
end

@spec add_node(
node :: t(),
start_timestamp :: DateTime.t(),
index_at_timestamp :: (DateTime.t() -> integer())
) :: :ok
def add_node(node, start_timestamp, index_at_timestamp) do
unix_start_timestamp = DateTime.to_unix(start_timestamp)
node_bin = serialize_node(node, true)
GenServer.call(__MODULE__, {:add_node, node_bin, unix_start_timestamp, index_at_timestamp})
end

# GenServer Callbacks
def handle_call({:set_summary, unix_timestamp, serialized_nodes}, _from, state) do
write_nodes(serialized_nodes, unix_timestamp)
{:reply, :ok, state}
end

def handle_call({:update_node, changes, unix_start_timestamp, index_at_timestamp}, _from, state) do
do_update_node(changes, unix_start_timestamp, index_at_timestamp)
{:reply, :ok, state}
end

def handle_call({:add_node, node_bin, unix_start_timestamp, index_at_timestamp}, _from, state) do
do_add_node(node_bin, unix_start_timestamp, index_at_timestamp)
{:reply, :ok, state}
end

@spec do_update_node(
changes :: [],
unix_timestamp :: integer() | atom(),
index_at_timestamp :: (DateTime.t() -> integer())
) :: :ok
defp do_update_node(_, :"$end_of_table", _), do: :ok
defp do_update_node([], _, _), do: :ok

defp do_update_node(changes, unix_timestamp, index_at_timestamp) do
node_index = index_at_timestamp.(unix_timestamp)

bin_p2p_view = read_nodes(unix_timestamp)

bin_node = get_bin_node(bin_p2p_view, node_index)

changes_to_apply =
changes
|> Enum.filter(&should_apply_change?(&1, bin_node))

bin_p2p_view
|> update_bin_node(node_index, changes_to_apply)
|> write_nodes(unix_timestamp)

do_update_node(
changes_to_apply,
:ets.next(@archethic_db_p2pview, unix_timestamp),
index_at_timestamp
)
end

@spec should_apply_change?(data :: {key :: atom(), _ :: any()}, bin_node :: binary()) ::
boolean()
defp should_apply_change?({key, _}, bin_node) do
<<changed?::8, _::binary>> = get_bin_node_property(bin_node, key)
changed? != 1
end

@spec do_add_node(
node_bin :: binary(),
unix_timestamp :: integer() | atom(),
index_at_timestamp :: (DateTime.t() -> integer())
) :: :ok
defp do_add_node(_, :"$end_of_table", _), do: :ok

defp do_add_node(node_bin, unix_timestamp, index_at_timestamp) do
node_index = index_at_timestamp.(unix_timestamp)

read_nodes(unix_timestamp)
|> insert_bin_node(node_index, node_bin)
|> write_nodes(unix_timestamp)

do_add_node(
node_bin,
:ets.next(@archethic_db_p2pview, unix_timestamp),
index_at_timestamp
)
end

# TODO decliner avec enregistrement sur fichier
@spec read_nodes(unix_timestamp :: integer()) :: binary()
defp read_nodes(unix_timestamp) do
index = :ets.prev(@archethic_db_p2pview, unix_timestamp + 1)

{_, data} =
:ets.lookup(@archethic_db_p2pview, index)
|> Enum.at(0)

data
end

# TODO decliner avec enregistrement sur fichier
@spec write_nodes(nodes :: binary(), unix_timestamp :: integer()) :: :ok
defp write_nodes(nodes, unix_timestamp) do
:ets.insert(
@archethic_db_p2pview,
{unix_timestamp, nodes}
)

:ok
end

@bin_node_byte_size 8

@spec serialize(p2p_view :: list(t()), are_new_nodes? :: boolean(), acc :: binary()) ::
binary()
defp serialize(p2p_view, are_new_nodes?, acc \\ <<>>)

defp serialize([], _, acc), do: acc

defp serialize([node | rest], are_new_nodes?, acc) do
node_bin = serialize_node(node, are_new_nodes?)

serialize(
rest,
are_new_nodes?,
acc <> node_bin
)
end

defp serialize_node(
%__MODULE__{
geo_patch: geo_patch,
available?: available?,
avg_availability: avg_availability
},
is_new_node?
) do
[{:geo_patch, geo_patch}, {:available?, available?}, {:avg_availability, avg_availability}]
|> Enum.reduce(
<<>>,
&(&2 <> serialize_node_property_changed(is_new_node?) <> serialize_node_property(&1))
)
end

defp serialize_node_property({:geo_patch, value}), do: <<value::binary-size(3)>>
defp serialize_node_property({:available?, value}), do: serialize_boolean(value)
defp serialize_node_property({:avg_availability, value}), do: <<trunc(value * 100)::8>>
defp serialize_node_property_changed(value), do: serialize_boolean(value)

defp serialize_boolean(true), do: <<1::8>>
defp serialize_boolean(false), do: <<0::8>>

@spec deserialize(rest :: binary(), acc :: list(t())) :: list(t())
defp deserialize(rest, acc \\ [])

defp deserialize(<<>>, acc) do
acc |> Enum.reverse()
end

defp deserialize(
<<node_bin::binary-size(@bin_node_byte_size), rest::binary>>,
acc
) do
node = deserialize_node(node_bin)

deserialize(rest, [node | acc])
end

@spec deserialize_node(bin_node :: binary()) :: t()
defp deserialize_node(
<<_::8, geo_patch::binary-size(3), _::8, available?, _::8, avg_availability::8>>
) do
%__MODULE__{
geo_patch: geo_patch,
available?: available? == 1,
avg_availability: avg_availability / 100
}
end

# Helper functions for bin_node manipulation in binary form

@spec get_bin_node(bin_p2p_view :: binary(), index :: integer()) :: binary()
defp get_bin_node(
bin_p2p_view,
index
) do
:erlang.binary_part(bin_p2p_view, @bin_node_byte_size * index, @bin_node_byte_size)
end

@spec get_bin_node_property(bin_node :: binary(), property :: atom()) :: binary()
defp get_bin_node_property(
<<geo_patch_changed?::8, geo_patch::binary-size(3), _::binary>>,
:geo_patch
) do
<<geo_patch_changed?::8, geo_patch::binary-size(3)>>
end

defp get_bin_node_property(
<<_::32, available_changed?::8, available?::8, _::binary>>,
:available
) do
<<available_changed?::8, available?::8>>
end

defp get_bin_node_property(
<<_::48, avg_availability_changed?::8, avg_availability::8>>,
:avg_availability
) do
<<avg_availability_changed?::8, avg_availability::8>>
end

@spec insert_bin_node(bin_p2p_view :: binary(), index :: integer(), bin_node :: binary()) ::
binary()
defp insert_bin_node(
bin_p2p_view,
index,
bin_node
) do
prefix_size = @bin_node_byte_size * index

:erlang.binary_part(bin_p2p_view, 0, prefix_size) <>
bin_node <>
:erlang.binary_part(
bin_p2p_view,
prefix_size,
:erlang.byte_size(bin_p2p_view) - prefix_size
)
end

@spec update_bin_node(bin_p2p_view :: binary(), index :: integer(), changes :: []) ::
binary()
defp update_bin_node(
bin_p2p_view,
index,
changes
) do
updated_node =
get_bin_node(bin_p2p_view, index)
|> apply_changes_to_node(changes)

prefix_size = @bin_node_byte_size * index
suffix_size = :erlang.byte_size(bin_p2p_view) - @bin_node_byte_size * (index + 1)

:erlang.binary_part(bin_p2p_view, 0, prefix_size) <>
updated_node <>
:erlang.binary_part(
bin_p2p_view,
prefix_size,
suffix_size
)
end

@spec apply_changes_to_node(bin_node :: binary(), changes :: []) :: binary()
defp apply_changes_to_node(
bin_node,
changes
) do
[:geo_patch, :available, :avg_availability]
|> Enum.reduce(<<>>, fn key, acc ->
acc <>
case changes[key] do
nil ->
get_bin_node_property(bin_node, key)

value ->
serialize_node_property_changed(true) <> serialize_node_property({key, value})
end
end)
end
end
1 change: 1 addition & 0 deletions lib/archethic/p2p/mem_table.ex
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ defmodule Archethic.P2P.MemTable do
@doc """
List the P2P nodes
"""
# TODO add date en parametre. retourner tout les noeuds ou enrollment_date < date
@spec list_nodes() :: list(Node.t())
def list_nodes do
:ets.foldl(
Expand Down
Loading

0 comments on commit b99da22

Please sign in to comment.