Skip to content

Commit

Permalink
Handle geopatch updates in Notifier
Browse files Browse the repository at this point in the history
  • Loading branch information
Wassim Mansouri committed Dec 5, 2024
1 parent 5c8b269 commit f1c4300
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 140 deletions.
37 changes: 30 additions & 7 deletions lib/archethic/p2p/mem_table_loader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Archethic.P2P.MemTableLoader do

alias Archethic.DB

alias Archethic.P2P
alias Archethic.P2P.GeoPatch
alias Archethic.P2P.MemTable
alias Archethic.P2P.Node
Expand Down Expand Up @@ -85,7 +86,7 @@ defmodule Archethic.P2P.MemTableLoader do
end

@doc """
Load the transaction and update the P2P view
Load the transaction and update the P2P view.
"""
@spec load_transaction(Transaction.t()) :: :ok
def load_transaction(%Transaction{
Expand Down Expand Up @@ -125,13 +126,20 @@ defmodule Archethic.P2P.MemTableLoader do
mining_public_key: mining_public_key
}

node
|> Node.enroll(timestamp)
|> MemTable.add_node()
node = Node.enroll(node, timestamp)
MemTable.add_node(node)
else
{:ok, node} = MemTable.get_node(first_public_key)

MemTable.add_node(%{
if node.geo_patch != geo_patch do
Logger.info("GeoPatch changed for node",
node: Base.encode16(first_public_key)
)

start_notifier_for_geopatch_change(timestamp)
end

updated_node = %Node{
node
| ip: ip,
port: port,
Expand All @@ -144,10 +152,12 @@ defmodule Archethic.P2P.MemTableLoader do
origin_public_key: origin_public_key,
last_update_date: timestamp,
mining_public_key: mining_public_key
})
}

MemTable.add_node(updated_node)
end

Logger.info("Node loaded into in memory p2p tables", node: Base.encode16(first_public_key))
Logger.info("Node loaded into in-memory P2P tables", node: Base.encode16(first_public_key))
end

def load_transaction(%Transaction{
Expand Down Expand Up @@ -193,4 +203,17 @@ defmodule Archethic.P2P.MemTableLoader do
MemTable.set_node_unavailable(node_public_key, availability_update)
end
end

  # Spawns a SelfRepair.Notifier to re-run storage elections after a node's
  # GeoPatch changed, so transactions can be replicated to their new shards.
  #
  # `timestamp` is the update time of the node transaction carrying the new
  # GeoPatch; it is used both to snapshot the previous node set and as the
  # Notifier's `availability_update`.
  defp start_notifier_for_geopatch_change(timestamp) do
    Logger.info("Starting Notifier for GeoPatch change")

    # Node set as it was at `timestamp` (second arg presumably toggles the
    # availability check — TODO confirm against P2P.authorized_and_available_nodes/2)
    prev_available_nodes = P2P.authorized_and_available_nodes(timestamp, true)
    # Node set as it is now, after the GeoPatch update
    new_available_nodes = P2P.authorized_and_available_nodes()

    # NOTE(review): this passes a map, but Notifier.handle_info/2 reads its state
    # with Keyword.fetch!/2 — confirm Notifier.init/1 normalizes the payload.
    # NOTE(review): start_link/1 links the Notifier to the calling process —
    # confirm a Notifier crash should propagate to the caller.
    Archethic.SelfRepair.Notifier.start_link(%{
      availability_update: timestamp,
      prev_available_nodes: prev_available_nodes,
      new_available_nodes: new_available_nodes
    })
  end
end
152 changes: 96 additions & 56 deletions lib/archethic/self_repair/notifier.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,12 @@ defmodule Archethic.SelfRepair.Notifier do
end

def handle_info(:start, data) do
unavailable_nodes = Keyword.fetch!(data, :unavailable_nodes)
prev_available_nodes = Keyword.fetch!(data, :prev_available_nodes)
new_available_nodes = Keyword.fetch!(data, :new_available_nodes)

Logger.info(
"Start Notifier due to a topology change #{inspect(Enum.map(unavailable_nodes, &Base.encode16(&1)))}"
)
Logger.info("Start Notifier due to a topology change")

repair_transactions(unavailable_nodes, prev_available_nodes, new_available_nodes)
repair_transactions(prev_available_nodes, new_available_nodes)
repair_summaries_aggregate(prev_available_nodes, new_available_nodes)

{:stop, :normal, data}
Expand All @@ -77,16 +74,15 @@ defmodule Archethic.SelfRepair.Notifier do
For each txn chain in db. Load its genesis address, load its
  chain, recompute shards, notify nodes. Network transactions are excluded.
"""
@spec repair_transactions(list(Crypto.key()), list(Node.t()), list(Node.t())) :: :ok
def repair_transactions(unavailable_nodes, prev_available_nodes, new_available_nodes) do
@spec repair_transactions(list(Node.t()), list(Node.t())) :: :ok
def repair_transactions(prev_available_nodes, new_available_nodes) do
# We fetch all the transactions existing and check if the disconnected nodes were in storage nodes
TransactionChain.list_first_addresses()
|> Stream.reject(&network_chain?(&1))
|> Stream.chunk_every(20)
|> Stream.each(fn chunk ->
concurrent_txn_processing(
chunk,
unavailable_nodes,
prev_available_nodes,
new_available_nodes
)
Expand All @@ -103,21 +99,20 @@ defmodule Archethic.SelfRepair.Notifier do

defp concurrent_txn_processing(
addresses,
unavailable_nodes,
prev_available_nodes,
new_available_nodes
) do
Task.Supervisor.async_stream_nolink(
Archethic.task_supervisors(),
addresses,
&sync_chain(&1, unavailable_nodes, prev_available_nodes, new_available_nodes),
&sync_chain(&1, prev_available_nodes, new_available_nodes),
ordered: false,
on_timeout: :kill_task
)
|> Stream.run()
end

defp sync_chain(address, unavailable_nodes, prev_available_nodes, new_available_nodes) do
defp sync_chain(address, prev_available_nodes, new_available_nodes) do
genesis_address = TransactionChain.get_genesis_address(address)

address
Expand All @@ -126,13 +121,45 @@ defmodule Archethic.SelfRepair.Notifier do
validation_stamp: [ledger_operations: [:transaction_movements]]
])
|> Stream.map(&get_previous_election(&1, prev_available_nodes, genesis_address))
|> Stream.filter(&storage_or_io_node?(&1, unavailable_nodes))
|> Stream.map(&compute_elections(&1, new_available_nodes))
|> Stream.filter(&election_changed?(&1))
|> Stream.filter(&notify?(&1))
|> Stream.map(&new_storage_nodes(&1, new_available_nodes))
|> Stream.map(&new_storage_nodes(&1))
|> map_last_addresses_for_node()
|> notify_nodes(genesis_address)
end

defp compute_elections(
{address, resolved_addresses, prev_storage_nodes, prev_io_nodes},
new_available_nodes
) do
new_storage_nodes =
Election.chain_storage_nodes(address, new_available_nodes)
|> Enum.map(& &1.first_public_key)

new_io_nodes =
resolved_addresses
|> Election.io_storage_nodes(new_available_nodes)
|> Enum.map(& &1.first_public_key)

%{
address: address,
prev_storage_nodes: prev_storage_nodes,
prev_io_nodes: prev_io_nodes,
new_storage_nodes: new_storage_nodes,
new_io_nodes: new_io_nodes
}
end

defp election_changed?(%{
prev_storage_nodes: prev_storage_nodes,
prev_io_nodes: prev_io_nodes,
new_storage_nodes: new_storage_nodes,
new_io_nodes: new_io_nodes
}) do
prev_storage_nodes != new_storage_nodes or prev_io_nodes != new_io_nodes
end

defp get_previous_election(
%Transaction{
address: address,
Expand All @@ -153,36 +180,16 @@ defmodule Archethic.SelfRepair.Notifier do
movements_addresses =
transaction_movements
|> Enum.map(& &1.to)
|> Enum.concat(recipients)

authorized_nodes = P2P.authorized_and_available_nodes()

# Before AEIP-21, resolve movements included only last addresses,
# then we have to resolve the genesis address for all the movements
resolved_addresses =
if protocol_version <= 7 do
Task.async_stream(
movements_addresses,
fn address ->
storage_nodes = Election.chain_storage_nodes(address, authorized_nodes)

{:ok, resolved_genesis_address} =
TransactionChain.fetch_genesis_address(
address,
storage_nodes
)

[address, resolved_genesis_address]
end,
on_timeout: :kill_task,
max_concurrency: max(System.schedulers_online(), length(movements_addresses))
)
|> Stream.filter(&match?({:ok, _}, &1))
|> Stream.flat_map(fn {:ok, addresses} -> addresses end)
|> Enum.concat([genesis_address])
else
[genesis_address | movements_addresses]
end
compute_resolved_addresses(
genesis_address,
movements_addresses,
recipients,
protocol_version
)

prev_io_nodes =
resolved_addresses
Expand All @@ -192,41 +199,74 @@ defmodule Archethic.SelfRepair.Notifier do
{address, resolved_addresses, prev_storage_nodes, prev_io_nodes -- prev_storage_nodes}
end

defp storage_or_io_node?({_, _, prev_storage_nodes, prev_io_nodes}, unavailable_nodes) do
nodes = prev_storage_nodes ++ prev_io_nodes
Enum.any?(unavailable_nodes, &Enum.member?(nodes, &1))
@spec compute_resolved_addresses(
binary(),
list(binary()),
list(binary()),
integer()
) :: list(binary())
def compute_resolved_addresses(
genesis_address,
movements_addresses,
recipients,
protocol_version
) do
if protocol_version <= 7 do
Task.async_stream(
movements_addresses ++ recipients,
fn address ->
authorized_nodes = P2P.authorized_and_available_nodes()
storage_nodes = Election.chain_storage_nodes(address, authorized_nodes)

{:ok, resolved_genesis_address} =
TransactionChain.fetch_genesis_address(address, storage_nodes)

[address, resolved_genesis_address]
end,
on_timeout: :kill_task,
max_concurrency: max(System.schedulers_online(), length(movements_addresses))
)
|> Stream.filter(&match?({:ok, _}, &1))
|> Stream.flat_map(fn {:ok, addresses} -> addresses end)
|> Enum.concat([genesis_address])
else
[genesis_address | movements_addresses ++ recipients]
end
end

# Notify only if the current node is part of the previous storage / io nodes
# to reduce number of messages
defp notify?({_, _, prev_storage_nodes, prev_io_nodes}) do
defp notify?(%{
address: _,
new_io_nodes: _,
new_storage_nodes: _,
prev_io_nodes: prev_io_nodes,
prev_storage_nodes: prev_storage_nodes
}) do
Enum.member?(prev_storage_nodes ++ prev_io_nodes, Crypto.first_node_public_key())
end

@doc """
  New election is carried out on the set of all authorized nodes, omitting unavailable nodes.
The set of previous storage nodes is subtracted from the set of new storage nodes.
"""
@spec new_storage_nodes(
{binary(), list(Crypto.prepended_hash()), list(Crypto.key()), list(Crypto.key())},
list(Node.t())
) ::
@spec new_storage_nodes(map()) ::
{binary(), list(Crypto.key()), list(Crypto.key())}
def new_storage_nodes(
{address, resolved_addresses, prev_storage_nodes, prev_io_nodes},
new_available_nodes
) do
def new_storage_nodes(%{
address: address,
new_io_nodes: new_io_nodes,
new_storage_nodes: new_storage_nodes,
prev_io_nodes: prev_io_nodes,
prev_storage_nodes: prev_storage_nodes
}) do
new_storage_nodes =
Election.chain_storage_nodes(address, new_available_nodes)
|> Enum.map(& &1.first_public_key)
new_storage_nodes
|> Enum.reject(&Enum.member?(prev_storage_nodes, &1))

already_stored_nodes = prev_storage_nodes ++ prev_io_nodes ++ new_storage_nodes

new_io_nodes =
resolved_addresses
|> Election.io_storage_nodes(new_available_nodes)
|> Enum.map(& &1.first_public_key)
new_io_nodes
|> Enum.reject(&Enum.member?(already_stored_nodes, &1))

{address, new_storage_nodes, new_io_nodes}
Expand Down
27 changes: 17 additions & 10 deletions test/archethic/self_repair/notifier_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ defmodule Archethic.SelfRepair.NotifierTest do

alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations

test "new_storage_nodes/2 should return new election" do
test "new_storage_nodes/1 should return new election" do
node1 = %Node{
first_public_key: "node1",
last_public_key: "node1",
Expand Down Expand Up @@ -68,13 +68,20 @@ defmodule Archethic.SelfRepair.NotifierTest do
P2P.add_and_connect_node(node4)

prev_storage_nodes = ["node2", "node3"]
new_available_nodes = [node1, node2, node3, node4]

assert {"Alice1", ["node4", "node1"], []} =
Notifier.new_storage_nodes(
{"Alice1", [], prev_storage_nodes, []},
new_available_nodes
)
prev_io_nodes = []
new_storage_nodes = ["node4", "node1"]
new_io_nodes = []

result =
Notifier.new_storage_nodes(%{
address: "Alice1",
new_io_nodes: new_io_nodes,
new_storage_nodes: new_storage_nodes,
prev_io_nodes: prev_io_nodes,
prev_storage_nodes: prev_storage_nodes
})

assert {"Alice1", ["node4", "node1"], []} = result
end

test "map_last_address_for_node/1 should create a map with last address for each node" do
Expand All @@ -96,7 +103,7 @@ defmodule Archethic.SelfRepair.NotifierTest do
assert ^expected = Notifier.map_last_addresses_for_node(tab)
end

test "repair_transactions/3 should send message to new storage nodes" do
test "repair_transactions/2 should send message to new storage nodes" do
node = %Node{
first_public_key: Crypto.first_node_public_key(),
last_public_key: Crypto.last_node_public_key(),
Expand Down Expand Up @@ -179,7 +186,7 @@ defmodule Archethic.SelfRepair.NotifierTest do
{:ok, %Ok{}}
end)

Notifier.repair_transactions(unavailable_nodes, prev_available_nodes, new_available_nodes)
Notifier.repair_transactions(prev_available_nodes, new_available_nodes)

# Expect to receive only 1 new node for Alice2
assert_receive :new_node
Expand Down
Loading

0 comments on commit f1c4300

Please sign in to comment.