Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Explorer optimizations (read DB before asking network when possible) #784

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion assets/css/variables.scss
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
$primary: #006cd2;
$explorer_bg: linear-gradient(-30deg,#00a4db 30%,#006cd2 70%);
$explorer_bg: linear-gradient(-30deg, #00a4db 30%, #006cd2 70%);

$navbar-burger-color: #fff;
$navbar-dropdown-background-color: #006cd2;
Expand All @@ -13,3 +13,5 @@ $navbar-item-hover-background-color: transparent;
$card-header-background-color: #fff;

$modal-background-background-color: #0a0a0a61;
$pagination-color: #fff;
$pagination-hover-color: #e6e6e6;
164 changes: 156 additions & 8 deletions lib/archethic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,51 @@ defmodule Archethic do
"""

alias __MODULE__.Account
alias __MODULE__.BeaconChain
alias __MODULE__.Crypto

alias __MODULE__.Election

alias __MODULE__.P2P
alias __MODULE__.P2P.Node

alias __MODULE__.P2P.Message.Balance
alias __MODULE__.P2P.Message.GetBalance
alias __MODULE__.P2P.Message.GetCurrentSummaries
alias __MODULE__.P2P.Message.GetTransactionSummary
alias __MODULE__.P2P.Message.NewTransaction
alias __MODULE__.P2P.Message.NotFound
alias __MODULE__.P2P.Message.Ok
alias __MODULE__.P2P.Message.StartMining
alias __MODULE__.P2P.Message.TransactionSummaryList

alias __MODULE__.TransactionChain
alias __MODULE__.TransactionChain.Transaction
alias __MODULE__.TransactionChain.TransactionInput
alias __MODULE__.TransactionChain.TransactionSummary

require Logger

@doc """
Query the search of the transaction to the dedicated storage pool from the closest nodes
Search a transaction by its address
Check locally and fallback to a quorum read
"""
@spec search_transaction(address :: binary()) ::
{:ok, Transaction.t()}
| {:error, :transaction_not_exists}
| {:error, :transaction_invalid}
| {:error, :network_issue}
def search_transaction(address) when is_binary(address) do
storage_nodes = Election.chain_storage_nodes(address, P2P.authorized_and_available_nodes())
TransactionChain.fetch_transaction_remotely(address, storage_nodes)
case TransactionChain.get_transaction(address) do
{:ok, tx} ->
{:ok, tx}

{:error, :invalid_transaction} ->
{:error, :transaction_invalid}

{:error, :transaction_not_exists} ->
storage_nodes =
Election.chain_storage_nodes(address, P2P.authorized_and_available_nodes())

TransactionChain.fetch_transaction_remotely(address, storage_nodes)
end
end

@doc """
Expand Down Expand Up @@ -122,8 +137,7 @@ defmodule Archethic do
def get_last_transaction(address) when is_binary(address) do
case get_last_transaction_address(address) do
{:ok, last_address} ->
nodes = Election.chain_storage_nodes(last_address, P2P.authorized_and_available_nodes())
TransactionChain.fetch_transaction_remotely(last_address, nodes)
search_transaction(last_address)

{:error, :network_issue} = e ->
e
Expand Down Expand Up @@ -321,4 +335,138 @@ defmodule Archethic do
nodes = Election.chain_storage_nodes(address, P2P.authorized_and_available_nodes())
TransactionChain.fetch_size_remotely(address, nodes)
end

@doc """
Fetch a summaries aggregate for a given date.
Check locally first and fallback to a quorum read
"""
@spec fetch_summaries_aggregate(DateTime.t()) ::
{:ok, BeaconChain.SummaryAggregate.t()} | {:error, atom()}
def fetch_summaries_aggregate(date) do
case BeaconChain.get_summaries_aggregate(date) do
{:error, :not_exists} ->
nodes = P2P.authorized_and_available_nodes()
BeaconChain.fetch_summaries_aggregate(date, nodes)

{:ok, aggregate} ->
{:ok, aggregate}
end
end

@doc """
Request from the beacon chains all the summaries for the given dates and aggregate them
"""
@spec fetch_and_aggregate_summaries(DateTime.t()) :: BeaconChain.SummaryAggregate.t()
def fetch_and_aggregate_summaries(date) do
BeaconChain.fetch_and_aggregate_summaries(date, P2P.authorized_and_available_nodes())
end

@doc """
Retrieve the genesis address locally or remotely
"""
def fetch_genesis_address_remotely(address) do
case TransactionChain.get_genesis_address(address) do
^address ->
# if returned address is same as given, it means the DB does not contain the value
TransactionChain.fetch_genesis_address_remotely(address)

genesis_address ->
genesis_address
end
end

@doc """
Slots which are already has been added
Real time transaction can be get from pubsub
"""
@spec list_transactions_summaries_from_current_slot(DateTime.t()) ::
list(TransactionSummary.t())
def list_transactions_summaries_from_current_slot(date = %DateTime{} \\ DateTime.utc_now()) do
authorized_nodes = P2P.authorized_and_available_nodes()

ref_time = DateTime.truncate(date, :millisecond)

next_summary_date = BeaconChain.next_summary_date(ref_time)

BeaconChain.list_subsets()
|> Flow.from_enumerable(stages: 256)
|> Flow.flat_map(fn subset ->
# Foreach subset and date we compute concurrently the node election
subset
|> Election.beacon_storage_nodes(next_summary_date, authorized_nodes)
|> Enum.filter(&Node.locally_available?/1)
|> P2P.nearest_nodes()
|> Enum.take(3)
|> Enum.map(&{&1, subset})
end)
# We partition by node
|> Flow.partition(key: {:elem, 0})
|> Flow.reduce(fn -> %{} end, fn {node, subset}, acc ->
# We aggregate the subsets for a given node
Map.update(acc, node, [subset], &[subset | &1])
end)
|> Flow.flat_map(fn {node, subsets} ->
# For this node we fetch the summaries
fetch_summaries(node, subsets)
end)
|> Stream.uniq_by(& &1.address)
|> Enum.sort_by(& &1.timestamp, {:desc, DateTime})
end

@doc """
Check if a transaction exists at address
Check locally first and fallback to a quorum read
"""
@spec transaction_exists?(binary()) :: boolean()
def transaction_exists?(address) do
bchamagne marked this conversation as resolved.
Show resolved Hide resolved
if TransactionChain.transaction_exists?(address) do
# if it exists locally, no need to query the network
true
else
storage_nodes = Election.chain_storage_nodes(address, P2P.authorized_and_available_nodes())

conflict_resolver = fn results ->
# Prioritize transactions results over not found
case Enum.filter(results, &match?(%TransactionSummary{}, &1)) do
[] ->
%NotFound{}

[first | _] ->
first
end
end

case P2P.quorum_read(
storage_nodes,
%GetTransactionSummary{address: address},
conflict_resolver
) do
{:ok, %TransactionSummary{address: ^address}} ->
true

{:ok, %NotFound{}} ->
false

{:error, e} ->
raise e
end
end
end

defp fetch_summaries(node, subsets) do
subsets
|> Stream.chunk_every(10)
|> Task.async_stream(fn subsets ->
case P2P.send_message(node, %GetCurrentSummaries{subsets: subsets}) do
{:ok, %TransactionSummaryList{transaction_summaries: transaction_summaries}} ->
transaction_summaries

_ ->
[]
end
end)
|> Stream.filter(&match?({:ok, _}, &1))
|> Stream.flat_map(&elem(&1, 1))
|> Enum.to_list()
end
end
10 changes: 5 additions & 5 deletions lib/archethic/election.ex
Original file line number Diff line number Diff line change
Expand Up @@ -647,16 +647,16 @@ defmodule Archethic.Election do
@doc """
Determine if a node's public key must be a beacon storage node
"""
@spec beacon_storage_node?(binary(), DateTime.t(), Crypto.key(), list(Node.t())) :: boolean()
@spec beacon_storage_node?(DateTime.t(), Crypto.key(), list(Node.t())) :: boolean()
def beacon_storage_node?(
address,
timestamp = %DateTime{},
public_key,
node_list
)
when is_binary(address) and is_binary(public_key) and is_list(node_list) do
address
|> beacon_storage_nodes(timestamp, node_list)
when is_binary(public_key) and is_list(node_list) do
timestamp
|> Crypto.derive_beacon_aggregate_address()
|> chain_storage_nodes(node_list)
|> Utils.key_in_node_list?(public_key)
end

Expand Down
33 changes: 33 additions & 0 deletions lib/archethic_web/components/pagination.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
defmodule ArchethicWeb.Pagination do
@moduledoc false

use Phoenix.Component

@doc """
Variables expected:
- current_page INTEGER
- total_pages INTEGER

"""
def previous_next(assigns) do
~H"""
<nav class="pagination is-right" role="navigation" aria-label="pagination">
<%= if @current_page > 1 do %>
<a class="pagination-previous" phx-value-page={@current_page - 1} phx-click="goto">Previous</a>
<% else %>
<a class="pagination-previous is-disabled">Previous</a>
<% end %>

<%= if @current_page + 1 <= @total_pages do %>
<a class="pagination-next" phx-value-page={@current_page + 1} phx-click="goto">Next page</a>
<% else %>
<a class="pagination-next is-disabled">Next page</a>
<% end %>

<p class="pagination-list has-text-white">
Page <%= @current_page %> on <%= @total_pages %>
</p>
</nav>
"""
end
end
47 changes: 7 additions & 40 deletions lib/archethic_web/controllers/api/transaction_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,9 @@ defmodule ArchethicWeb.API.TransactionController do

alias Archethic.TransactionChain.{
Transaction,
TransactionData,
TransactionSummary
TransactionData
}

alias Archethic.P2P

alias Archethic.P2P.Message.{
GetTransactionSummary,
NotFound
}

alias Archethic.Election

alias Archethic.Mining
alias Archethic.OracleChain

Expand All @@ -39,23 +29,14 @@ defmodule ArchethicWeb.API.TransactionController do

tx_address = tx.address

storage_nodes =
Election.chain_storage_nodes(tx_address, P2P.authorized_and_available_nodes())

conflict_resolver = summary_conflict_resolver()

case P2P.quorum_read(
storage_nodes,
%GetTransactionSummary{address: tx_address},
conflict_resolver
) do
{:ok, %TransactionSummary{address: ^tx_address}} ->
try do
if Archethic.transaction_exists?(tx_address) do
conn |> put_status(422) |> json(%{status: "error - transaction already exists!"})

{:ok, %NotFound{}} ->
else
send_transaction(conn, tx)

{:error, e} ->
end
catch
e ->
Logger.error("Cannot get transaction summary - #{inspect(e)}")
conn |> put_status(504) |> json(%{status: "error - networking error"})
end
Expand All @@ -72,20 +53,6 @@ defmodule ArchethicWeb.API.TransactionController do
end
end

defp summary_conflict_resolver() do
fn results ->
# Prioritize transactions results over not found
case Enum.filter(results, &match?(%TransactionSummary{}, &1)) do
[] ->
%NotFound{}

res ->
Enum.sort_by(res, & &1.timestamp, {:desc, DateTime})
|> List.first()
end
end
end

defp send_transaction(conn, tx = %Transaction{}) do
case Archethic.send_new_transaction(tx) do
:ok ->
Expand Down
2 changes: 1 addition & 1 deletion lib/archethic_web/faucet_rate_limiter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ defmodule ArchethicWeb.FaucetRateLimiter do
@impl GenServer
def handle_call({:block_status, address}, _from, state) do
address =
case TransactionChain.fetch_genesis_address_remotely(address) do
case Archethic.fetch_genesis_address_remotely(address) do
{:ok, genesis_address} ->
genesis_address

Expand Down
Loading