Skip to content

Commit

Permalink
Correct self repair notifier (#487)
Browse files Browse the repository at this point in the history
Trigger repair on the right P2P network change
  • Loading branch information
Samuel authored Aug 4, 2022
1 parent 6c74a89 commit fc737ef
Show file tree
Hide file tree
Showing 2 changed files with 232 additions and 76 deletions.
220 changes: 144 additions & 76 deletions lib/archethic/self_repair/notifier.ex
Original file line number Diff line number Diff line change
@@ -1,104 +1,172 @@
defmodule Archethic.SelfRepair.Notifier do
@moduledoc false
@moduledoc """
Process to handle repair in case of topology change by trying to replicate transactions to new shard composition.
When a node receive a topology change due to the unavailability of a node,
we compute the new election for the already stored transactions.
Hence, a new shard might me formed as we notify the new transactions to the
new storage nodes
```mermaid
flowchart TD
A[Node 4] --x|Topology change notification| B[Node1]
B --> | List transactions| B
B -->|Elect new nodes| H[Transaction replication]
H -->|Replicate Transaction| C[Node2]
H -->|Replicate Transaction| D[Node3]
```
"""

use GenServer

# alias Archethic.Crypto
#
# alias Archethic.Election
#
alias Archethic.Crypto

alias Archethic.Election

alias Archethic.PubSub

# alias Archethic.P2P
# alias Archethic.P2P.Message.ReplicateTransactionChain
alias Archethic.P2P
alias Archethic.P2P.Message.ReplicateTransaction
alias Archethic.P2P.Node

# alias Archethic.TaskSupervisor
#
# alias Archethic.TransactionChain
# alias Archethic.TransactionChain.Transaction
#
# alias Archethic.Utils
alias Archethic.TaskSupervisor

alias Archethic.TransactionChain
alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.Transaction.ValidationStamp

alias Archethic.Utils

require Logger

def start_link(args \\ []) do
GenServer.start_link(__MODULE__, args)
end

def init(_) do
PubSub.register_to_node_update()
{:ok, []}
{:ok, %{notified: %{}}}
end

def handle_info(
{:node_update, %Node{available?: true, authorized?: true, first_public_key: _node_key}},
state
{:node_update,
%Node{
available?: false,
authorized?: true,
first_public_key: node_key,
authorization_date: authorization_date
}},
state = %{notified: notified}
) do
# FIXME
# TODO: Leverage node update reason to know if we have to take the new node in consideration or to reject the nodes to replay election algorithm

# current_node_public_key = Crypto.first_node_public_key()

# if node_key == current_node_public_key do
# {:noreply, state}
# else
# node_list = P2P.list_nodes() |> Enum.reject(&(&1.first_public_key == node_key))
current_node_public_key = Crypto.first_node_public_key()
now = DateTime.utc_now() |> DateTime.truncate(:millisecond)

with :lt <- DateTime.compare(authorization_date, now),
nil <- Map.get(notified, node_key),
false <- current_node_public_key == node_key do
repair_transactions(node_key, current_node_public_key)
{:noreply, Map.update!(state, :notified, &Map.put(&1, node_key, %{}))}
else
_ ->
{:noreply, state}
end
end

# node_key
# |> get_transactions_to_sync(node_list)
# |> forward_transactions(node_list, current_node_public_key)
def handle_info(
{:node_update,
%Node{authorized?: false, authorization_date: date, first_public_key: node_key}},
state = %{notified: notified}
)
when date != nil do
current_node_public_key = Crypto.first_node_public_key()

with nil <- Map.get(notified, node_key),
false <- current_node_public_key == node_key do
repair_transactions(node_key, current_node_public_key)
{:noreply, Map.update!(state, :notified, &Map.put(&1, node_key, %{}))}
else
_ ->
{:noreply, state}
end
end

# {:noreply, state}
# end
{:noreply, state}
def handle_info(
{:node_update,
%Node{available?: true, first_public_key: node_key, authorization_date: date}},
state
)
when date != nil do
{:noreply, Map.update!(state, :notified, &Map.delete(&1, node_key))}
end

def handle_info(_, state) do
{:noreply, state}
end

# defp get_transactions_to_sync(node_public_key, node_list) do
# TransactionChain.list_all([:address, :type])
# |> Stream.map(fn %Transaction{address: address, type: type} ->
# {address, type, Election.chain_storage_nodes_with_type(address, type, node_list)}
# end)
# |> Stream.filter(fn {_address, _type, nodes} ->
# Utils.key_in_node_list?(nodes, node_public_key)
# end)
# end

# defp forward_transactions(
# transactions,
# node_list,
# current_node_public_key
# ) do
# transactions
# |> Stream.each(fn {address, type, _nodes} ->
# previous_storage_nodes =
# Election.chain_storage_nodes_with_type(
# address,
# type,
# node_list
# )

# next_storage_nodes =
# Election.chain_storage_nodes_with_type(
# address,
# type,
# P2P.available_nodes() -- previous_storage_nodes
# )

# with true <- Utils.key_in_node_list?(next_storage_nodes, current_node_public_key),
# {:ok, tx} <- TransactionChain.get_transaction(address) do
# Task.Supervisor.async_stream_nolink(
# TaskSupervisor,
# next_storage_nodes,
# &P2P.send_message(&1, %ReplicateTransactionChain{transaction: tx}),
# ordered: false,
# on_timeout: :kill_task
# )
# |> Stream.run()
# end
# end)
# |> Stream.run()
# end
defp repair_transactions(node_key, current_node_public_key) do
Logger.debug("Trying to repair transactions due to a topology change",
node: Base.encode16(node_key)
)

node_key
|> get_transactions_to_sync()
|> Stream.each(&forward_transaction(&1, current_node_public_key))
|> Stream.run()
end

defp get_transactions_to_sync(node_public_key) do
# We fetch all the transactions existing and check if the disconnecting node was a storage node
TransactionChain.list_all([:address, :type, validation_stamp: [:timestamp]])
|> Stream.map(
fn tx = %Transaction{
address: address,
type: type,
validation_stamp: %ValidationStamp{timestamp: timestamp}
} ->
node_list =
Enum.filter(
P2P.list_nodes(),
&(&1.authorization_date != nil and
DateTime.compare(&1.authorization_date, timestamp) == :lt)
)

{tx, Election.chain_storage_nodes_with_type(address, type, node_list)}
end
)
|> Stream.filter(fn {_tx, nodes} ->
Utils.key_in_node_list?(nodes, node_public_key)
end)
end

defp forward_transaction(
{tx = %Transaction{address: address, type: type}, previous_storage_nodes},
current_node_public_key
) do
# We compute the new storage nodes minus the previous ones
new_storage_nodes =
Election.chain_storage_nodes_with_type(
address,
type,
P2P.authorized_nodes() -- previous_storage_nodes
)

with false <- Enum.empty?(new_storage_nodes),
true <- Utils.key_in_node_list?(previous_storage_nodes, current_node_public_key) do
Logger.info("Repair started due to network topology change",
transaction_address: Base.encode16(address),
transaction_type: type
)

Task.Supervisor.async_stream_nolink(
TaskSupervisor,
new_storage_nodes,
&P2P.send_message(&1, %ReplicateTransaction{transaction: tx}),
ordered: false,
on_timeout: :kill_task
)
|> Stream.run()
end
end
end
88 changes: 88 additions & 0 deletions test/archethic/self_repair/notifier_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
defmodule Archethic.SelfRepair.NotifierTest do
use ArchethicCase

alias Archethic.Crypto

alias Archethic.P2P
alias Archethic.P2P.Message.Ok
alias Archethic.P2P.Message.ReplicateTransaction
alias Archethic.P2P.Node

alias Archethic.SelfRepair.Notifier
alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.Transaction.ValidationStamp

import Mox

test "when a node is becoming offline new nodes should receive transaction to replicate" do
P2P.add_and_connect_node(%Node{
first_public_key: Crypto.first_node_public_key(),
last_public_key: Crypto.first_node_public_key(),
ip: {127, 0, 0, 1},
port: 3000,
authorized?: true,
authorization_date: ~U[2022-02-01 00:00:00Z],
geo_patch: "AAA"
})

P2P.add_and_connect_node(%Node{
first_public_key: "node2",
last_public_key: "node2",
ip: {127, 0, 0, 1},
port: 3001,
authorized?: true,
authorization_date: ~U[2022-02-01 00:00:00Z],
geo_patch: "CCC"
})

P2P.add_and_connect_node(%Node{
first_public_key: "node3",
last_public_key: "node3",
ip: {127, 0, 0, 1},
port: 3002,
authorized?: true,
authorization_date: ~U[2022-02-03 00:00:00Z],
geo_patch: "DDD"
})

{:ok, pid} = Notifier.start_link()

MockDB
|> expect(:list_transactions, fn _ ->
[
%Transaction{
address: "@Alice1",
type: :transfer,
validation_stamp: %ValidationStamp{
timestamp: ~U[2022-02-01 12:54:00Z]
}
}
]
end)

me = self()

MockClient
|> expect(:send_message, fn %Node{first_public_key: "node3"},
%ReplicateTransaction{
transaction: %Transaction{address: "@Alice1"}
},
_ ->
send(me, :tx_replicated)
%Ok{}
end)

send(
pid,
{:node_update,
%Node{
first_public_key: "node2",
available?: false,
authorized?: true,
authorization_date: ~U[2022-02-01 00:00:00Z]
}}
)

assert_receive :tx_replicated
end
end

0 comments on commit fc737ef

Please sign in to comment.