diff --git a/lib/archethic/p2p/client/connection.ex b/lib/archethic/p2p/client/connection.ex index 4175a5da1..7e81d7bb9 100644 --- a/lib/archethic/p2p/client/connection.ex +++ b/lib/archethic/p2p/client/connection.ex @@ -2,11 +2,11 @@ defmodule Archethic.P2P.Client.Connection do @moduledoc """ 3 states: - :initializing + :connecting {:connected, socket} :disconnected - we use the :initializing state to be able to postpone calls and casts until after the 1 connect attempt + we use the :connecting state to be able to postpone messages """ alias Archethic.Crypto @@ -21,7 +21,7 @@ defmodule Archethic.P2P.Client.Connection do require Logger use GenStateMachine, callback_mode: [:handle_event_function, :state_enter], restart: :temporary - @vsn 2 + @vsn 3 @table_name :connection_status @max_reconnect_delay :timer.hours(6) @@ -131,19 +131,20 @@ defmodule Archethic.P2P.Client.Connection do availability_timer: {nil, 0}, reconnect_attempts: 0, heartbeats_sent: 0, - heartbeats_received: 0 + heartbeats_received: 0, + heartbeats_timer: nil } - {:ok, :initializing, data, [{:next_event, :internal, {:connect, from}}]} + {:ok, :disconnected, data, [{:next_event, :internal, {:connect, from}}]} end # every messages sent while inializing will wait until state changes - def handle_event({:call, _}, _action, :initializing, _data) do + def handle_event({:call, _}, _action, :connecting, _data) do {:keep_state_and_data, :postpone} end # every messages sent while inializing will wait until state changes - def handle_event(:cast, _action, :initializing, _data) do + def handle_event(:cast, _action, :connecting, _data) do {:keep_state_and_data, :postpone} end @@ -189,18 +190,28 @@ defmodule Archethic.P2P.Client.Connection do def handle_event( :enter, - {:connected, _socket}, + {:connected, socket}, :disconnected, - data = %{node_public_key: node_public_key, messages: messages} + data = %{ + node_public_key: node_public_key, + messages: messages, + heartbeats_timer: hb_ref, + transport: transport + } ) do Logger.warning("Connection closed", node: Base.encode16(node_public_key)) - set_node_disconnected(node_public_key) + # Stop heartbeats + :timer.cancel(hb_ref) + + transport.handle_close(socket) + # Stop availability timer new_data = data |> Map.put(:messages, %{}) + |> Map.put(:heartbeats_timer, nil) |> Map.update!(:availability_timer, fn {nil, time} -> {nil, time} @@ -225,18 +236,23 @@ defmodule Archethic.P2P.Client.Connection do def handle_event( :enter, - _, + :connecting, {:connected, _socket}, - data = %{node_public_key: node_public_key} + data = %{node_public_key: node_public_key, heartbeats_timer: hb_ref} ) do set_node_connected(node_public_key) + # Start heatbeats + :timer.cancel(hb_ref) + {:ok, hb_ref} = :timer.send_interval(@heartbeat_interval, :heartbeat) + # Start availability timer new_data = data |> Map.put(:reconnect_attempts, 0) |> Map.put(:heartbeats_sent, 0) |> Map.put(:heartbeats_received, 0) + |> Map.put(:heartbeats_timer, hb_ref) |> Map.update!(:availability_timer, fn {nil, time} -> {System.monotonic_time(:second), time} @@ -245,20 +261,17 @@ defmodule Archethic.P2P.Client.Connection do timer end) - Process.send_after(self(), :heartbeat, @heartbeat_interval) - {:keep_state, new_data} end - def handle_event(:enter, _old_state, :initializing, _data), do: :keep_state_and_data + def handle_event(:enter, _old_state, :connecting, _data), do: :keep_state_and_data def handle_event(:enter, _old_state, :disconnected, _data), do: :keep_state_and_data - # called from the :disconnected or :initializing state def handle_event( :internal, {:connect, from}, - _state, - _data = %{ + :disconnected, + data = %{ ip: ip, port: port, transport: transport @@ -283,6 +296,15 @@ defmodule Archethic.P2P.Client.Connection do end end) + {:next_state, :connecting, data} + end + + def handle_event( + :internal, + {:connect, _from}, + _state, + _data + ) do :keep_state_and_data end @@ -448,11 +470,9 @@ defmodule Archethic.P2P.Client.Connection do ) do # disconnect if missed more than 2 heartbeats if heartbeats_sent - heartbeats_received >= 2 do - transport.handle_close(socket) {:next_state, :disconnected, data} else transport.handle_send(socket, "hb") - Process.send_after(self(), :heartbeat, @heartbeat_interval) {:keep_state, %{data | heartbeats_sent: heartbeats_sent + 1}} end end @@ -629,16 +649,13 @@ defmodule Archethic.P2P.Client.Connection do :ets.delete(@table_name, node_public_key) end - def code_change(1, state, data, _extra) do - Process.send_after(self(), :heartbeat, @heartbeat_interval) + def code_change(2, state = {:connected, _}, data, _extra) do + {:ok, hb_ref} = :timer.send_interval(@heartbeat_interval, :heartbeat) + {:ok, state, Map.merge(data, %{heartbeats_timer: hb_ref})} + end - {:ok, state, - data - |> Map.merge(%{ - reconnect_attempts: 0, - heartbeats_sent: 0, - heartbeats_received: 0 - })} + def code_change(2, state, data, _extra) do + {:ok, state, data} end def code_change(_old_vsn, state, data, _extra), do: {:ok, state, data} diff --git a/test/archethic/p2p/client/connection_test.exs b/test/archethic/p2p/client/connection_test.exs index 28234b1a0..f98ba3291 100644 --- a/test/archethic/p2p/client/connection_test.exs +++ b/test/archethic/p2p/client/connection_test.exs @@ -46,7 +46,7 @@ defmodule Archethic.P2P.Client.ConnectionTest do from: me ) - assert {:initializing, _} = :sys.get_state(pid) + assert {:connecting, _} = :sys.get_state(pid) Process.sleep(10) @@ -54,6 +54,48 @@ defmodule Archethic.P2P.Client.ConnectionTest do assert_received :connected end + test "do not start multiple connections" do + defmodule MockTransportTimeout do + alias Archethic.P2P.Client.Transport + + @behaviour Transport + + def handle_connect(_ip, _port) do + :persistent_term.put(:connect_count, :persistent_term.get(:connect_count, 0) + 1) + Process.sleep(200) + {:error, :timeout} + end + + def handle_close(_socket) do + :ok + end + + def handle_send(_socket, <<0::32, _rest::bitstring>>), do: :ok + + def handle_message({_, _, _}), do: {:error, :closed} + end + + node_public_key = Crypto.first_node_public_key() + + {:ok, _} = + Connection.start_link( + transport: MockTransportTimeout, + ip: {127, 0, 0, 2}, + port: 3000, + node_public_key: node_public_key + ) + + Connection.wake_up(node_public_key) + Connection.wake_up(node_public_key) + Connection.wake_up(node_public_key) + Connection.wake_up(node_public_key) + Connection.wake_up(node_public_key) + + Process.sleep(100) + + assert 1 = :persistent_term.get(:connect_count) + end + describe "send_message/3" do test "should send the message and enqueue the request" do {:ok, pid} = @@ -124,6 +166,7 @@ defmodule Archethic.P2P.Client.ConnectionTest do end def handle_connect({127, 0, 0, 2}, _port) do + :persistent_term.put(:sleeping, true) Process.sleep(100_000) {:error, :timeout} end @@ -152,18 +195,18 @@ defmodule Archethic.P2P.Client.ConnectionTest do # 500ms to wait for the 1st reconnect attempt Process.sleep(550) - time = System.monotonic_time(:millisecond) + assert :persistent_term.get(:sleeping) - assert {:error, :closed} = + # we do not receive a closed anymore because we are in the :connecting state + # that postpones messages + assert {:error, :timeout} = Connection.send_message( Crypto.first_node_public_key(), %GetTransaction{address: ArchethicCase.random_address()}, 200 ) - # ensure there was no delay - time2 = System.monotonic_time(:millisecond) - assert time2 - time < 100 + assert :persistent_term.get(:sleeping) end test "should be in :connected state after reconnection" do