Skip to content

Commit

Permalink
stop the listener when closing the socket
Browse files Browse the repository at this point in the history
  • Loading branch information
bchamagne committed Nov 8, 2024
1 parent bb3794e commit e8c7ee6
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 7 deletions.
2 changes: 1 addition & 1 deletion lib/archethic/p2p/client/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ defmodule Archethic.P2P.Client.Connection do
:disconnected,
data = %{node_public_key: node_public_key, messages: messages}
) do
Logger.warning("Connection closed", node: Base.encode16(node_public_key))
Logger.warning("Outgoing connection closed", node: Base.encode16(node_public_key))

set_node_disconnected(node_public_key)

Expand Down
31 changes: 25 additions & 6 deletions lib/archethic/p2p/listener_protocol.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,24 @@ defmodule Archethic.P2P.ListenerProtocol do
})
end

def handle_info({ref, :stop}, state = %{transport: transport, socket: socket, ip: ip})
when is_reference(ref) do
if node_ip?(ip), do: Logger.error("Stopping listener (ip: #{:inet.ntoa(ip)})")
transport.close(socket)
{:stop, :normal, state}
end

def handle_info({ref, :ok}, state) when is_reference(ref) do
{:noreply, state}
end

def handle_info({:DOWN, _ref, :process, _pid, reason}, state = %{ip: ip}) do
if reason != :normal && node_ip?(ip),
do: Logger.error("handle_message crashed for reason: #{inspect(reason)}")

{:noreply, state}
end

def handle_info(
{_transport, socket, "hb"},
state = %{transport: transport}
Expand All @@ -59,8 +77,7 @@ defmodule Archethic.P2P.ListenerProtocol do
end

transport.close(socket)

{:noreply, state}
{:stop, :normal, state}
end

def handle_info(
Expand All @@ -69,15 +86,15 @@ defmodule Archethic.P2P.ListenerProtocol do
) do
:inet.setopts(socket, active: :once)

Task.Supervisor.start_child(Archethic.task_supervisors(), fn ->
Task.Supervisor.async_nolink(Archethic.task_supervisors(), fn ->
handle_message(msg, transport, socket, ip)
end)

{:noreply, state}
end

def handle_info({_transport_closed, _socket}, state = %{ip: ip, port: port}) do
Logger.warning("Connection closed for #{:inet.ntoa(ip)}:#{port}")
Logger.warning("Incoming connection closed #{:inet.ntoa(ip)}:#{port}")
{:stop, :normal, state}
end

Expand Down Expand Up @@ -106,22 +123,24 @@ defmodule Archethic.P2P.ListenerProtocol do
|> process_msg(sender_pkey)
|> encode_response(message_id, sender_pkey)
|> reply(transport, socket, message)

:ok
else
if node_ip?(ip) do
Logger.error("Received a message with an invalid signature",
node: Base.encode16(sender_pkey)
)
end

transport.close(socket)
:stop
end

{:error, reason} ->
if node_ip?(ip) do
Logger.error(reason)
end

transport.close(socket)
:stop
end
end

Expand Down

0 comments on commit e8c7ee6

Please sign in to comment.