Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a timer:interval for heartbeats and avoid concurrent connections attempts #1592

Merged
merged 2 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 46 additions & 29 deletions lib/archethic/p2p/client/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ defmodule Archethic.P2P.Client.Connection do
@moduledoc """

3 states:
:initializing
:connecting
{:connected, socket}
:disconnected

we use the :initializing state to be able to postpone calls and casts until after the 1 connect attempt
we use the :connecting state to be able to postpone messages
"""

alias Archethic.Crypto
Expand All @@ -21,7 +21,7 @@ defmodule Archethic.P2P.Client.Connection do
require Logger

use GenStateMachine, callback_mode: [:handle_event_function, :state_enter], restart: :temporary
@vsn 2
@vsn 3
@table_name :connection_status
@max_reconnect_delay :timer.hours(6)

Expand Down Expand Up @@ -131,19 +131,20 @@ defmodule Archethic.P2P.Client.Connection do
availability_timer: {nil, 0},
reconnect_attempts: 0,
heartbeats_sent: 0,
heartbeats_received: 0
heartbeats_received: 0,
heartbeats_timer: nil
}

{:ok, :initializing, data, [{:next_event, :internal, {:connect, from}}]}
{:ok, :disconnected, data, [{:next_event, :internal, {:connect, from}}]}
end

# every messages sent while inializing will wait until state changes
def handle_event({:call, _}, _action, :initializing, _data) do
def handle_event({:call, _}, _action, :connecting, _data) do
{:keep_state_and_data, :postpone}
end

# every messages sent while inializing will wait until state changes
def handle_event(:cast, _action, :initializing, _data) do
def handle_event(:cast, _action, :connecting, _data) do
{:keep_state_and_data, :postpone}
end

Expand Down Expand Up @@ -189,18 +190,28 @@ defmodule Archethic.P2P.Client.Connection do

def handle_event(
:enter,
{:connected, _socket},
{:connected, socket},
:disconnected,
data = %{node_public_key: node_public_key, messages: messages}
data = %{
node_public_key: node_public_key,
messages: messages,
heartbeats_timer: hb_ref,
transport: transport
}
) do
Logger.warning("Connection closed", node: Base.encode16(node_public_key))

set_node_disconnected(node_public_key)

# Stop heartbeats
:timer.cancel(hb_ref)

transport.handle_close(socket)

