Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

request missing address while receiving new last address #748

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions lib/archethic/p2p/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,10 @@ defmodule Archethic.P2P.Message do
def encode(%NotifyLastTransactionAddress{
last_address: last_address,
genesis_address: genesis_address,
previous_address: previous_address,
timestamp: timestamp
}) do
<<22::8, last_address::binary, genesis_address::binary,
<<22::8, last_address::binary, genesis_address::binary, previous_address::binary,
DateTime.to_unix(timestamp, :millisecond)::64>>
end

Expand Down Expand Up @@ -904,11 +905,13 @@ defmodule Archethic.P2P.Message do

def decode(<<22::8, rest::bitstring>>) do
{last_address, rest} = Utils.deserialize_address(rest)
{genesis_address, <<timestamp::64, rest::bitstring>>} = Utils.deserialize_address(rest)
{genesis_address, rest} = Utils.deserialize_address(rest)
{previous_address, <<timestamp::64, rest::bitstring>>} = Utils.deserialize_address(rest)

{%NotifyLastTransactionAddress{
last_address: last_address,
genesis_address: genesis_address,
previous_address: previous_address,
timestamp: DateTime.from_unix!(timestamp, :millisecond)
}, rest}
end
Expand Down Expand Up @@ -1716,19 +1719,24 @@ defmodule Archethic.P2P.Message do

def process(
%NotifyLastTransactionAddress{
last_address: last_address,
genesis_address: genesis_address,
last_address: last_address,
previous_address: previous_address,
timestamp: timestamp
},
_
) do
with {local_last_address, _} <-
TransactionChain.get_last_address(genesis_address),
with {local_last_address, _} <- TransactionChain.get_last_address(genesis_address),
true <- local_last_address != last_address do
TransactionChain.register_last_address(genesis_address, last_address, timestamp)
if local_last_address == previous_address do
TransactionChain.register_last_address(genesis_address, last_address, timestamp)

# Stop potential previous smart contract
Contracts.stop_contract(local_last_address)
# Stop potential previous smart contract
Contracts.stop_contract(local_last_address)
else
authorized_nodes = P2P.authorized_and_available_nodes()
SelfRepair.update_last_address(local_last_address, authorized_nodes)
end
end

%Ok{}
Expand Down
5 changes: 3 additions & 2 deletions lib/archethic/p2p/message/notify_last_transaction_address.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ defmodule Archethic.P2P.Message.NotifyLastTransactionAddress do
@moduledoc """
Represents a message with to notify a pool of the last address of a previous address
"""
@enforce_keys [:last_address, :genesis_address, :timestamp]
defstruct [:last_address, :genesis_address, :timestamp]
@enforce_keys [:last_address, :genesis_address, :previous_address, :timestamp]
defstruct [:last_address, :genesis_address, :previous_address, :timestamp]

alias Archethic.Crypto

@type t :: %__MODULE__{
last_address: Crypto.versioned_hash(),
genesis_address: Crypto.versioned_hash(),
previous_address: Crypto.versioned_hash(),
timestamp: DateTime.t()
}
end
7 changes: 5 additions & 2 deletions lib/archethic/replication.ex
Original file line number Diff line number Diff line change
Expand Up @@ -344,9 +344,11 @@ defmodule Archethic.Replication do
last_storage_nodes =
Election.chain_storage_nodes(address, P2P.authorized_and_available_nodes())

{:ok, %Transaction{validation_stamp: %ValidationStamp{timestamp: timestamp}}} =
{:ok, tx = %Transaction{validation_stamp: %ValidationStamp{timestamp: timestamp}}} =
TransactionChain.get_transaction(address, validation_stamp: [:timestamp])

previous_address = Transaction.previous_address(tx)

# Send a message to all the previous storage nodes
genesis_address
|> TransactionChain.list_chain_addresses()
Expand All @@ -359,7 +361,8 @@ defmodule Archethic.Replication do
|> P2P.broadcast_message(%NotifyLastTransactionAddress{
last_address: address,
genesis_address: genesis_address,
timestamp: timestamp
timestamp: timestamp,
previous_address: previous_address
})
end
end
Expand Down
46 changes: 41 additions & 5 deletions lib/archethic/self_repair.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,17 @@ defmodule Archethic.SelfRepair do
alias __MODULE__.Scheduler
alias __MODULE__.Sync

alias Archethic.BeaconChain

alias Archethic.Crypto
alias Archethic.{
BeaconChain,
Crypto,
Utils,
Contracts,
TransactionChain,
Election
}

alias Archethic.P2P.Node

