diff --git a/lib/archethic/p2p/listener_protocol.ex b/lib/archethic/p2p/listener_protocol.ex index 35f1f56df..7ad928c24 100644 --- a/lib/archethic/p2p/listener_protocol.ex +++ b/lib/archethic/p2p/listener_protocol.ex @@ -8,12 +8,10 @@ defmodule Archethic.P2P.ListenerProtocol do require Logger alias Archethic.Crypto - + alias Archethic.P2P alias Archethic.P2P.Message alias Archethic.P2P.MessageEnvelop - alias Archethic.TaskSupervisor - alias Archethic.Utils @behaviour :ranch_protocol @@ -37,81 +35,27 @@ defmodule Archethic.P2P.ListenerProtocol do }) end + def handle_info( + {_transport, socket, err}, + state = %{transport: transport, ip: ip} + ) + when is_atom(err) do + if node_ip?(ip) do + Logger.error("Received an error from tcp listener (ip: #{:inet.ntoa(ip)}): #{inspect(err)}") + end + + transport.close(socket) + {:noreply, state} + end + def handle_info( {_transport, socket, msg}, - state = %{transport: transport} + state = %{transport: transport, ip: ip} ) do :inet.setopts(socket, active: :once) Task.Supervisor.start_child(TaskSupervisor, fn -> - start_decode_time = System.monotonic_time() - - %MessageEnvelop{ - message_id: message_id, - message: message, - sender_public_key: sender_public_key, - signature: signature - } = MessageEnvelop.decode(msg) - - valid_signature? = - Crypto.verify?( - signature, - Message.encode(message) |> Utils.wrap_binary(), - sender_public_key - ) - - if valid_signature? do - :telemetry.execute( - [:archethic, :p2p, :decode_message], - %{duration: System.monotonic_time() - start_decode_time}, - %{message: Message.name(message)} - ) - - start_processing_time = System.monotonic_time() - response = Message.process(message, sender_public_key) - - :telemetry.execute( - [:archethic, :p2p, :handle_message], - %{ - duration: System.monotonic_time() - start_processing_time - }, - %{message: Message.name(message)} - ) - - start_encode_time = System.monotonic_time() - - response_signature = - response - |> Message.encode() - |> Utils.wrap_binary() - |> Crypto.sign_with_first_node_key() - - encoded_response = - %MessageEnvelop{ - message: response, - message_id: message_id, - sender_public_key: Crypto.first_node_public_key(), - signature: response_signature - } - |> MessageEnvelop.encode(sender_public_key) - - :telemetry.execute( - [:archethic, :p2p, :encode_message], - %{duration: System.monotonic_time() - start_encode_time}, - %{message: Archethic.P2P.Message.name(message)} - ) - - start_sending_time = System.monotonic_time() - transport.send(socket, encoded_response) - - :telemetry.execute( - [:archethic, :p2p, :transport_sending_message], - %{duration: System.monotonic_time() - start_sending_time}, - %{message: Archethic.P2P.Message.name(message)} - ) - else - transport.close(socket) - end + handle_message(msg, transport, socket, ip) end) {:noreply, state} @@ -121,4 +65,121 @@ defmodule Archethic.P2P.ListenerProtocol do Logger.warning("Connection closed for #{:inet.ntoa(ip)}:#{port}") {:stop, :normal, state} end + + defp handle_message(msg, transport, socket, ip) do + case decode_msg(msg) do + {:ok, + %MessageEnvelop{ + message_id: message_id, + message: message, + sender_public_key: sender_pkey, + signature: signature + }} -> + valid_signature? = + Crypto.verify?( + signature, + message |> Message.encode() |> Utils.wrap_binary(), + sender_pkey + ) + + if valid_signature? do + message + |> process_msg(sender_pkey) + |> encode_response(message_id, sender_pkey) + |> reply(transport, socket, message) + else + if node_ip?(ip) do + Logger.error("Received a message with an invalid signature", + node: Base.encode16(sender_pkey) + ) + end + + transport.close(socket) + end + + {:error, reason} -> + if node_ip?(ip) do + Logger.error(reason) + end + + transport.close(socket) + end + end + + # msg is the bytes coming from TCP + # message is the struct + defp decode_msg(msg) do + start_decode_time = System.monotonic_time() + + MessageEnvelop.decode(msg) + |> then(fn res = %MessageEnvelop{message: message} -> + :telemetry.execute( + [:archethic, :p2p, :decode_message], + %{duration: System.monotonic_time() - start_decode_time}, + %{message: Message.name(message)} + ) + + {:ok, res} + end) + rescue + err -> + {:error, Exception.format(:error, err, __STACKTRACE__)} + end + + defp process_msg(message, sender_pkey) do + start_processing_time = System.monotonic_time() + + Message.process(message, sender_pkey) + |> tap(fn _ -> + :telemetry.execute( + [:archethic, :p2p, :handle_message], + %{ + duration: System.monotonic_time() - start_processing_time + }, + %{message: Message.name(message)} + ) + end) + end + + defp encode_response(response, message_id, sender_pkey) do + start_encode_time = System.monotonic_time() + + response_signature = + response + |> Message.encode() + |> Utils.wrap_binary() + |> Crypto.sign_with_first_node_key() + + %MessageEnvelop{ + message: response, + message_id: message_id, + sender_public_key: Crypto.first_node_public_key(), + signature: response_signature + } + |> MessageEnvelop.encode(sender_pkey) + |> tap(fn _ -> + :telemetry.execute( + [:archethic, :p2p, :encode_message], + %{duration: System.monotonic_time() - start_encode_time}, + %{message: Message.name(response)} + ) + end) + end + + defp reply(encoded_response, transport, socket, message) do + start_sending_time = System.monotonic_time() + + transport.send(socket, encoded_response) + |> tap(fn _ -> + :telemetry.execute( + [:archethic, :p2p, :transport_sending_message], + %{duration: System.monotonic_time() - start_sending_time}, + %{message: Message.name(message)} + ) + end) + end + + defp node_ip?(ip) do + P2P.list_nodes() |> Enum.map(& &1.ip) |> Enum.member?(ip) + end end