# Stop availability timer
new_data =
data
|> Map.put(:messages, %{})
|> Map.put(:heartbeats_timer, nil)
|> Map.update!(:availability_timer, fn
{nil, time} ->
{nil, time}
Expand All @@ -225,18 +236,23 @@ defmodule Archethic.P2P.Client.Connection do

def handle_event(
:enter,
_,
:connecting,
{:connected, _socket},
data = %{node_public_key: node_public_key}
data = %{node_public_key: node_public_key, heartbeats_timer: hb_ref}
) do
set_node_connected(node_public_key)

# Start heatbeats
:timer.cancel(hb_ref)
{:ok, hb_ref} = :timer.send_interval(@heartbeat_interval, :heartbeat)

# Start availability timer
new_data =
data
|> Map.put(:reconnect_attempts, 0)
|> Map.put(:heartbeats_sent, 0)
|> Map.put(:heartbeats_received, 0)
|> Map.put(:heartbeats_timer, hb_ref)
|> Map.update!(:availability_timer, fn
{nil, time} ->
{System.monotonic_time(:second), time}
Expand All @@ -245,20 +261,17 @@ defmodule Archethic.P2P.Client.Connection do
timer
end)

Process.send_after(self(), :heartbeat, @heartbeat_interval)

{:keep_state, new_data}
end

def handle_event(:enter, _old_state, :initializing, _data), do: :keep_state_and_data
def handle_event(:enter, _old_state, :connecting, _data), do: :keep_state_and_data
def handle_event(:enter, _old_state, :disconnected, _data), do: :keep_state_and_data

# called from the :disconnected or :initializing state
def handle_event(
:internal,
{:connect, from},
_state,
_data = %{
:disconnected,
data = %{
ip: ip,
port: port,
transport: transport
Expand All @@ -283,6 +296,15 @@ defmodule Archethic.P2P.Client.Connection do
end
end)

{:next_state, :connecting, data}
end

def handle_event(
:internal,
{:connect, _from},
_state,
_data
) do
:keep_state_and_data
end

Expand Down Expand Up @@ -448,11 +470,9 @@ defmodule Archethic.P2P.Client.Connection do
) do
# disconnect if missed more than 2 heartbeats
if heartbeats_sent - heartbeats_received >= 2 do
transport.handle_close(socket)
{:next_state, :disconnected, data}
else
transport.handle_send(socket, "hb")
Process.send_after(self(), :heartbeat, @heartbeat_interval)
{:keep_state, %{data | heartbeats_sent: heartbeats_sent + 1}}
end
end
Expand Down Expand Up @@ -629,16 +649,13 @@ defmodule Archethic.P2P.Client.Connection do
:ets.delete(@table_name, node_public_key)
end

def code_change(1, state, data, _extra) do
Process.send_after(self(), :heartbeat, @heartbeat_interval)
def code_change(2, state = {:connected, _}, data, _extra) do
{:ok, hb_ref} = :timer.send_interval(@heartbeat_interval, :heartbeat)
{:ok, state, Map.merge(data, %{heartbeats_timer: hb_ref})}
end

{:ok, state,
data
|> Map.merge(%{
reconnect_attempts: 0,
heartbeats_sent: 0,
heartbeats_received: 0
})}
def code_change(2, state, data, _extra) do
{:ok, state, data}
end

def code_change(_old_vsn, state, data, _extra), do: {:ok, state, data}
Expand Down
55 changes: 49 additions & 6 deletions test/archethic/p2p/client/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,56 @@ defmodule Archethic.P2P.Client.ConnectionTest do
from: me
)

assert {:initializing, _} = :sys.get_state(pid)
assert {:connecting, _} = :sys.get_state(pid)

Process.sleep(10)

assert {{:connected, _socket}, %{request_id: 0, messages: %{}}} = :sys.get_state(pid)
assert_received :connected
end

test "do not start multiple connections" do
defmodule MockTransportTimeout do
alias Archethic.P2P.Client.Transport

@behaviour Transport

def handle_connect(_ip, _port) do
:persistent_term.put(:connect_count, :persistent_term.get(:connect_count, 0) + 1)
Process.sleep(200)
{:error, :timeout}
end

def handle_close(_socket) do
:ok
end

def handle_send(_socket, <<0::32, _rest::bitstring>>), do: :ok

def handle_message({_, _, _}), do: {:error, :closed}
end

node_public_key = Crypto.first_node_public_key()

{:ok, _} =
Connection.start_link(
transport: MockTransportTimeout,
ip: {127, 0, 0, 2},
port: 3000,
node_public_key: node_public_key
)

Connection.wake_up(node_public_key)
Connection.wake_up(node_public_key)
Connection.wake_up(node_public_key)
Connection.wake_up(node_public_key)
Connection.wake_up(node_public_key)

Process.sleep(100)

assert 1 = :persistent_term.get(:connect_count)
end

describe "send_message/3" do
test "should send the message and enqueue the request" do
{:ok, pid} =
Expand Down Expand Up @@ -124,6 +166,7 @@ defmodule Archethic.P2P.Client.ConnectionTest do
end

def handle_connect({127, 0, 0, 2}, _port) do
:persistent_term.put(:sleeping, true)
Process.sleep(100_000)
{:error, :timeout}
end
Expand Down Expand Up @@ -152,18 +195,18 @@ defmodule Archethic.P2P.Client.ConnectionTest do
# 500ms to wait for the 1st reconnect attempt
Process.sleep(550)

time = System.monotonic_time(:millisecond)
assert :persistent_term.get(:sleeping)

assert {:error, :closed} =
# we do not receive a closed anymore because we are in the :connecting state
# that postpones messages
assert {:error, :timeout} =
Connection.send_message(
Crypto.first_node_public_key(),
%GetTransaction{address: ArchethicCase.random_address()},
200
)

# ensure there was no delay
time2 = System.monotonic_time(:millisecond)
assert time2 - time < 100
assert :persistent_term.get(:sleeping)
end

test "should be in :connected state after reconnection" do
Expand Down
Loading