Skip to content

Commit

Permalink
feat(P2PView): ✨ Store P2PView historic data.
Browse files Browse the repository at this point in the history
  • Loading branch information
Chralu committed Feb 10, 2025
1 parent 311ea40 commit 0a76650
Show file tree
Hide file tree
Showing 3 changed files with 477 additions and 0 deletions.
256 changes: 256 additions & 0 deletions lib/archethic/db/embedded_impl/p2p_view2.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
defmodule Archethic.DB.EmbeddedImpl.P2PViewTwo do
defstruct [
:geo_patch,
:available?,
:avg_availability
]

@type t :: %{
geo_patch: binary(),
available?: boolean(),
avg_availability: float()
}

@archethic_db_p2pview :archethic_db_p2pview

require Logger
use GenServer

@spec start_link(Keyword.t()) :: GenServer.on_start()
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end

def init(_) do
setup_ets_table()
{:ok, %{}}
end

defp setup_ets_table, do: :ets.new(@archethic_db_p2pview, [:ordered_set, :named_table])

@spec set_summary(timestamp :: DateTime.t(), p2p_view :: list(t())) :: :ok
def set_summary(timestamp, p2p_view) when is_list(p2p_view) do
unix_timestamp = DateTime.to_unix(timestamp)
bin_p2p_view = serialize(p2p_view)

GenServer.call(__MODULE__, {:set_summary, unix_timestamp, bin_p2p_view})
end

@spec get_summary(timestamp :: DateTime.t()) :: list(t())
def get_summary(timestamp) do
DateTime.to_unix(timestamp)
|> read_nodes()
|> deserialize()
end

@spec update_node(
changes :: [],
start_timestamp :: DateTime.t(),
index_at_timestamp :: (DateTime.t() -> integer())
) :: :ok
def update_node(changes, start_timestamp, index_at_timestamp) do
unix_start_timestamp = DateTime.to_unix(start_timestamp)
GenServer.call(__MODULE__, {:update_node, changes, unix_start_timestamp, index_at_timestamp})
end

@spec add_node(
bin_node :: t(),
start_timestamp :: DateTime.t(),
index_at_timestamp :: (DateTime.t() -> integer())
) ::
:ok
def add_node(bin_node, start_timestamp, index_at_timestamp) do
unix_start_timestamp = DateTime.to_unix(start_timestamp)
node_bin = serialize_node(bin_node)
GenServer.call(__MODULE__, {:add_node, node_bin, unix_start_timestamp, index_at_timestamp})
end

# GenServer Callbacks
def handle_call({:set_summary, unix_timestamp, serialized_nodes}, _from, state) do
write_nodes(serialized_nodes, unix_timestamp)
{:reply, :ok, state}
end

def handle_call({:update_node, changes, unix_start_timestamp, index_at_timestamp}, _from, state) do
do_update_node(changes, unix_start_timestamp, index_at_timestamp)
{:reply, :ok, state}
end

defp do_update_node(_, :"$end_of_table", _) do
end

defp do_update_node(changes, unix_timestamp, index_at_timestamp) do
node_index = index_at_timestamp.(unix_timestamp)

read_nodes(unix_timestamp)
|> update_bin_node(node_index, changes)
|> write_nodes(unix_timestamp)

do_update_node(changes, :ets.next(@archethic_db_p2pview, unix_timestamp), index_at_timestamp)
end

def handle_call({:add_node, node_bin, unix_start_timestamp, index_at_timestamp}, _from, state) do

Check warning on line 92 in lib/archethic/db/embedded_impl/p2p_view2.ex

View workflow job for this annotation

GitHub Actions / Build and test

clauses with the same name and arity (number of arguments) should be grouped together, "def handle_call/3" was previously defined (lib/archethic/db/embedded_impl/p2p_view2.ex:69)
do_add_node(node_bin, unix_start_timestamp, index_at_timestamp)
{:reply, :ok, state}
end

defp do_add_node(_, :"$end_of_table", _) do
end

defp do_add_node(node_bin, unix_timestamp, index_at_timestamp) do
node_index = index_at_timestamp.(unix_timestamp)

read_nodes(unix_timestamp)
|> insert_bin_node(node_index, node_bin)
|> write_nodes(unix_timestamp)

do_add_node(
node_bin,
:ets.next(@archethic_db_p2pview, unix_timestamp),
index_at_timestamp
)
end

