Correct self repair notifier #487

Merged: 1 commit, Aug 4, 2022
220 changes: 144 additions & 76 deletions lib/archethic/self_repair/notifier.ex
@@ -1,104 +1,172 @@
defmodule Archethic.SelfRepair.Notifier do
@moduledoc false
@moduledoc """
Process to handle repairs in case of a topology change, by trying to replicate transactions to the new shard composition.

When a node receives a topology change notification due to the unavailability of a node,
we compute the new election for the already stored transactions.

Hence, a new shard might be formed, and we notify the new storage nodes so they can
replicate those transactions.

```mermaid
flowchart TD
A[Node 4] --x|Topology change notification| B[Node1]
B --> | List transactions| B
B -->|Elect new nodes| H[Transaction replication]
H -->|Replicate Transaction| C[Node2]
H -->|Replicate Transaction| D[Node3]
```

"""

use GenServer

# alias Archethic.Crypto
#
# alias Archethic.Election
#
alias Archethic.Crypto

alias Archethic.Election

alias Archethic.PubSub

# alias Archethic.P2P
# alias Archethic.P2P.Message.ReplicateTransactionChain
alias Archethic.P2P
alias Archethic.P2P.Message.ReplicateTransaction
alias Archethic.P2P.Node

# alias Archethic.TaskSupervisor
#
# alias Archethic.TransactionChain
# alias Archethic.TransactionChain.Transaction
#
# alias Archethic.Utils
alias Archethic.TaskSupervisor

alias Archethic.TransactionChain
alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.Transaction.ValidationStamp

alias Archethic.Utils

require Logger

def start_link(args \\ []) do
GenServer.start_link(__MODULE__, args)
end

def init(_) do
PubSub.register_to_node_update()
{:ok, []}
{:ok, %{notified: %{}}}
Contributor:

What if there is a process crash, and we end up in a situation where all nodes are waiting for each other's :node_update message?

Author:

This state is just there to avoid handling multiple :node_update messages for a given node, so we don't trigger several repairs within a short time.

Author:

If the process crashes, it will restart with an empty state and wait for another unavailability message for the node.
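
A minimal sketch of the deduplication the author describes, written as a standalone anonymous function rather than the PR's GenServer (the function name and map shape are illustrative only):

```elixir
# Illustrative only: the `notified` map short-circuits repeated repairs for the
# same node key until the key is removed once the node comes back online.
handle_unavailability = fn node_key, notified ->
  case Map.get(notified, node_key) do
    nil ->
      # First unavailability notification for this key: trigger a repair
      # and remember the key so close-in-time duplicates are ignored.
      {:repair, Map.put(notified, node_key, %{})}

    _already_notified ->
      {:skip, notified}
  end
end

{:repair, notified} = handle_unavailability.("node2", %{})
{:skip, ^notified} = handle_unavailability.("node2", notified)
```

In the diff below, the clause matching `available?: true` plays the cleanup role by deleting the key from the `notified` map once the node is available again.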

end

def handle_info(
{:node_update, %Node{available?: true, authorized?: true, first_public_key: _node_key}},
state
{:node_update,
%Node{
available?: false,
authorized?: true,
first_public_key: node_key,
authorization_date: authorization_date
}},
state = %{notified: notified}
) do
# FIXME
# TODO: Leverage node update reason to know if we have to take the new node in consideration or to reject the nodes to replay election algorithm

# current_node_public_key = Crypto.first_node_public_key()

# if node_key == current_node_public_key do
# {:noreply, state}
# else
# node_list = P2P.list_nodes() |> Enum.reject(&(&1.first_public_key == node_key))
current_node_public_key = Crypto.first_node_public_key()
now = DateTime.utc_now() |> DateTime.truncate(:millisecond)

with :lt <- DateTime.compare(authorization_date, now),
nil <- Map.get(notified, node_key),
false <- current_node_public_key == node_key do
repair_transactions(node_key, current_node_public_key)
{:noreply, Map.update!(state, :notified, &Map.put(&1, node_key, %{}))}
else
_ ->
{:noreply, state}
end
end

# node_key
# |> get_transactions_to_sync(node_list)
# |> forward_transactions(node_list, current_node_public_key)
def handle_info(
{:node_update,
%Node{authorized?: false, authorization_date: date, first_public_key: node_key}},
state = %{notified: notified}
)
when date != nil do
current_node_public_key = Crypto.first_node_public_key()

with nil <- Map.get(notified, node_key),
false <- current_node_public_key == node_key do
repair_transactions(node_key, current_node_public_key)
{:noreply, Map.update!(state, :notified, &Map.put(&1, node_key, %{}))}
else
_ ->
{:noreply, state}
end
end

# {:noreply, state}
# end
{:noreply, state}
def handle_info(
{:node_update,
%Node{available?: true, first_public_key: node_key, authorization_date: date}},
state
)
when date != nil do
{:noreply, Map.update!(state, :notified, &Map.delete(&1, node_key))}
end

def handle_info(_, state) do
{:noreply, state}
end

# defp get_transactions_to_sync(node_public_key, node_list) do
# TransactionChain.list_all([:address, :type])
# |> Stream.map(fn %Transaction{address: address, type: type} ->
# {address, type, Election.chain_storage_nodes_with_type(address, type, node_list)}
# end)
# |> Stream.filter(fn {_address, _type, nodes} ->
# Utils.key_in_node_list?(nodes, node_public_key)
# end)
# end

# defp forward_transactions(
# transactions,
# node_list,
# current_node_public_key
# ) do
# transactions
# |> Stream.each(fn {address, type, _nodes} ->
# previous_storage_nodes =
# Election.chain_storage_nodes_with_type(
# address,
# type,
# node_list
# )

# next_storage_nodes =
# Election.chain_storage_nodes_with_type(
# address,
# type,
# P2P.available_nodes() -- previous_storage_nodes
# )

# with true <- Utils.key_in_node_list?(next_storage_nodes, current_node_public_key),
# {:ok, tx} <- TransactionChain.get_transaction(address) do
# Task.Supervisor.async_stream_nolink(
# TaskSupervisor,
# next_storage_nodes,
# &P2P.send_message(&1, %ReplicateTransactionChain{transaction: tx}),
# ordered: false,
# on_timeout: :kill_task
# )
# |> Stream.run()
# end
# end)
# |> Stream.run()
# end
defp repair_transactions(node_key, current_node_public_key) do
Logger.debug("Trying to repair transactions due to a topology change",
node: Base.encode16(node_key)
)

node_key
|> get_transactions_to_sync()
|> Stream.each(&forward_transaction(&1, current_node_public_key))
|> Stream.run()
end

defp get_transactions_to_sync(node_public_key) do
# We fetch all the existing transactions and check whether the disconnecting node was one of their storage nodes
TransactionChain.list_all([:address, :type, validation_stamp: [:timestamp]])
|> Stream.map(
fn tx = %Transaction{
address: address,
type: type,
validation_stamp: %ValidationStamp{timestamp: timestamp}
} ->
node_list =
Enum.filter(
P2P.list_nodes(),
&(&1.authorization_date != nil and
DateTime.compare(&1.authorization_date, timestamp) == :lt)
)

{tx, Election.chain_storage_nodes_with_type(address, type, node_list)}
end
)
|> Stream.filter(fn {_tx, nodes} ->
Utils.key_in_node_list?(nodes, node_public_key)
end)
end

defp forward_transaction(
{tx = %Transaction{address: address, type: type}, previous_storage_nodes},
current_node_public_key
) do
# We compute the new storage nodes minus the previous ones
new_storage_nodes =
Election.chain_storage_nodes_with_type(
address,
type,
P2P.authorized_nodes() -- previous_storage_nodes
)

with false <- Enum.empty?(new_storage_nodes),
true <- Utils.key_in_node_list?(previous_storage_nodes, current_node_public_key) do
Logger.info("Repair started due to network topology change",
transaction_address: Base.encode16(address),
transaction_type: type
)

Task.Supervisor.async_stream_nolink(
TaskSupervisor,
new_storage_nodes,
&P2P.send_message(&1, %ReplicateTransaction{transaction: tx}),
ordered: false,
on_timeout: :kill_task
)
|> Stream.run()
end
end
end
88 changes: 88 additions & 0 deletions test/archethic/self_repair/notifier_test.exs
@@ -0,0 +1,88 @@
defmodule Archethic.SelfRepair.NotifierTest do
use ArchethicCase

alias Archethic.Crypto

alias Archethic.P2P
alias Archethic.P2P.Message.Ok
alias Archethic.P2P.Message.ReplicateTransaction
alias Archethic.P2P.Node

alias Archethic.SelfRepair.Notifier
alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.Transaction.ValidationStamp

import Mox

test "when a node is becoming offline new nodes should receive transaction to replicate" do
P2P.add_and_connect_node(%Node{
first_public_key: Crypto.first_node_public_key(),
last_public_key: Crypto.first_node_public_key(),
ip: {127, 0, 0, 1},
port: 3000,
authorized?: true,
authorization_date: ~U[2022-02-01 00:00:00Z],
geo_patch: "AAA"
})

P2P.add_and_connect_node(%Node{
first_public_key: "node2",
last_public_key: "node2",
ip: {127, 0, 0, 1},
port: 3001,
authorized?: true,
authorization_date: ~U[2022-02-01 00:00:00Z],
geo_patch: "CCC"
})

P2P.add_and_connect_node(%Node{
first_public_key: "node3",
last_public_key: "node3",
ip: {127, 0, 0, 1},
port: 3002,
authorized?: true,
authorization_date: ~U[2022-02-03 00:00:00Z],
geo_patch: "DDD"
})

{:ok, pid} = Notifier.start_link()

MockDB
|> expect(:list_transactions, fn _ ->
[
%Transaction{
address: "@Alice1",
type: :transfer,
validation_stamp: %ValidationStamp{
timestamp: ~U[2022-02-01 12:54:00Z]
}
}
]
end)

me = self()

MockClient
|> expect(:send_message, fn %Node{first_public_key: "node3"},
%ReplicateTransaction{
transaction: %Transaction{address: "@Alice1"}
},
_ ->
send(me, :tx_replicated)
%Ok{}
end)

send(
pid,
{:node_update,
%Node{
first_public_key: "node2",
available?: false,
authorized?: true,
authorization_date: ~U[2022-02-01 00:00:00Z]
}}
)

assert_receive :tx_replicated
end
end