alias Archethic.Utils

alias Crontab.CronExpression.Parser, as: CronParser
alias Crontab.Scheduler, as: CronScheduler

Expand Down Expand Up @@ -159,4 +162,37 @@ defmodule Archethic.SelfRepair do
|> Keyword.get(Scheduler)
|> Scheduler.config_change()
end

@doc """
Request missing transaction addresses from last local address until last chain address
and add them in the DB
"""
def update_last_address(address, authorized_nodes) do
# As the node is storage node of this chain, it needs to know all the addresses of the chain until the last
# So we get the local last address and verify if it's the same as the last address of the chain
# by requesting the nodes which already know the last address

{last_local_address, _timestamp} = TransactionChain.get_last_address(address)
storage_nodes = Election.storage_nodes(last_local_address, authorized_nodes)

case TransactionChain.fetch_next_chain_addresses_remotely(last_local_address, storage_nodes) do
{:ok, []} ->
:ok

{:ok, addresses} ->
genesis_address = TransactionChain.get_genesis_address(address)

addresses
|> Enum.sort_by(fn {_address, timestamp} -> timestamp end)
|> Enum.each(fn {address, timestamp} ->
TransactionChain.register_last_address(genesis_address, address, timestamp)
end)

# Stop potential previous smart contract
Contracts.stop_contract(address)

_ ->
:ok
end
end
end
39 changes: 3 additions & 36 deletions lib/archethic/self_repair/repair_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ defmodule Archethic.SelfRepair.RepairWorker do
@moduledoc false

alias Archethic.{
Contracts,
BeaconChain,
Election,
P2P,
Replication,
TransactionChain
TransactionChain,
SelfRepair
}

alias Archethic.SelfRepair.RepairRegistry
Expand Down Expand Up @@ -130,7 +130,7 @@ defmodule Archethic.SelfRepair.RepairWorker do
{:ok, tx} <- TransactionChain.fetch_transaction_remotely(address, storage_nodes) do
if storage? do
case Replication.validate_and_store_transaction_chain(tx, true, authorized_nodes) do
:ok -> update_last_address(address, authorized_nodes)
:ok -> SelfRepair.update_last_address(address, authorized_nodes)
error -> error
end
else
Expand All @@ -148,37 +148,4 @@ defmodule Archethic.SelfRepair.RepairWorker do
)
end
end

@doc """
Request missing transaction addresses from last local address until last chain address
and add them in the DB
"""
def update_last_address(address, authorized_nodes) do
# As the node is storage node of this chain, it needs to know all the addresses of the chain until the last
# So we get the local last address and verify if it's the same as the last address of the chain
# by requesting the nodes which already know the last address

{last_local_address, _timestamp} = TransactionChain.get_last_address(address)
storage_nodes = Election.storage_nodes(last_local_address, authorized_nodes)

case TransactionChain.fetch_next_chain_addresses_remotely(last_local_address, storage_nodes) do
{:ok, []} ->
:ok

{:ok, addresses} ->
genesis_address = TransactionChain.get_genesis_address(address)

addresses
|> Enum.sort_by(fn {_address, timestamp} -> timestamp end)
|> Enum.each(fn {address, timestamp} ->
TransactionChain.register_last_address(genesis_address, address, timestamp)
end)

# Stop potential previous smart contract
Contracts.stop_contract(address)

_ ->
:ok
end
end
end
1 change: 1 addition & 0 deletions test/archethic/p2p/messages_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,7 @@ defmodule Archethic.P2P.MessageTest do
msg = %NotifyLastTransactionAddress{
last_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>,
genesis_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>,
previous_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>,
timestamp: DateTime.utc_now() |> DateTime.truncate(:millisecond)
}

Expand Down
42 changes: 38 additions & 4 deletions test/archethic/replication_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Archethic.ReplicationTest do
alias Archethic.Mining.Fee

alias Archethic.P2P
alias Archethic.P2P.Message
alias Archethic.P2P.Message.GetTransactionChainLength
alias Archethic.P2P.Message.TransactionChainLength
alias Archethic.P2P.Message.GetTransaction
Expand Down Expand Up @@ -250,27 +251,60 @@ defmodule Archethic.ReplicationTest do
})
end)

previous_public_key = "previous_public_key"

MockDB
|> expect(:get_first_chain_address, fn _ -> "@Alice0" end)
|> expect(:get_transaction, fn _, _ ->
{:ok, %Transaction{validation_stamp: %ValidationStamp{timestamp: DateTime.utc_now()}}}
{:ok,
%Transaction{
validation_stamp: %ValidationStamp{timestamp: DateTime.utc_now()},
previous_public_key: previous_public_key
}}
end)
|> expect(:list_chain_addresses, fn _ -> [{"@Alice1", DateTime.utc_now()}] end)

