Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TaskSupervisor partitionning #1589

Merged
merged 1 commit into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ import Config
# Print only errors during test
config :logger, level: :error

config :archethic, Archethic.TaskSupervisor, enabled: true

config :archethic, :mut_dir, "data_test"

config :archethic, Archethic.BeaconChain.Subset, enabled: false
Expand Down
16 changes: 12 additions & 4 deletions lib/archethic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ defmodule Archethic do
alias Archethic.SelfRepair
alias Archethic.SelfRepair.NetworkChain
alias Archethic.SelfRepair.NetworkView
alias Archethic.TaskSupervisor

alias Archethic.TransactionChain
alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.UnspentOutput
Expand All @@ -43,6 +43,14 @@ defmodule Archethic do
:persistent_term.get(:archethic_up, nil) == :up
end

@doc """
Return the via tuple to use in the Task.Supervisor module.
"""
@spec task_supervisors() :: tuple()
def task_supervisors() do
{:via, PartitionSupervisor, {Archethic.TaskSupervisors, self()}}
end

@doc """
Search a transaction by its address
Check locally and fallback to a quorum read
Expand Down Expand Up @@ -126,7 +134,7 @@ defmodule Archethic do
}

Task.Supervisor.async_stream_nolink(
Archethic.TaskSupervisor,
Archethic.task_supervisors(),
validation_nodes,
&P2P.send_message(&1, message),
ordered: false,
Expand Down Expand Up @@ -173,7 +181,7 @@ defmodule Archethic do
nodes
end

TaskSupervisor
Archethic.task_supervisors()
|> Task.Supervisor.start_child(fn ->
message = %NewTransaction{
transaction: tx,
Expand Down Expand Up @@ -265,7 +273,7 @@ defmodule Archethic do
defp get_welcome_node_public_key(_, key), do: key

defp notify_welcome_node(welcome_node_key, address, :already_locked) do
Task.Supervisor.start_child(TaskSupervisor, fn ->
Task.Supervisor.start_child(task_supervisors(), fn ->
message = %ValidationError{error: MiningError.new(:transaction_in_mining), address: address}
P2P.send_message(welcome_node_key, message)
end)
Expand Down
2 changes: 1 addition & 1 deletion lib/archethic/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ defmodule Archethic.Application do
transport = Keyword.get(p2p_endpoint_conf, :transport, :tcp)

children = [
{Task.Supervisor, name: Archethic.TaskSupervisor},
{PartitionSupervisor, child_spec: Task.Supervisor, name: Archethic.TaskSupervisors},
Archethic.Telemetry,
{Registry, keys: :duplicate, name: Archethic.PubSubRegistry},
DBSupervisor,
Expand Down
6 changes: 2 additions & 4 deletions lib/archethic/beacon_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ defmodule Archethic.BeaconChain do
alias Archethic.P2P.Message.NotFound
alias Archethic.P2P.Message.TransactionSummaryList

alias Archethic.TaskSupervisor

alias Archethic.TransactionChain.TransactionSummary

alias Archethic.DB
Expand Down Expand Up @@ -125,7 +123,7 @@ defmodule Archethic.BeaconChain do
@spec load_slot(Slot.t(), Crypto.key()) :: :ok | :error
def load_slot(slot = %Slot{subset: subset, slot_time: slot_time}, node_public_key) do
if slot_time == SlotTimer.previous_slot(DateTime.utc_now()) do
Task.Supervisor.start_child(TaskSupervisor, fn ->
Task.Supervisor.start_child(Archethic.task_supervisors(), fn ->
case validate_slot(slot) do
:ok ->
Logger.debug("New beacon slot loaded - #{inspect(slot)}",
Expand Down Expand Up @@ -321,7 +319,7 @@ defmodule Archethic.BeaconChain do
# download the summaries
result =
Task.Supervisor.async_stream(
TaskSupervisor,
Archethic.task_supervisors(),
summaries_by_node,
fn {node, addresses} ->
fetch_beacon_summaries(node, addresses)
Expand Down
4 changes: 1 addition & 3 deletions lib/archethic/beacon_chain/network_coordinates.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do
alias Archethic.SelfRepair
alias Archethic.Utils

alias Archethic.TaskSupervisor

@doc """
Return the timeout to determine network patches
It is equivalent to 4m30s in production. 4.5s in dev.
Expand Down Expand Up @@ -226,7 +224,7 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do

