From b876c91f779e5a1e8e903168967de9361a006a35 Mon Sep 17 00:00:00 2001 From: Samuel Manzanera Date: Tue, 2 Apr 2024 12:07:27 +0200 Subject: [PATCH] Use tracing in mining --- lib/archethic.ex | 16 +- lib/archethic/mining.ex | 25 +- lib/archethic/mining/distributed_workflow.ex | 239 +++++++++++++----- lib/archethic/mining/standalone_workflow.ex | 108 ++++++-- .../replicate_pending_transaction_chain.ex | 69 +++-- .../replication_attestation_message.ex | 39 ++- lib/archethic/p2p/message/start_mining.ex | 6 +- .../p2p/message/validate_transaction.ex | 35 ++- lib/archethic/utils.ex | 15 ++ .../explorer/controllers/faucet_controller.ex | 10 + lib/archethic_web/transaction_subscriber.ex | 8 + 11 files changed, 429 insertions(+), 141 deletions(-) diff --git a/lib/archethic.ex b/lib/archethic.ex index 8769be9ce5..5f5cb68d28 100644 --- a/lib/archethic.ex +++ b/lib/archethic.ex @@ -33,6 +33,7 @@ defmodule Archethic do alias Archethic.TransactionChain.TransactionInput require Logger + require OpenTelemetry.Tracer @doc """ Returns true if a node is up and false if it is down @@ -123,10 +124,23 @@ defmodule Archethic do contract_context: contract_context } + mining_span = OpenTelemetry.Tracer.start_span("mining") + OpenTelemetry.Tracer.set_current_span(mining_span) + OpenTelemetry.Span.set_attribute(mining_span, "address", Base.encode16(tx.address)) + + OpenTelemetry.Span.set_attribute( + mining_span, + "node", + welcome_node_key |> P2P.get_node_info!() |> Node.endpoint() + ) + + trace = Archethic.Utils.inject_propagated_context() + :persistent_term.put({:initial_mining_span, tx.address}, mining_span) + Task.Supervisor.async_stream_nolink( Archethic.TaskSupervisor, validation_nodes, - &P2P.send_message(&1, message), + &P2P.send_message(&1, message, trace: trace), ordered: false, on_timeout: :kill_task, timeout: Message.get_timeout(message) + 2000 diff --git a/lib/archethic/mining.ex b/lib/archethic/mining.ex index 402e216d38..b2ae0cb07c 100644 --- a/lib/archethic/mining.ex +++ b/lib/archethic/mining.ex @@ -47,9 +47,24 @@ defmodule Archethic.Mining do transaction :: Transaction.t(), welcome_node_public_key :: Crypto.key(), validation_node_public_keys :: list(Crypto.key()), - contract_context :: nil | Contract.Context.t() + contract_context :: nil | Contract.Context.t(), + request_metadata :: %{} ) :: {:ok, pid()} - def start(tx = %Transaction{}, welcome_node_public_key, [_ | []], contract_context) do + def start( + tx, + welcome_node_public_key, + validation_node_keys, + contract_context, + request_metadata \\ %{} + ) + + def start( + tx = %Transaction{}, + welcome_node_public_key, + [_ | []], + contract_context, + _ + ) do StandaloneWorkflow.start_link( transaction: tx, welcome_node: P2P.get_node_info!(welcome_node_public_key), @@ -61,7 +76,8 @@ defmodule Archethic.Mining do tx = %Transaction{}, welcome_node_public_key, validation_node_public_keys, - contract_context + contract_context, + request_metadata ) when is_binary(welcome_node_public_key) and is_list(validation_node_public_keys) do DynamicSupervisor.start_child(WorkerSupervisor, { @@ -70,7 +86,8 @@ defmodule Archethic.Mining do welcome_node: P2P.get_node_info!(welcome_node_public_key), validation_nodes: Enum.map(validation_node_public_keys, &P2P.get_node_info!/1), node_public_key: Crypto.last_node_public_key(), - contract_context: contract_context + contract_context: contract_context, + request_metadata: request_metadata }) end diff --git a/lib/archethic/mining/distributed_workflow.ex b/lib/archethic/mining/distributed_workflow.ex index c78872612e..483ed9215f 100644 --- a/lib/archethic/mining/distributed_workflow.ex +++ b/lib/archethic/mining/distributed_workflow.ex @@ -13,6 +13,7 @@ defmodule Archethic.Mining.DistributedWorkflow do If the atomic commitment is not reached, it starts the malicious detection to ban the dishonest nodes """ + require OpenTelemetry.Tracer alias Archethic.BeaconChain alias Archethic.BeaconChain.ReplicationAttestation alias Archethic.Crypto @@ -54,6 +55,7 @@ defmodule Archethic.Mining.DistributedWorkflow do alias Archethic.Utils require Logger + require OpenTelemetry.Tracer use GenStateMachine, callback_mode: [:handle_event_function, :state_enter], restart: :temporary @vsn 1 @@ -173,6 +175,9 @@ defmodule Archethic.Mining.DistributedWorkflow do node_public_key = Keyword.get(opts, :node_public_key) timeout = Keyword.get(opts, :timeout, get_mining_timeout(tx.type)) contract_context = Keyword.get(opts, :contract_context) + %{trace: trace} = Keyword.get(opts, :request_metadata, %{trace: ""}) + + Utils.extract_progagated_context(trace) Registry.register(WorkflowRegistry, tx.address, []) @@ -181,6 +186,15 @@ defmodule Archethic.Mining.DistributedWorkflow do transaction_type: tx.type ) + mining_span_context = OpenTelemetry.Tracer.start_span("start mining") + OpenTelemetry.Tracer.set_current_span(mining_span_context) + + OpenTelemetry.Span.set_attribute( + mining_span_context, + "node", + node_public_key |> P2P.get_node_info!() |> Node.endpoint() + ) + next_events = [ {:next_event, :internal, {:start_mining, tx, welcome_node, validation_nodes, contract_context}}, @@ -191,7 +205,8 @@ defmodule Archethic.Mining.DistributedWorkflow do %{ node_public_key: node_public_key, start_time: System.monotonic_time(), - timeout: timeout + timeout: timeout, + mining_span_context: mining_span_context }, next_events} end @@ -199,7 +214,7 @@ defmodule Archethic.Mining.DistributedWorkflow do :internal, {:start_mining, tx, welcome_node, validation_nodes, contract_context}, :idle, - data + data = %{mining_span_context: mining_span_context} ) do validation_time = DateTime.utc_now() |> DateTime.truncate(:millisecond) @@ -211,7 +226,9 @@ defmodule Archethic.Mining.DistributedWorkflow do previous_storage_nodes = Election.chain_storage_nodes(previous_address, authorized_nodes) {:ok, genesis_address} = - TransactionChain.fetch_genesis_address(previous_address, previous_storage_nodes) + OpenTelemetry.Tracer.with_span "fetch genesis address", %{links: [mining_span_context]} do + TransactionChain.fetch_genesis_address(previous_address, previous_storage_nodes) + end genesis_storage_nodes = Election.chain_storage_nodes(genesis_address, authorized_nodes) @@ -222,7 +239,10 @@ defmodule Archethic.Mining.DistributedWorkflow do authorized_nodes ) - resolved_addresses = TransactionChain.resolve_transaction_addresses!(tx) + resolved_addresses = + OpenTelemetry.Tracer.with_span "resolve addresses" do + TransactionChain.resolve_transaction_addresses!(tx) + end io_storage_nodes = if Transaction.network_type?(tx.type) do @@ -273,8 +293,13 @@ defmodule Archethic.Mining.DistributedWorkflow do ) do role = if node_public_key == coordinator_key, do: :coordinator, else: :cross_validator + pending_validation = + OpenTelemetry.Tracer.with_span "pending validation" do + PendingTransactionValidation.validate(tx, validation_time) + end + new_context = - case PendingTransactionValidation.validate(tx, validation_time) do + case pending_validation do :ok -> Logger.debug("Pending transaction valid", transaction_address: Base.encode16(tx.address), @@ -342,13 +367,15 @@ defmodule Archethic.Mining.DistributedWorkflow do {prev_tx, unspent_outputs, previous_storage_nodes, chain_storage_nodes_view, beacon_storage_nodes_view, io_storage_nodes_view} = - TransactionContext.get( - Transaction.previous_address(tx), - genesis_address, - Enum.map(chain_storage_nodes, & &1.first_public_key), - Enum.map(beacon_storage_nodes, & &1.first_public_key), - Enum.map(io_storage_nodes, & &1.first_public_key) - ) + OpenTelemetry.Tracer.with_span "fetch context" do + TransactionContext.get( + Transaction.previous_address(tx), + genesis_address, + Enum.map(chain_storage_nodes, & &1.first_public_key), + Enum.map(beacon_storage_nodes, & &1.first_public_key), + Enum.map(io_storage_nodes, & &1.first_public_key) + ) + end now = System.monotonic_time() @@ -431,7 +458,10 @@ defmodule Archethic.Mining.DistributedWorkflow do node_public_key: node_public_key, context: context }) do - notify_transaction_context(context, node_public_key) + OpenTelemetry.Tracer.with_span "notify context" do + notify_transaction_context(context, node_public_key) + end + :keep_state_and_data end @@ -476,9 +506,12 @@ defmodule Archethic.Mining.DistributedWorkflow do context: context = %ValidationContext{ transaction: tx - } + }, + mining_span_context: mining_span_context } ) do + OpenTelemetry.Tracer.set_current_span(mining_span_context) + Logger.info("Aggregate mining context", transaction_address: Base.encode16(tx.address), transaction_type: tx.type @@ -553,11 +586,26 @@ defmodule Archethic.Mining.DistributedWorkflow do _ -> new_context = context - |> ValidationContext.create_validation_stamp() + |> then(fn context -> + OpenTelemetry.Tracer.with_span "create stamp" do + ValidationContext.create_validation_stamp(context) + end + end) |> ValidationContext.create_replication_tree() + request_cross_validation_context = + OpenTelemetry.Tracer.start_span("request cross validations") + + OpenTelemetry.Tracer.set_current_span(request_cross_validation_context) + request_cross_validations(new_context) - {:next_state, :wait_cross_validation_stamps, %{data | context: new_context}} + + new_data = + data + |> Map.put(:context, new_context) + |> Map.put(:request_cross_validation_context, request_cross_validation_context) + + {:next_state, :wait_cross_validation_stamps, new_data} end end @@ -571,9 +619,12 @@ defmodule Archethic.Mining.DistributedWorkflow do context: context = %ValidationContext{ transaction: tx - } + }, + mining_span_context: mining_span_context } ) do + OpenTelemetry.Tracer.set_current_span(mining_span_context) + Logger.info("Cross validation", transaction_address: Base.encode16(tx.address), transaction_type: tx.type @@ -585,9 +636,15 @@ defmodule Archethic.Mining.DistributedWorkflow do |> ValidationContext.set_confirmed_validation_nodes(confirmed_cross_validation_nodes) |> ValidationContext.add_validation_stamp(validation_stamp) |> ValidationContext.add_replication_tree(replication_tree, node_public_key) - |> ValidationContext.cross_validate() + |> then(fn context -> + OpenTelemetry.Tracer.with_span "cross validation" do + ValidationContext.cross_validate(context) + end + end) - notify_cross_validation_stamp(new_context) + OpenTelemetry.Tracer.with_span "notify cross validation stamp" do + notify_cross_validation_stamp(new_context) + end confirmed_cross_validation_nodes = ValidationContext.get_confirmed_validation_nodes(new_context) @@ -623,7 +680,9 @@ defmodule Archethic.Mining.DistributedWorkflow do {:add_cross_validation_stamp, cross_validation_stamp = %CrossValidationStamp{}}, :wait_cross_validation_stamps, data = %{ - context: context = %ValidationContext{transaction: tx} + context: context = %ValidationContext{transaction: tx}, + request_cross_validation_context: request_cross_validation_context, + mining_span_context: mining_span_context } ) do Logger.info("Add cross validation stamp", @@ -634,6 +693,9 @@ defmodule Archethic.Mining.DistributedWorkflow do new_context = ValidationContext.add_cross_validation_stamp(context, cross_validation_stamp) if ValidationContext.enough_cross_validation_stamps?(new_context) do + OpenTelemetry.Tracer.end_span(request_cross_validation_context) + OpenTelemetry.Tracer.set_current_span(mining_span_context) + if ValidationContext.atomic_commitment?(new_context) do {:next_state, :replication, %{data | context: new_context}} else @@ -698,9 +760,21 @@ defmodule Archethic.Mining.DistributedWorkflow do transaction_type: type ) + request_replication_validation_span_context = + OpenTelemetry.Tracer.start_span("request replication validation") + + OpenTelemetry.Tracer.set_current_span(request_replication_validation_span_context) new_context = request_replication_validation(context, node_public_key) - {:keep_state, %{data | context: new_context}} + new_data = + data + |> Map.put(:context, new_context) + |> Map.put( + :request_replication_validation_span_context, + request_replication_validation_span_context + ) + + {:keep_state, new_data} err -> Logger.info("Skipped replication because validation failed: #{inspect(err)}") @@ -714,18 +788,39 @@ defmodule Archethic.Mining.DistributedWorkflow do :cast, {:add_replication_validation, node_public_key}, :replication, - data = %{context: context} + data = %{ + context: context, + request_replication_validation_span_context: + request_replication_validation_span_context, + mining_span_context: mining_span_context + } ) do validation_nodes = ValidationContext.get_validation_nodes(context) if Utils.key_in_node_list?(validation_nodes, node_public_key) do new_context = ValidationContext.add_replication_validation(context, node_public_key) - if ValidationContext.enough_replication_validations?(new_context) do - request_replication(new_context) - end + request_replication_span_context = + if ValidationContext.enough_replication_validations?(new_context) do + OpenTelemetry.Tracer.end_span(request_replication_validation_span_context) - {:keep_state, %{data | context: new_context}} + OpenTelemetry.Tracer.set_current_span(mining_span_context) + + request_replication_span_context = + OpenTelemetry.Tracer.start_span("request_replication") + + OpenTelemetry.Tracer.set_current_span(request_replication_span_context) + + request_replication(new_context) + request_replication_span_context + end + + new_data = + data + |> Map.put(:context, new_context) + |> Map.put(:request_replication_span_context, request_replication_span_context) + + {:keep_state, new_data} else :keep_state_and_data end @@ -742,9 +837,13 @@ defmodule Archethic.Mining.DistributedWorkflow do transaction: %Transaction{address: address, type: type}, validation_time: validation_time, genesis_address: genesis_address - } + }, + request_replication_span_context: request_replication_span_context, + mining_span_context: mining_span_context } ) do + OpenTelemetry.Tracer.set_current_span(request_replication_span_context) + with {:ok, node_index} <- ValidationContext.get_chain_storage_position(context, node_public_key), validated_tx <- ValidationContext.get_validated_transaction(context), @@ -760,6 +859,9 @@ defmodule Archethic.Mining.DistributedWorkflow do new_context = ValidationContext.add_storage_confirmation(context, node_index, signature) if ValidationContext.enough_storage_confirmations?(new_context) do + OpenTelemetry.Tracer.end_span(request_replication_span_context) + OpenTelemetry.Tracer.set_current_span(mining_span_context) + duration = System.monotonic_time() - start_time # send the mining_completed event @@ -806,26 +908,33 @@ defmodule Archethic.Mining.DistributedWorkflow do validated_tx = ValidationContext.get_validated_transaction(context) tx_summary = TransactionSummary.from_transaction(validated_tx, genesis_address) - message = - ReplicationAttestationMessage.from_replication_attestation(%ReplicationAttestation{ - transaction_summary: tx_summary, - confirmations: confirmations - }) + OpenTelemetry.Tracer.with_span "notify attestation" do + trace = Archethic.Utils.inject_propagated_context() - beacon_storage_nodes = ValidationContext.get_beacon_replication_nodes(context) + message = + ReplicationAttestationMessage.from_replication_attestation(%ReplicationAttestation{ + transaction_summary: tx_summary, + confirmations: confirmations + }) - [welcome_node | beacon_storage_nodes] - |> P2P.distinct_nodes() - |> P2P.broadcast_message(message) + beacon_storage_nodes = ValidationContext.get_beacon_replication_nodes(context) - validated_tx = ValidationContext.get_validated_transaction(context) + [welcome_node | beacon_storage_nodes] + |> P2P.distinct_nodes() + |> P2P.broadcast_message(message, trace: trace) - context - |> ValidationContext.get_io_replication_nodes() - |> P2P.broadcast_message(%ReplicateTransaction{ - transaction: validated_tx, - genesis_address: genesis_address - }) + validated_tx = ValidationContext.get_validated_transaction(context) + + context + |> ValidationContext.get_io_replication_nodes() + |> P2P.broadcast_message( + %ReplicateTransaction{ + transaction: validated_tx, + genesis_address: genesis_address + }, + trace: trace + ) + end :keep_state_and_data end @@ -838,15 +947,21 @@ defmodule Archethic.Mining.DistributedWorkflow do context: context = %ValidationContext{ transaction: tx - } + }, + mining_span_context: mining_span_context } ) do unless Transaction.network_type?(tx.type) do - context - |> ValidationContext.get_confirmed_replication_nodes() - |> P2P.broadcast_message(%NotifyPreviousChain{address: tx.address}) + OpenTelemetry.Tracer.with_span "notify previous chain" do + context + |> ValidationContext.get_confirmed_replication_nodes() + |> P2P.broadcast_message(%NotifyPreviousChain{address: tx.address}) + end end + OpenTelemetry.Tracer.set_current_span(mining_span_context) + OpenTelemetry.Tracer.end_span(mining_span_context) + :stop end @@ -1031,15 +1146,19 @@ defmodule Archethic.Mining.DistributedWorkflow do transaction_address: Base.encode16(tx_address) ) - P2P.send_message(coordinator_node, %AddMiningContext{ - address: tx_address, - utxos_hashes: Enum.map(unspent_outputs, &VersionedUnspentOutput.hash/1), - validation_node_public_key: node_public_key, - previous_storage_nodes_public_keys: Enum.map(previous_storage_nodes, & &1.last_public_key), - chain_storage_nodes_view: chain_storage_nodes_view, - beacon_storage_nodes_view: beacon_storage_nodes_view, - io_storage_nodes_view: io_storage_nodes_view - }) + P2P.send_message( + coordinator_node, + %AddMiningContext{ + address: tx_address, + utxos_hashes: Enum.map(unspent_outputs, &VersionedUnspentOutput.hash/1), + validation_node_public_key: node_public_key, + previous_storage_nodes_public_keys: + Enum.map(previous_storage_nodes, & &1.last_public_key), + chain_storage_nodes_view: chain_storage_nodes_view, + beacon_storage_nodes_view: beacon_storage_nodes_view, + io_storage_nodes_view: io_storage_nodes_view + } + ) end defp request_cross_validations( @@ -1122,11 +1241,13 @@ defmodule Archethic.Mining.DistributedWorkflow do inputs: aggregated_utxos } + trace = Archethic.Utils.inject_propagated_context() + results = Task.Supervisor.async_stream_nolink( Archethic.TaskSupervisor, storage_nodes, - &P2P.send_message(&1, message), + &P2P.send_message(&1, message, trace: trace), ordered: false, on_timeout: :kill_task, timeout: Message.get_timeout(message) + 2000, @@ -1190,7 +1311,9 @@ defmodule Archethic.Mining.DistributedWorkflow do genesis_address: genesis_address } - P2P.broadcast_message(storage_nodes, message) + P2P.broadcast_message(storage_nodes, message, + trace: Archethic.Utils.inject_propagated_context() + ) end defp notify_error(reason, %{ diff --git a/lib/archethic/mining/standalone_workflow.ex b/lib/archethic/mining/standalone_workflow.ex index d9414c7503..d971b82f5f 100644 --- a/lib/archethic/mining/standalone_workflow.ex +++ b/lib/archethic/mining/standalone_workflow.ex @@ -38,6 +38,7 @@ defmodule Archethic.Mining.StandaloneWorkflow do alias Archethic.TransactionChain.TransactionSummary require Logger + require OpenTelemetry.Tracer @mining_timeout Application.compile_env!(:archethic, [__MODULE__, :global_timeout]) @@ -65,6 +66,10 @@ defmodule Archethic.Mining.StandaloneWorkflow do transaction_type: tx.type ) + mining_span_context = OpenTelemetry.Tracer.start_span(:mining) + OpenTelemetry.Span.set_attribute(mining_span_context, "address", Base.encode16(tx.address)) + OpenTelemetry.Tracer.set_current_span(mining_span_context) + validation_time = DateTime.utc_now() |> DateTime.truncate(:millisecond) current_node = P2P.get_node_info() @@ -79,13 +84,18 @@ defmodule Archethic.Mining.StandaloneWorkflow do authorized_nodes ) - resolved_addresses = TransactionChain.resolve_transaction_addresses!(tx) + resolved_addresses = + OpenTelemetry.Tracer.with_span :resolve_address, %{links: [mining_span_context]} do + TransactionChain.resolve_transaction_addresses!(tx) + end previous_address = Transaction.previous_address(tx) previous_storage_nodes = Election.chain_storage_nodes(previous_address, authorized_nodes) {:ok, genesis_address} = - TransactionChain.fetch_genesis_address(previous_address, previous_storage_nodes) + OpenTelemetry.Tracer.with_span "fetch genesis address" do + TransactionChain.fetch_genesis_address(previous_address, previous_storage_nodes) + end genesis_storage_nodes = Election.chain_storage_nodes(genesis_address, authorized_nodes) @@ -103,13 +113,15 @@ defmodule Archethic.Mining.StandaloneWorkflow do {prev_tx, unspent_outputs, previous_storage_nodes, chain_storage_nodes_view, beacon_storage_nodes_view, io_storage_nodes_view} = - TransactionContext.get( - Transaction.previous_address(tx), - genesis_address, - Enum.map(chain_storage_nodes, & &1.first_public_key), - Enum.map(beacon_storage_nodes, & &1.first_public_key), - Enum.map(io_storage_nodes, & &1.first_public_key) - ) + OpenTelemetry.Tracer.with_span "fetch context" do + TransactionContext.get( + Transaction.previous_address(tx), + genesis_address, + Enum.map(chain_storage_nodes, & &1.first_public_key), + Enum.map(beacon_storage_nodes, & &1.first_public_key), + Enum.map(io_storage_nodes, & &1.first_public_key) + ) + end :telemetry.execute([:archethic, :mining, :fetch_context], %{ duration: System.monotonic_time() - start @@ -130,8 +142,13 @@ defmodule Archethic.Mining.StandaloneWorkflow do genesis_address: genesis_address ) + pending_validation = + OpenTelemetry.Tracer.with_span "pending validation" do + PendingTransactionValidation.validate(tx, validation_time) + end + validation_context = - case PendingTransactionValidation.validate(tx, validation_time) do + case pending_validation do :ok -> ValidationContext.set_pending_transaction_validation(validation_context, true) @@ -152,12 +169,14 @@ defmodule Archethic.Mining.StandaloneWorkflow do |> ValidationContext.add_aggregated_utxos(unspent_outputs) |> validate() - start_replication(validation_context) + request_replication_span = start_replication(validation_context) new_state = state |> Map.put(:context, validation_context) |> Map.put(:confirmations, []) + |> Map.put(:mining_span_context, mining_span_context) + |> Map.put(:request_replication_span, request_replication_span) {:noreply, new_state, @mining_timeout} end @@ -165,9 +184,17 @@ defmodule Archethic.Mining.StandaloneWorkflow do defp validate(context = %ValidationContext{}) do context |> ValidationContext.confirm_validation_node(Crypto.last_node_public_key()) - |> ValidationContext.create_validation_stamp() + |> then(fn context -> + OpenTelemetry.Tracer.with_span "create stamp" do + ValidationContext.create_validation_stamp(context) + end + end) |> ValidationContext.create_replication_tree() - |> ValidationContext.cross_validate() + |> then(fn context -> + OpenTelemetry.Tracer.with_span "cross validate" do + ValidationContext.cross_validate(context) + end + end) end defp start_replication( @@ -194,17 +221,19 @@ defmodule Archethic.Mining.StandaloneWorkflow do } results = - Task.Supervisor.async_stream_nolink( - Archethic.TaskSupervisor, - replication_nodes, - &P2P.send_message(&1, message), - ordered: false, - on_timeout: :kill_task, - timeout: Message.get_timeout(message) + 2000 - ) - |> Stream.filter(&match?({:ok, _}, &1)) - |> Stream.map(fn {:ok, {:ok, res}} -> res end) - |> Enum.to_list() + OpenTelemetry.Tracer.with_span "request validation" do + Task.Supervisor.async_stream_nolink( + Archethic.TaskSupervisor, + replication_nodes, + &P2P.send_message(&1, message), + ordered: false, + on_timeout: :kill_task, + timeout: Message.get_timeout(message) + 2000 + ) + |> Stream.filter(&match?({:ok, _}, &1)) + |> Stream.map(fn {:ok, {:ok, res}} -> res end) + |> Enum.to_list() + end if Enum.all?(results, &match?(%Ok{}, &1)) do Logger.info( @@ -213,10 +242,15 @@ defmodule Archethic.Mining.StandaloneWorkflow do transaction_type: validated_tx.type ) + replication_request_context = OpenTelemetry.Tracer.start_span("request_replication") + OpenTelemetry.Tracer.set_current_span(replication_request_context) + P2P.broadcast_message(replication_nodes, %ReplicatePendingTransactionChain{ address: validated_tx.address, genesis_address: genesis_address }) + + replication_request_context else errors = Enum.filter(results, &match?(%ReplicationError{}, &1)) @@ -227,6 +261,8 @@ defmodule Archethic.Mining.StandaloneWorkflow do _ -> send(self(), {:replication_error, :invalid_atomic_commitment}) end + + nil end end @@ -293,7 +329,9 @@ defmodule Archethic.Mining.StandaloneWorkflow do transaction: %Transaction{address: address, type: type}, validation_time: validation_time, genesis_address: genesis_address - } + }, + mining_span_context: mining_span_context, + request_replication_span: request_replication_span } ) do with {:ok, node_index} <- @@ -307,6 +345,10 @@ defmodule Archethic.Mining.StandaloneWorkflow do new_state = %{state | context: new_context} if ValidationContext.enough_storage_confirmations?(new_context) do + OpenTelemetry.Tracer.end_span(request_replication_span) + + OpenTelemetry.Tracer.set_current_span(mining_span_context) + duration = System.monotonic_time() - start_time # send the mining_completed event @@ -318,6 +360,8 @@ defmodule Archethic.Mining.StandaloneWorkflow do }) notify(new_state) + + OpenTelemetry.Tracer.end_span(mining_span_context) {:stop, :normal, new_state} else {:noreply, new_state} @@ -359,9 +403,17 @@ defmodule Archethic.Mining.StandaloneWorkflow do end defp notify(%{context: context}) do - notify_attestation(context) - notify_io_nodes(context) - notify_previous_chain(context) + OpenTelemetry.Tracer.with_span :notify_attestation do + notify_attestation(context) + end + + OpenTelemetry.Tracer.with_span "notify I/O" do + notify_io_nodes(context) + end + + OpenTelemetry.Tracer.with_span "notify previous chain" do + notify_previous_chain(context) + end :ok end diff --git a/lib/archethic/p2p/message/replicate_pending_transaction_chain.ex b/lib/archethic/p2p/message/replicate_pending_transaction_chain.ex index 3bf00766e9..969849c717 100644 --- a/lib/archethic/p2p/message/replicate_pending_transaction_chain.ex +++ b/lib/archethic/p2p/message/replicate_pending_transaction_chain.ex @@ -17,40 +17,59 @@ defmodule Archethic.P2P.Message.ReplicatePendingTransactionChain do alias Archethic.TransactionChain.VersionedTransactionInput alias Archethic.TransactionChain.TransactionSummary alias Archethic.P2P + alias Archethic.P2P.Node + alias Archethic.P2P.Message alias Archethic.P2P.Message.Ok alias Archethic.P2P.Message.Error alias Archethic.P2P.Message.AcknowledgeStorage + require OpenTelemetry.Tracer + @type t() :: %__MODULE__{ address: binary(), genesis_address: binary() } - @spec process(__MODULE__.t(), Crypto.key()) :: Ok.t() | Error.t() - def process(%__MODULE__{address: address, genesis_address: genesis_address}, sender_public_key) do - case Replication.get_transaction_in_commit_pool(address) do - {:ok, - tx = %Transaction{ - address: tx_address, - validation_stamp: %ValidationStamp{timestamp: validation_time} - }, validation_inputs} -> - Task.Supervisor.start_child(TaskSupervisor, fn -> - authorized_nodes = P2P.authorized_and_available_nodes(validation_time) - - Replication.sync_transaction_chain(tx, genesis_address, authorized_nodes) - - TransactionChain.write_inputs( - tx_address, - convert_unspent_outputs_to_inputs(validation_inputs) - ) - - P2P.send_message(sender_public_key, get_ack_storage(tx, genesis_address)) - end) - - %Ok{} - - {:error, :transaction_not_exists} -> - %Error{reason: :invalid_transaction} + @spec process(__MODULE__.t(), Message.metadata()) :: Ok.t() | Error.t() + def process( + %__MODULE__{address: address, genesis_address: genesis_address}, + %{ + sender_public_key: sender_public_key, + trace: trace + } + ) do + Utils.extract_progagated_context(trace) + + OpenTelemetry.Tracer.with_span "replicate transaction" do + OpenTelemetry.Tracer.set_attribute( + "node", + P2P.get_node_info() |> Node.endpoint() + ) + + case Replication.get_transaction_in_commit_pool(address) do + {:ok, + tx = %Transaction{ + address: tx_address, + validation_stamp: %ValidationStamp{timestamp: validation_time} + }, validation_inputs} -> + Task.Supervisor.start_child(TaskSupervisor, fn -> + authorized_nodes = P2P.authorized_and_available_nodes(validation_time) + + Replication.sync_transaction_chain(tx, genesis_address, authorized_nodes) + + TransactionChain.write_inputs( + tx_address, + convert_unspent_outputs_to_inputs(validation_inputs) + ) + + P2P.send_message(sender_public_key, get_ack_storage(tx, genesis_address)) + end) + + %Ok{} + + {:error, :transaction_not_exists} -> + %Error{reason: :invalid_transaction} + end end end diff --git a/lib/archethic/p2p/message/replication_attestation_message.ex b/lib/archethic/p2p/message/replication_attestation_message.ex index 6a5d6e4c6d..ade835df07 100644 --- a/lib/archethic/p2p/message/replication_attestation_message.ex +++ b/lib/archethic/p2p/message/replication_attestation_message.ex @@ -5,13 +5,17 @@ defmodule Archethic.P2P.Message.ReplicationAttestationMessage do """ alias Archethic.BeaconChain.ReplicationAttestation - alias Archethic.Crypto alias Archethic.PubSub + alias Archethic.P2P + alias Archethic.P2P.Node + alias Archethic.P2P.Message alias Archethic.P2P.Message.Ok alias Archethic.P2P.Message.Error alias Archethic.TransactionChain.TransactionSummary + alias Archethic.Utils require Logger + require OpenTelemetry.Tracer defstruct replication_attestation: %ReplicationAttestation{} @@ -19,7 +23,7 @@ defmodule Archethic.P2P.Message.ReplicationAttestationMessage do replication_attestation: ReplicationAttestation.t() } - @spec process(__MODULE__.t(), Crypto.key()) :: Ok.t() | Error.t() + @spec process(__MODULE__.t(), Message.metadata()) :: Ok.t() | Error.t() def process( %__MODULE__{ replication_attestation: @@ -30,20 +34,29 @@ defmodule Archethic.P2P.Message.ReplicationAttestationMessage do } } }, - _ + %{trace: trace} ) do - case ReplicationAttestation.validate(attestation) do - :ok -> - PubSub.notify_replication_attestation(attestation) - %Ok{} + Utils.extract_progagated_context(trace) - {:error, :invalid_confirmations_signatures} -> - Logger.error("Invalid attestation signatures", - transaction_address: Base.encode16(tx_address), - transaction_type: tx_type - ) + OpenTelemetry.Tracer.with_span "replicate attestation" do + OpenTelemetry.Tracer.set_attribute( + "node", + P2P.get_node_info() |> Node.endpoint() + ) - %Error{reason: :invalid_attestation} + case ReplicationAttestation.validate(attestation) do + :ok -> + PubSub.notify_replication_attestation(attestation) + %Ok{} + + {:error, :invalid_confirmations_signatures} -> + Logger.error("Invalid attestation signatures", + transaction_address: Base.encode16(tx_address), + transaction_type: tx_type + ) + + %Error{reason: :invalid_attestation} + end end end diff --git a/lib/archethic/p2p/message/start_mining.ex b/lib/archethic/p2p/message/start_mining.ex index e162401479..265da607ce 100644 --- a/lib/archethic/p2p/message/start_mining.ex +++ b/lib/archethic/p2p/message/start_mining.ex @@ -52,14 +52,16 @@ defmodule Archethic.P2P.Message.StartMining do p2p_view_hash: p2p_view_hash, contract_context: contract_context }, - _ + metadata ) do with :ok <- check_synchronization(network_chains_view_hash, p2p_view_hash), :ok <- check_valid_election(tx, validation_nodes), :ok <- check_current_node_is_elected(validation_nodes), :ok <- check_not_already_mining(tx.address), :ok <- Mining.request_chain_lock(tx) do - {:ok, _} = Mining.start(tx, welcome_node_public_key, validation_nodes, contract_context) + {:ok, _} = + Mining.start(tx, welcome_node_public_key, validation_nodes, contract_context, metadata) + %Ok{} else {:error, :invalid_validation_nodes_election} -> diff --git a/lib/archethic/p2p/message/validate_transaction.ex b/lib/archethic/p2p/message/validate_transaction.ex index 789887b7ef..09dbba60c1 100644 --- a/lib/archethic/p2p/message/validate_transaction.ex +++ b/lib/archethic/p2p/message/validate_transaction.ex @@ -8,28 +8,43 @@ defmodule Archethic.P2P.Message.ValidateTransaction do alias Archethic.TransactionChain.Transaction alias Archethic.P2P.Message.ReplicationError alias Archethic.P2P.Message.Ok + alias Archethic.P2P.Message + alias Archethic.P2P + alias Archethic.P2P.Node alias Archethic.Replication - alias Archethic.Crypto alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.VersionedUnspentOutput alias Archethic.Utils + require OpenTelemetry.Tracer + @type t :: %__MODULE__{ transaction: Transaction.t(), contract_context: nil | Contract.Context.t(), inputs: list(VersionedUnspentOutput.t()) } - @spec process(__MODULE__.t(), Crypto.key()) :: Ok.t() | ReplicationError.t() - def process(%__MODULE__{transaction: tx, contract_context: contract_context, inputs: inputs}, _) do - case Replication.validate_transaction(tx, contract_context, inputs) do - :ok -> - Replication.add_transaction_to_commit_pool(tx, inputs) - %Ok{} - - {:error, reason} -> - %ReplicationError{address: tx.address, reason: reason} + @spec process(__MODULE__.t(), Message.metadata()) :: Ok.t() | ReplicationError.t() + def process(%__MODULE__{transaction: tx, contract_context: contract_context, inputs: inputs}, %{ + trace: trace + }) do + Utils.extract_progagated_context(trace) + + OpenTelemetry.Tracer.with_span "validate transaction (storage)" do + OpenTelemetry.Tracer.set_attribute( + "node", + P2P.get_node_info() |> Node.endpoint() + ) + + case Replication.validate_transaction(tx, contract_context, inputs) do + :ok -> + Replication.add_transaction_to_commit_pool(tx, inputs) + %Ok{} + + {:error, reason} -> + %ReplicationError{address: tx.address, reason: reason} + end end end diff --git a/lib/archethic/utils.ex b/lib/archethic/utils.ex index d6678242f1..f4fcfd4999 100644 --- a/lib/archethic/utils.ex +++ b/lib/archethic/utils.ex @@ -1287,4 +1287,19 @@ defmodule Archethic.Utils do {Enum.reverse(items), more?, offset} end + + @spec extract_progagated_context(binary()) :: :ok + def extract_progagated_context(""), do: :ok + + def extract_progagated_context(trace) do + :otel_propagator_text_map.extract([{"traceparent", trace}]) + :otel_ctx.attach(:otel_ctx.get_current()) + end + + @spec inject_propagated_context() :: binary() + def inject_propagated_context do + :otel_propagator_text_map.inject([]) + |> Map.new() + |> Map.get("traceparent") + end end diff --git a/lib/archethic_web/explorer/controllers/faucet_controller.ex b/lib/archethic_web/explorer/controllers/faucet_controller.ex index 9151c61ec2..94e00eb28c 100644 --- a/lib/archethic_web/explorer/controllers/faucet_controller.ex +++ b/lib/archethic_web/explorer/controllers/faucet_controller.ex @@ -13,6 +13,8 @@ defmodule ArchethicWeb.Explorer.FaucetController do alias ArchethicWeb.TransactionSubscriber alias ArchethicWeb.Explorer.FaucetRateLimiter + require OpenTelemetry.Tracer + @pool_seed Application.compile_env(:archethic, [__MODULE__, :seed]) @faucet_rate_limit_expiry Application.compile_env(:archethic, :faucet_rate_limit_expiry) @@ -121,6 +123,14 @@ defmodule ArchethicWeb.Explorer.FaucetController do receive do {:new_transaction, ^tx_address} -> + try do + mining_span = :persistent_term.get({:initial_mining_span, tx_address}) + OpenTelemetry.Tracer.set_current_span(mining_span) + OpenTelemetry.Tracer.end_span(mining_span) + rescue + _ -> :ok + end + FaucetRateLimiter.register(recipient_address, System.monotonic_time()) {:ok, tx_address} after diff --git a/lib/archethic_web/transaction_subscriber.ex b/lib/archethic_web/transaction_subscriber.ex index 4433317a2b..5ed60a7f61 100644 --- a/lib/archethic_web/transaction_subscriber.ex +++ b/lib/archethic_web/transaction_subscriber.ex @@ -102,6 +102,14 @@ defmodule ArchethicWeb.TransactionSubscriber do transaction_confirmed: tx_address ) + try do + mining_span = :persistent_term.get({:initial_mining_span, tx_address}) + OpenTelemetry.Tracer.set_current_span(mining_span) + OpenTelemetry.Tracer.end_span(mining_span) + rescue + _ -> :ok + end + send(from, {:new_transaction, tx_address}) case Map.get(state, tx_address) do