# TODO decliner avec enregistrement sur fichier
@spec read_nodes(unix_timestamp :: integer()) :: binary()
defp read_nodes(unix_timestamp) do
index = :ets.prev(@archethic_db_p2pview, unix_timestamp + 1)

{_, data} =
:ets.lookup(@archethic_db_p2pview, index)
|> Enum.at(0)

data
end

# TODO decliner avec enregistrement sur fichier
@spec write_nodes(nodes :: binary(), unix_timestamp :: integer()) :: :ok
defp write_nodes(nodes, unix_timestamp) do
:ets.insert(
@archethic_db_p2pview,
{unix_timestamp, nodes}
)

:ok
end

@spec serialize(nodes :: list(t()), acc :: binary()) :: binary()
defp serialize(nodes, acc \\ <<>>)

defp serialize([], acc) do
acc
end

defp serialize([bin_node | rest], acc) do
node_bin = serialize_node(bin_node)

serialize(
rest,
acc <> node_bin
)
end

defp serialize_node(%__MODULE__{
geo_patch: geo_patch,
available?: available?,
avg_availability: avg_availability
}) do
available_bin = if available?, do: 1, else: 0
avg_availability_int = round(avg_availability * 100)

<<geo_patch::binary-size(3), available_bin, avg_availability_int::8>>
end

@spec deserialize(rest :: binary(), acc :: list(t())) :: list(t())
defp deserialize(rest, acc \\ [])

defp deserialize(<<>>, acc) do
acc |> Enum.reverse()
end

defp deserialize(
<<geo_patch::binary-size(3), available_bin, avg_availability_int::8, rest::binary>>,
acc
) do
node = deserialize_node(<<geo_patch::binary-size(3), available_bin, avg_availability_int::8>>)

deserialize(rest, [
node
| acc
])
end

@spec deserialize_node(bin_node :: binary()) :: t()
defp deserialize_node(<<geo_patch::binary-size(3), available_bin, avg_availability_int::8>>) do
available? = available_bin == 1
avg_availability = avg_availability_int / 100

%__MODULE__{
geo_patch: geo_patch,
available?: available?,
avg_availability: avg_availability
}
end

# Helper functions for bin_node manipulation in binary form
@bin_node_byte_size 5

@spec get_bin_node(bin_p2p_view :: binary(), index :: integer()) :: binary()
defp get_bin_node(
bin_p2p_view,
index
) do
:erlang.binary_part(bin_p2p_view, @bin_node_byte_size * index, @bin_node_byte_size)
end

@spec insert_bin_node(bin_p2p_view :: binary(), index :: integer(), bin_node :: binary()) ::
binary()
defp insert_bin_node(
bin_p2p_view,
index,
bin_node
) do
prefix_size = @bin_node_byte_size * index

:erlang.binary_part(bin_p2p_view, 0, prefix_size) <>
bin_node <>
:erlang.binary_part(
bin_p2p_view,
prefix_size,
:erlang.byte_size(bin_p2p_view) - prefix_size
)
end

@spec update_bin_node(bin_p2p_view :: binary(), index :: integer(), changes :: []) ::
binary()
defp update_bin_node(
bin_p2p_view,
index,
changes
) do
updated_node =
get_bin_node(bin_p2p_view, index)
|> deserialize_node()
|> apply_changes_to_node(changes)
|> serialize_node()

prefix_size = @bin_node_byte_size * index
suffix_size = :erlang.byte_size(bin_p2p_view) - @bin_node_byte_size * (index + 1)

:erlang.binary_part(bin_p2p_view, 0, prefix_size) <>
updated_node <>
:erlang.binary_part(
bin_p2p_view,
prefix_size,
suffix_size
)
end

@spec apply_changes_to_node(bin_node :: t(), changes :: []) :: t()
defp apply_changes_to_node(bin_node, changes) do
Map.merge(
bin_node,
Map.new(changes)
)
end
end
1 change: 1 addition & 0 deletions lib/archethic/p2p/mem_table.ex
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ defmodule Archethic.P2P.MemTable do
@doc """
List the P2P nodes
"""
# TODO add date en parametre. retourner tout les noeuds ou enrollment_date < date
@spec list_nodes() :: list(Node.t())
def list_nodes do
:ets.foldl(
Expand Down
Loading

0 comments on commit 0a76650

Please sign in to comment.