Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reconnection backoff & heartbeat mechanism #1346

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ config :archethic, Archethic.P2P.GeoPatch.GeoIP, MockGeoIP

config :archethic, Archethic.P2P.BootstrappingSeeds, enabled: false

config :archethic, Archethic.P2P.Client.Connection,
backoff_strategy: :static,
heartbeat_interval: 200,
reconnect_delay: 50

config :archethic, Archethic.Mining.PendingTransactionValidation, validate_node_ip: true

config :archethic, Archethic.Metrics.Poller, enabled: false
Expand Down
128 changes: 118 additions & 10 deletions lib/archethic/p2p/client/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,20 @@ defmodule Archethic.P2P.Client.Connection do
require Logger

use GenStateMachine, callback_mode: [:handle_event_function, :state_enter], restart: :temporary
@vsn 1
@vsn 2
@table_name :connection_status

@heartbeat_interval Keyword.get(
Application.compile_env(:archethic, __MODULE__, []),
:heartbeat_interval,
10_000
)
@reconnect_delay Keyword.get(
Application.compile_env(:archethic, __MODULE__, []),
:reconnect_delay,
500
)

@doc """
Starts a new connection
"""
Expand Down Expand Up @@ -59,6 +70,18 @@ defmodule Archethic.P2P.Client.Connection do
end
end

@doc """
When called, if disconnect, it will try to connect to socket
Noop if it's already connected

It's used when some node has been offline for a long time
It has connected to us so we know we can connect to it as well
"""
@spec wake_up(Crypto.key()) :: :ok
def wake_up(public_key) do
GenStateMachine.cast(via_tuple(public_key), :wake_up)
end

@doc """
Get the availability timer and reset it with a new start time if it was already started
"""
Expand Down Expand Up @@ -102,7 +125,10 @@ defmodule Archethic.P2P.Client.Connection do
request_id: 0,
messages: %{},
send_tasks: %{},
availability_timer: {nil, 0}
availability_timer: {nil, 0},
reconnect_attempts: 0,
heartbeats_sent: 0,
heartbeats_received: 0
}

{:ok, :initializing, data, [{:next_event, :internal, {:connect, from}}]}
Expand Down Expand Up @@ -190,7 +216,7 @@ defmodule Archethic.P2P.Client.Connection do
end)

# Reconnect with backoff
actions = [{{:timeout, :reconnect}, 500, nil} | actions]
actions = [{{:timeout, :reconnect}, backoff(data.reconnect_attempts), nil} | actions]
{:keep_state, new_data, actions}
end

Expand All @@ -204,20 +230,25 @@ defmodule Archethic.P2P.Client.Connection do