defp stream_network_stats(summary_time, beacon_nodes, timeout) do
Task.Supervisor.async_stream_nolink(
TaskSupervisor,
Archethic.task_supervisors(),
beacon_nodes,
fn node ->
P2P.send_message(node, %GetNetworkStats{summary_time: summary_time}, timeout)
Expand Down
4 changes: 1 addition & 3 deletions lib/archethic/beacon_chain/slot/validation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ defmodule Archethic.BeaconChain.Slot.Validation do
alias Archethic.P2P
alias Archethic.P2P.Node

alias Archethic.TaskSupervisor

alias Archethic.TransactionChain.TransactionSummary

require Logger
Expand All @@ -21,7 +19,7 @@ defmodule Archethic.BeaconChain.Slot.Validation do
@spec valid_transaction_attestations?(Slot.t()) :: boolean()
def valid_transaction_attestations?(%Slot{transaction_attestations: transaction_attestations}) do
Task.Supervisor.async_stream(
TaskSupervisor,
Archethic.task_supervisors(),
transaction_attestations,
&valid_transaction_attestation/1,
ordered: false,
Expand Down
4 changes: 2 additions & 2 deletions lib/archethic/beacon_chain/subset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ defmodule Archethic.BeaconChain.Subset do
alias Archethic.P2P.Message.ReplicationAttestationMessage

alias Archethic.PubSub
alias Archethic.TaskSupervisor

alias Archethic.TransactionChain.TransactionSummary

alias Archethic.Utils
Expand Down Expand Up @@ -387,7 +387,7 @@ defmodule Archethic.BeaconChain.Subset do
network_patches_timeout = NetworkCoordinates.timeout()

patch_task =
Task.Supervisor.async_nolink(TaskSupervisor, fn ->
Task.Supervisor.async_nolink(Archethic.task_supervisors(), fn ->
get_network_patches(time, subset, network_patches_timeout)
end)

Expand Down
7 changes: 4 additions & 3 deletions lib/archethic/beacon_chain/subset/p2p_sampling.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ defmodule Archethic.BeaconChain.Subset.P2PSampling do
alias Archethic.P2P.Message.Ping
alias Archethic.P2P.Node

alias Archethic.TaskSupervisor

@type p2p_view :: {available? :: boolean(), latency :: non_neg_integer()}

@doc """
Expand All @@ -29,7 +27,10 @@ defmodule Archethic.BeaconChain.Subset.P2PSampling do
def get_p2p_views(nodes, nodes_availability_times) when is_list(nodes) do
timeout = 1_000

Task.Supervisor.async_stream_nolink(TaskSupervisor, nodes, &do_sample_p2p_view(&1, timeout),
Task.Supervisor.async_stream_nolink(
Archethic.task_supervisors(),
nodes,
&do_sample_p2p_view(&1, timeout),
on_timeout: :kill_task
)
|> Enum.with_index()
Expand Down
4 changes: 1 addition & 3 deletions lib/archethic/beacon_chain/update.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ defmodule Archethic.BeaconChain.Update do
alias Archethic.P2P.Node
alias Archethic.P2P.Message.RegisterBeaconUpdates

alias Archethic.TaskSupervisor

def start_link(args \\ [], opts \\ [name: __MODULE__]) do
GenServer.start_link(__MODULE__, args, opts)
end
Expand Down Expand Up @@ -52,7 +50,7 @@ defmodule Archethic.BeaconChain.Update do
state
else
Task.Supervisor.async_stream(
TaskSupervisor,
Archethic.task_supervisors(),
nodes_to_subscribe,
fn node ->
{P2P.send_message(node, message), node.first_public_key}
Expand Down
4 changes: 2 additions & 2 deletions lib/archethic/contracts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ defmodule Archethic.Contracts do
}

task =
Task.Supervisor.async_nolink(Archethic.TaskSupervisor, fn ->
Task.Supervisor.async_nolink(Archethic.task_supervisors(), fn ->
try do
# TODO: logs
logs = []
Expand Down Expand Up @@ -290,7 +290,7 @@ defmodule Archethic.Contracts do

queued_calls =
Task.Supervisor.async_stream_nolink(
Archethic.TaskSupervisor,
Archethic.task_supervisors(),
genesis_addresses,
fn genesis_address ->
genesis_address
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ defmodule Archethic.Contracts.Interpreter.Library.Common.HttpImpl do
alias Archethic.Tag
alias Archethic.Contracts.Interpreter.Library
alias Archethic.Contracts.Interpreter.Library.Common.Http
alias Archethic.TaskSupervisor

use Tag

Expand Down Expand Up @@ -129,7 +128,7 @@ defmodule Archethic.Contracts.Interpreter.Library.Common.HttpImpl do
"body" => request_body
}
) do
Task.Supervisor.async_nolink(TaskSupervisor, fn ->
Task.Supervisor.async_nolink(Archethic.task_supervisors(), fn ->
with :ok <- validate_request(url, method, headers, request_body),
headers <- Map.to_list(headers),
{:ok, uri} <- URI.new(url),
Expand Down
2 changes: 1 addition & 1 deletion lib/archethic/contracts/loader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ defmodule Archethic.Contracts.Loader do
defp resolve_genesis_address(recipients, authorized_nodes, protocol_version)
when protocol_version <= 7 do
Task.Supervisor.async_stream(
Archethic.TaskSupervisor,
Archethic.task_supervisors(),
recipients,
fn address ->
nodes = Election.chain_storage_nodes(address, authorized_nodes)
Expand Down
3 changes: 1 addition & 2 deletions lib/archethic/governance.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ defmodule Archethic.Governance do
alias Archethic.TransactionChain.TransactionData
alias Archethic.TransactionChain.TransactionData.Recipient

alias Archethic.TaskSupervisor
alias Archethic.Utils

@proposal_tx_select_fields [
Expand Down Expand Up @@ -118,7 +117,7 @@ defmodule Archethic.Governance do
with true <- Utils.key_in_node_list?(storage_nodes, Crypto.first_node_public_key()),
{:ok, prop} <- get_code_proposal(prop_address),
true <- Code.enough_code_approval?(prop) do
Task.Supervisor.start_child(TaskSupervisor, fn ->
Task.Supervisor.start_child(Archethic.task_supervisors(), fn ->
if Code.valid_integration?(prop) do
Code.deploy_proposal_testnet(prop)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/archethic/mining.ex
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ defmodule Archethic.Mining do

aggregated_responses =
Task.Supervisor.async_stream_nolink(
Archethic.TaskSupervisor,
Archethic.task_supervisors(),
storage_nodes,
&P2P.send_message(&1, message),
max_concurrency: nb_storage_nodes,
Expand Down
6 changes: 3 additions & 3 deletions lib/archethic/mining/distributed_workflow.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1135,7 +1135,7 @@ defmodule Archethic.Mining.DistributedWorkflow do

results =
Task.Supervisor.async_stream_nolink(
Archethic.TaskSupervisor,
Archethic.task_supervisors(),
storage_nodes,
&P2P.send_message(&1, message),
ordered: false,
Expand Down Expand Up @@ -1223,7 +1223,7 @@ defmodule Archethic.Mining.DistributedWorkflow do
# Notify error to the welcome node
message = %ValidationError{error: error, address: tx_address}

Task.Supervisor.async_nolink(Archethic.TaskSupervisor, fn ->
Task.Supervisor.async_nolink(Archethic.task_supervisors(), fn ->
P2P.send_message(welcome_node, message)
:ok
end)
Expand All @@ -1235,7 +1235,7 @@ defmodule Archethic.Mining.DistributedWorkflow do
end

defp time_offset(ref_time, timeout) do
# If time offset is negative (timeout is already passed based on ref_time)
# If time offset is negative (timeout is already passed based on ref_time)
# this function returns 0. GenStateMachine works with timeout = 0 and directly create
# an event in the Process message box after all external message already in the box
ref_time
Expand Down
7 changes: 3 additions & 4 deletions lib/archethic/mining/pending_transaction_validation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ defmodule Archethic.Mining.PendingTransactionValidation do
alias Archethic.SharedSecrets
alias Archethic.SharedSecrets.NodeRenewal

alias Archethic.TaskSupervisor
alias Archethic.TransactionChain
alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.TransactionData
Expand Down Expand Up @@ -829,13 +828,13 @@ defmodule Archethic.Mining.PendingTransactionValidation do

# fetch in parallel the data we need
tasks = [
Task.Supervisor.async_nolink(TaskSupervisor, fn ->
Task.Supervisor.async_nolink(Archethic.task_supervisors(), fn ->
fetch_previous_tx_genesis_address(tx)
end),
Task.Supervisor.async_nolink(TaskSupervisor, fn ->
Task.Supervisor.async_nolink(Archethic.task_supervisors(), fn ->
TransactionChain.fetch_genesis_address(token_address, storage_nodes)
end),
Task.Supervisor.async_nolink(TaskSupervisor, fn ->
Task.Supervisor.async_nolink(Archethic.task_supervisors(), fn ->
TransactionChain.fetch_transaction(token_address, storage_nodes)
end)
]
Expand Down
3 changes: 1 addition & 2 deletions lib/archethic/mining/smart_contract_validation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ defmodule Archethic.Mining.SmartContractValidation do

alias Archethic.TransactionChain.TransactionData
alias Archethic.TransactionChain.TransactionData.Recipient
alias Archethic.TaskSupervisor

alias Crontab.CronExpression.Parser, as: CronParser
alias Crontab.DateChecker, as: CronDateChecker
Expand Down Expand Up @@ -54,7 +53,7 @@ defmodule Archethic.Mining.SmartContractValidation do
default_error =
Error.new(:invalid_recipients_execution, "Failed to validate call due to timeout")

TaskSupervisor
Archethic.task_supervisors()
|> Task.Supervisor.async_stream_nolink(
recipients,
&request_contract_validation(&1, transaction, validation_time),
Expand Down
6 changes: 3 additions & 3 deletions lib/archethic/mining/standalone_workflow.ex
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ defmodule Archethic.Mining.StandaloneWorkflow do

results =
Task.Supervisor.async_stream_nolink(
Archethic.TaskSupervisor,
Archethic.task_supervisors(),
replication_nodes,
&P2P.send_message(&1, message),
ordered: false,
Expand Down Expand Up @@ -246,7 +246,7 @@ defmodule Archethic.Mining.StandaloneWorkflow do
# Notify error to the welcome node
message = %ValidationError{address: tx_address, error: error}

Task.Supervisor.async_nolink(Archethic.TaskSupervisor, fn ->
Task.Supervisor.async_nolink(Archethic.task_supervisors(), fn ->
P2P.send_message(
Crypto.last_node_public_key(),
message
Expand Down Expand Up @@ -323,7 +323,7 @@ defmodule Archethic.Mining.StandaloneWorkflow do
# Notify error to the welcome node
message = %ValidationError{error: Error.new(:timeout), address: tx.address}

Task.Supervisor.async_nolink(Archethic.TaskSupervisor, fn ->
Task.Supervisor.async_nolink(Archethic.task_supervisors(), fn ->
P2P.send_message(welcome_node, message)
:ok
end)
Expand Down
10 changes: 4 additions & 6 deletions lib/archethic/mining/transaction_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ defmodule Archethic.Mining.TransactionContext do
alias Archethic.P2P.Message.Ping
alias Archethic.P2P.Node

alias Archethic.TaskSupervisor

alias Archethic.TransactionChain
alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.UnspentOutput
Expand Down Expand Up @@ -89,7 +87,7 @@ defmodule Archethic.Mining.TransactionContext do
previous_storage_nodes = Election.chain_storage_nodes(previous_address, authorized_nodes)

Task.Supervisor.async(
TaskSupervisor,
Archethic.task_supervisors(),
fn ->
# Timeout of 4 sec because the coordinator node wait 5 sec to get the context
# from the cross validation nodes
Expand All @@ -115,17 +113,17 @@ defmodule Archethic.Mining.TransactionContext do
|> Election.chain_storage_nodes(authorized_nodes)
|> Election.get_synchronized_nodes_before(previous_summary_time)

Task.Supervisor.async(TaskSupervisor, fn ->
Task.Supervisor.async(Archethic.task_supervisors(), fn ->
genesis_address
|> TransactionChain.fetch_unspent_outputs(genesis_nodes)
|> Enum.to_list()
end)
end

defp request_nodes_view(node_public_keys) do
Task.Supervisor.async(TaskSupervisor, fn ->
Task.Supervisor.async(Archethic.task_supervisors(), fn ->
Task.Supervisor.async_stream_nolink(
TaskSupervisor,
Archethic.task_supervisors(),
node_public_keys,
fn node_public_key ->
{node_public_key, P2P.send_message(node_public_key, %Ping{}, 1000)}
Expand Down
Loading
Loading