MockClient
|> stub(:send_message, fn _,
%NotifyLastTransactionAddress{
last_address: last_address,
genesis_address: genesis_address
genesis_address: genesis_address,
previous_address: previous_address
},
_ ->
send(me, {:last_address, last_address, genesis_address})
send(me, {:last_address, last_address, genesis_address, previous_address})
{:ok, %Ok{}}
end)

derived_previous_address = Crypto.derive_address(previous_public_key)

assert :ok = Replication.acknowledge_previous_storage_nodes("@Alice2")

assert_receive {:last_address, "@Alice2", "@Alice0"}
assert_receive {:last_address, "@Alice2", "@Alice0", ^derived_previous_address}
end

test "should process NotifyLastTransactionAddress message with TransactionChain.register_last_address if last address is different than the previous address" do
previous_public_key = "previous_public_key"

MockDB
|> expect(:get_first_chain_address, fn _ -> "@Alice0" end)
|> expect(:get_transaction, fn _, _ ->
{:ok,
%Transaction{
validation_stamp: %ValidationStamp{timestamp: DateTime.utc_now()},
previous_public_key: previous_public_key
}}
end)
|> expect(:list_chain_addresses, fn _ -> [{"@Alice1", DateTime.utc_now()}] end)
|> expect(:add_last_transaction_address, 0, fn _, _, _ -> :ok end)

MockClient
|> stub(:send_message, fn _, msg = %NotifyLastTransactionAddress{}, _ ->
Message.process(msg, "key")
{:ok, %Ok{}}
end)

assert :ok = Replication.acknowledge_previous_storage_nodes("@Alice2")
end
end
end
55 changes: 0 additions & 55 deletions test/archethic/self_repair/repair_worker_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,12 @@ defmodule Archethic.SelfRepair.RepairWorkerTest do

alias Archethic.BeaconChain.SummaryTimer

alias Archethic.Crypto

alias Archethic.P2P
alias Archethic.P2P.Client.DefaultImpl
alias Archethic.P2P.Node
alias Archethic.P2P.Message.GetNextAddresses
alias Archethic.P2P.Message.GetTransaction

alias Archethic.SelfRepair.RepairWorker

alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.Transaction.ValidationStamp

import Mox

setup do
Expand Down Expand Up @@ -113,52 +106,4 @@ defmodule Archethic.SelfRepair.RepairWorkerTest do
task: _task_pid
} = :sys.get_state(pid)
end

test "update_last_address/1 should request missing addresses and add them in DB" do
node = %Node{
first_public_key: Crypto.first_node_public_key(),
last_public_key: Crypto.last_node_public_key(),
geo_patch: "AAA",
authorized?: true,
authorization_date: ~U[2022-11-27 00:00:00Z],
available?: true,
availability_history: <<1::1>>
}

me = self()

MockDB
|> expect(:get_last_chain_address, fn "Alice2" -> {"Alice2", ~U[2022-11-27 00:10:00Z]} end)
|> expect(:get_transaction, fn "Alice2", _ ->
{:ok, %Transaction{validation_stamp: %ValidationStamp{timestamp: ~U[2022-11-27 00:10:00Z]}}}
end)
|> expect(:get_first_chain_address, 2, fn "Alice2" -> "Alice0" end)
|> expect(:list_chain_addresses, fn "Alice0" ->
[
{"Alice1", ~U[2022-11-27 00:09:00Z]},
{"Alice2", ~U[2022-11-27 00:10:00Z]},
{"Alice3", ~U[2022-11-27 00:11:00Z]},
{"Alice4", ~U[2022-11-27 00:12:00Z]}
]
end)
|> expect(:add_last_transaction_address, 2, fn
"Alice0", "Alice3", ~U[2022-11-27 00:11:00Z] ->
send(me, :add_alice3)

"Alice0", "Alice4", ~U[2022-11-27 00:12:00Z] ->
send(me, :add_alice4)
end)

MockClient
|> expect(:send_message, fn node, msg = %GetNextAddresses{address: "Alice2"}, timeout ->
send(me, :get_next_addresses)
DefaultImpl.send_message(node, msg, timeout)
end)

RepairWorker.update_last_address("Alice2", [node])

assert_receive :get_next_addresses
assert_receive :add_alice3
assert_receive :add_alice4
end
end
Loading