# Start availability timer
new_data =
Map.update!(data, :availability_timer, fn
data
|> Map.put(:reconnect_attempts, 0)
|> Map.put(:heartbeats_sent, 0)
|> Map.put(:heartbeats_received, 0)
|> Map.update!(:availability_timer, fn
{nil, time} ->
{System.monotonic_time(:second), time}

timer ->
timer
end)

Process.send_after(self(), :heartbeat, @heartbeat_interval)

{:keep_state, new_data}
end

def handle_event(:enter, _old_state, :initializing, _data), do: :keep_state_and_data
def handle_event(:enter, _old_state, :disconnected, _data), do: :keep_state_and_data
def handle_event(:enter, _old_state, {:connected, _socket}, _data), do: :keep_state_and_data

# called from the :disconnected or :initializing state
def handle_event(
Expand Down Expand Up @@ -258,9 +289,11 @@ defmodule Archethic.P2P.Client.Connection do
end

# this message is used to delay next connection attempt
def handle_event({:timeout, :reconnect}, _event_data, _state, _data) do
def handle_event({:timeout, :reconnect}, _event_data, _state, data) do
actions = [{:next_event, :internal, {:connect, nil}}]
{:keep_state_and_data, actions}

new_data = Map.update!(data, :reconnect_attempts, &(&1 + 1))
{:keep_state, new_data, actions}
end

def handle_event(
Expand All @@ -273,6 +306,25 @@ defmodule Archethic.P2P.Client.Connection do
:keep_state_and_data
end

def handle_event(
:cast,
:wake_up,
:disconnected,
data
) do
actions = [{:next_event, :internal, {:connect, nil}}]
{:keep_state, %{data | reconnect_attempts: 0}, actions}
end

def handle_event(
:cast,
:wake_up,
_,
_data
) do
:keep_state_and_data
end

def handle_event(
:cast,
{:send_message, ref, from, message, timeout},
Expand Down Expand Up @@ -381,6 +433,35 @@ defmodule Archethic.P2P.Client.Connection do
end
end

def handle_event(
:info,
:heartbeat,
{:connected, socket},
data = %{
transport: transport,
heartbeats_sent: heartbeats_sent,
heartbeats_received: heartbeats_received
}
) do
# disconnect if missed more than 2 heartbeats
if heartbeats_sent - heartbeats_received >= 2 do
{:next_state, :disconnected, data}
bchamagne marked this conversation as resolved.
Show resolved Hide resolved
else
transport.handle_send(socket, "hb")
Process.send_after(self(), :heartbeat, @heartbeat_interval)
{:keep_state, %{data | heartbeats_sent: heartbeats_sent + 1}}
end
end

def handle_event(
:info,
:heartbeat,
_state,
_data
) do
:keep_state_and_data
end

def handle_event(:info, {ref, :ok}, {:connected, _socket}, data = %{send_tasks: send_tasks}) do
case Map.pop(send_tasks, ref) do
{nil, _} ->
Expand Down Expand Up @@ -440,13 +521,13 @@ defmodule Archethic.P2P.Client.Connection do

# Task.async sending us the result of the handle_connect
def handle_event(:info, {_ref, {:error, _reason, nil}}, _, data) do
actions = [{{:timeout, :reconnect}, 500, nil}]
actions = [{{:timeout, :reconnect}, backoff(data.reconnect_attempts), nil}]
{:next_state, :disconnected, data, actions}
end

def handle_event(:info, {_ref, {:error, reason, from}}, _, data) do
send(from, {:error, reason})
actions = [{{:timeout, :reconnect}, 500, nil}]
actions = [{{:timeout, :reconnect}, backoff(data.reconnect_attempts), nil}]
{:next_state, :disconnected, data, actions}
end

Expand All @@ -456,7 +537,8 @@ defmodule Archethic.P2P.Client.Connection do
{:connected, _socket},
data = %{
transport: transport,
node_public_key: node_public_key
node_public_key: node_public_key,
heartbeats_received: heartbeats_received
}
) do
case transport.handle_message(event) do
Expand All @@ -467,6 +549,9 @@ defmodule Archethic.P2P.Client.Connection do

{:next_state, :disconnected, data}

{:ok, "hb"} ->
{:keep_state, %{data | heartbeats_received: heartbeats_received + 1}}

{:ok, msg} ->
set_node_connected(node_public_key)

Expand Down Expand Up @@ -540,5 +625,28 @@ defmodule Archethic.P2P.Client.Connection do
:ets.delete(@table_name, node_public_key)
end

def code_change(1, state, data, _extra) do
{:ok, state,
data
|> Map.merge(%{
reconnect_attempts: 0,
heartbeats_sent: 0,
heartbeats_received: 0
})}
end

def code_change(_old_vsn, state, data, _extra), do: {:ok, state, data}

defp backoff(attempts) do
config = Application.get_env(:archethic, __MODULE__, [])

case Keyword.get(config, :backoff_strategy, :exponential) do
:static ->
@reconnect_delay

:exponential ->
# cap at 24hours
min(:timer.hours(24), 2 ** attempts * @reconnect_delay)
end
end
end
18 changes: 18 additions & 0 deletions lib/archethic/p2p/listener_protocol.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ defmodule Archethic.P2P.ListenerProtocol do

alias Archethic.Crypto
alias Archethic.P2P
alias Archethic.P2P.Client.Connection
alias Archethic.P2P.Message
alias Archethic.P2P.MessageEnvelop
alias Archethic.TaskSupervisor
Expand All @@ -35,6 +36,19 @@ defmodule Archethic.P2P.ListenerProtocol do
})
end

def handle_info(
{_transport, socket, "hb"},
state = %{transport: transport}
) do
:inet.setopts(socket, active: :once)

Task.Supervisor.start_child(TaskSupervisor, fn ->
transport.send(socket, "hb")
end)

{:noreply, state}
end

def handle_info(
{_transport, socket, err},
state = %{transport: transport, ip: ip}
Expand All @@ -45,6 +59,7 @@ defmodule Archethic.P2P.ListenerProtocol do
end

transport.close(socket)

{:noreply, state}
end

Expand Down Expand Up @@ -84,6 +99,9 @@ defmodule Archethic.P2P.ListenerProtocol do
)

if valid_signature? do
# we may attempt to wakeup a connection that offline
Connection.wake_up(sender_pkey)

message
|> process_msg(sender_pkey)
|> encode_response(message_id, sender_pkey)
Expand Down
66 changes: 64 additions & 2 deletions test/archethic/p2p/client/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ defmodule Archethic.P2P.Client.ConnectionTest do

alias Archethic.Utils

@heartbeat_interval Keyword.get(
Application.compile_env(:archethic, Connection, []),
:heartbeat_interval,
10_000
)

test "start_link/1 should open a socket and a connection worker and initialize the backlog and lookup tables" do
{:ok, pid} =
Connection.start_link(
Expand Down Expand Up @@ -166,7 +172,7 @@ defmodule Archethic.P2P.Client.ConnectionTest do
{:ok, make_ref()}
end

def handle_send(_socket, <<0::32, _rest::bitstring>>), do: :ok
def handle_send(_socket, _), do: :ok

def handle_message({_, _, _}), do: {:error, :closed}
end
Expand Down Expand Up @@ -555,6 +561,57 @@ defmodule Archethic.P2P.Client.ConnectionTest do
end
end

describe "Stale detection" do
test "should change state to disconnected once a few heartbeats are missed" do
defmodule MockTransportStale do
alias Archethic.P2P.Client.Transport

@behaviour Transport

def handle_connect({127, 0, 0, 1}, _port) do
conn_count = :persistent_term.get(:conn_count, 0)
:persistent_term.put(:conn_count, conn_count + 1)

if conn_count == 0 do
{:ok, make_ref()}
else
{:error, :timeout}
end
end

def handle_send(_socket, "hb") do
hb_count = :persistent_term.get(:hb_count, 0)
:persistent_term.put(:hb_count, hb_count + 1)

# become stale after 5 hbs
if hb_count <= 5 do
send(self(), {:tcp, make_ref(), "hb"})
end

:ok
end

def handle_send(_socket, _), do: :ok

def handle_message({_, _, data}), do: {:ok, data}
end

{:ok, pid} =
Connection.start_link(
transport: MockTransportStale,
ip: {127, 0, 0, 1},
port: 3000,
node_public_key: Crypto.first_node_public_key()
)

Process.sleep(@heartbeat_interval * 5)
assert {{:connected, _}, _} = :sys.get_state(pid)

Process.sleep(@heartbeat_interval * 5)
assert {:disconnected, _} = :sys.get_state(pid)
end
end

defmodule MockTransport do
alias Archethic.P2P.Client.Transport

Expand All @@ -564,6 +621,11 @@ defmodule Archethic.P2P.Client.ConnectionTest do
{:ok, make_ref()}
end

def handle_send(_socket, "hb") do
send(self(), {:tcp, make_ref(), "hb"})
:ok
end

def handle_send(_socket, _data), do: :ok

def handle_message({_, _, data}), do: {:ok, data}
Expand All @@ -578,7 +640,7 @@ defmodule Archethic.P2P.Client.ConnectionTest do
{:ok, make_ref()}
end

def handle_send(_socket, <<0::32, _rest::bitstring>>), do: :ok
def handle_send(_socket, _), do: :ok

def handle_message({_, _, _}), do: {:error, :closed}
end
Expand Down
Loading