Skip to content

Commit

Permalink
Replicate previous chain raise on missed tx
Browse files Browse the repository at this point in the history
  • Loading branch information
Neylix committed Oct 8, 2024
1 parent 4062b4d commit f3b6ae2
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 42 deletions.
27 changes: 27 additions & 0 deletions lib/archethic/replication/transaction_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ defmodule Archethic.Replication.TransactionContext do
genesis_address
|> TransactionChain.fetch(storage_nodes, paging_state: paging_address)
|> Stream.take_while(&(Transaction.previous_address(&1) != limit_address))
|> ensure_all_tx_fetched(paging_address, genesis_address, limit_address)
end
end

Expand All @@ -84,4 +85,30 @@ defmodule Archethic.Replication.TransactionContext do

TransactionChain.fetch_unspent_outputs(genesis_address, genesis_nodes) |> Enum.to_list()
end

defp ensure_all_tx_fetched(transactions, paging_address, genesis_address, limit_address) do
paging_address = if paging_address == nil, do: genesis_address, else: paging_address

Stream.transform(
transactions,
# init accumulator
fn -> paging_address end,
# loop over transactions
fn tx = %Transaction{address: address}, expected_previous_address ->
if Transaction.previous_address(tx) != expected_previous_address do
raise(
"Replication failed to fetch previous chain after #{Base.encode16(expected_previous_address)}"
)
end

{[tx], address}
end,
# after all tx processed
fn last_address ->
if last_address != limit_address do
raise "Replication failed to fetch previous chain after #{Base.encode16(last_address)}"
end
end
)
end
end
167 changes: 125 additions & 42 deletions test/archethic/replication/transaction_context_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ defmodule Archethic.Replication.TransactionContextTest do

alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.VersionedUnspentOutput

import ArchethicCase
alias Archethic.TransactionFactory

import Mox

test "fetch_transaction/1 should retrieve the transaction" do
Expand All @@ -42,47 +43,129 @@ defmodule Archethic.Replication.TransactionContextTest do
assert %Transaction{} = TransactionContext.fetch_transaction("@Alice1")
end

test "stream_transaction_chain/1 should retrieve the previous transaction chain" do
pub1 = random_public_key()
pub2 = random_public_key()

genesis = random_address()
addr1 = Crypto.derive_address(pub1)
addr2 = Crypto.derive_address(pub2)

MockDB
|> expect(:get_last_chain_address_stored, fn ^genesis -> addr1 end)

MockClient
|> expect(:send_message, fn
_, %GetTransactionChain{address: ^genesis, paging_state: ^addr1}, _ ->
{:ok,
%TransactionList{
transactions: [
%Transaction{previous_public_key: pub1},
%Transaction{previous_public_key: pub2}
]
}}
end)

P2P.add_and_connect_node(%Node{
ip: {127, 0, 0, 1},
port: 3000,
first_public_key: Crypto.last_node_public_key(),
last_public_key: Crypto.last_node_public_key(),
available?: true,
geo_patch: "AAA",
network_patch: "AAA",
authorized?: true,
authorization_date: DateTime.utc_now()
})

chain =
genesis
|> TransactionContext.stream_transaction_chain(addr2, P2P.authorized_and_available_nodes())
|> Enum.to_list()

assert [%Transaction{previous_public_key: ^pub1}] = chain
describe "stream_transaction_chain/3" do
setup do
P2P.add_and_connect_node(%Node{
ip: {127, 0, 0, 1},
port: 3000,
first_public_key: Crypto.last_node_public_key(),
last_public_key: Crypto.last_node_public_key(),
available?: true,
geo_patch: "AAA",
network_patch: "AAA",
authorized?: true,
authorization_date: DateTime.utc_now()
})

%{transactions: Enum.map(0..3, &TransactionFactory.create_valid_transaction([], index: &1))}
end

test "should retrieve the previous transaction chain with part of chain already stored", %{
transactions: [tx1, tx2, tx3, tx4]
} do
genesis = Transaction.previous_address(tx1)
addr1 = tx1.address
addr3 = tx3.address

MockDB
|> expect(:get_last_chain_address_stored, fn ^genesis -> addr1 end)

MockClient
|> expect(:send_message, fn
_, %GetTransactionChain{address: ^genesis, paging_state: ^addr1}, _ ->
{:ok, %TransactionList{transactions: [tx2, tx3, tx4]}}
end)

nodes = P2P.authorized_and_available_nodes()

assert [tx2, tx3] ==
genesis
|> TransactionContext.stream_transaction_chain(addr3, nodes)
|> Enum.to_list()
end

test "should retrieve the previous transactions with no tx of the chain already stored", %{
transactions: [tx1, tx2, tx3, tx4]
} do
genesis = Transaction.previous_address(tx1)
addr3 = tx3.address

MockDB
|> expect(:get_last_chain_address_stored, fn ^genesis -> nil end)

MockClient
|> expect(:send_message, fn
_, %GetTransactionChain{address: ^genesis, paging_state: nil}, _ ->
{:ok, %TransactionList{transactions: [tx1, tx2, tx3, tx4]}}
end)

nodes = P2P.authorized_and_available_nodes()

assert [tx1, tx2, tx3] ==
genesis
|> TransactionContext.stream_transaction_chain(addr3, nodes)
|> Enum.to_list()
end

test "should raise en error if part of the chain is not fetched", %{
transactions: [tx1, tx2, tx3, tx4]
} do
genesis = Transaction.previous_address(tx1)
addr3 = tx3.address

MockDB
|> stub(:get_last_chain_address_stored, fn ^genesis -> nil end)

nodes = P2P.authorized_and_available_nodes()

# Missing first transaction
MockClient
|> expect(:send_message, fn
_, %GetTransactionChain{address: ^genesis, paging_state: nil}, _ ->
{:ok, %TransactionList{transactions: [tx2, tx3, tx4]}}
end)

expected_message =
"Replication failed to fetch previous chain after #{Base.encode16(genesis)}"

assert_raise RuntimeError, expected_message, fn ->
genesis
|> TransactionContext.stream_transaction_chain(addr3, nodes)
|> Enum.to_list()
end

# Missing middle transaction
MockClient
|> expect(:send_message, fn
_, %GetTransactionChain{address: ^genesis, paging_state: nil}, _ ->
{:ok, %TransactionList{transactions: [tx1, tx3, tx4]}}
end)

expected_message =
"Replication failed to fetch previous chain after #{Base.encode16(tx1.address)}"

assert_raise RuntimeError, expected_message, fn ->
genesis
|> TransactionContext.stream_transaction_chain(addr3, nodes)
|> Enum.to_list()
end

# Missing last transaction
MockClient
|> expect(:send_message, fn
_, %GetTransactionChain{address: ^genesis, paging_state: nil}, _ ->
{:ok, %TransactionList{transactions: [tx1, tx2]}}
end)

expected_message =
"Replication failed to fetch previous chain after #{Base.encode16(tx2.address)}"

assert_raise RuntimeError, expected_message, fn ->
genesis
|> TransactionContext.stream_transaction_chain(addr3, nodes)
|> Enum.to_list()
end
end
end

test "fetch_transaction_unspent_outputs/1 should retrieve the utxos of the chain" do
Expand Down

0 comments on commit f3b6ae2

Please sign in to comment.