Skip to content

Commit

Permalink
Improve P2P connection and message sending
Browse files Browse the repository at this point in the history
By using a custom FSM implementation instead of Connection based
we have more control of how the connection is managed and the message
transmitted

Switch to full asynchronous message and managing timeout differently
using gen_statem timeout and receive timeout.
  • Loading branch information
Samuel committed May 20, 2022
1 parent c2f9ded commit 731484a
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 149 deletions.
16 changes: 7 additions & 9 deletions lib/archethic/beacon_chain/subset/p2p_sampling.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,32 +27,30 @@ defmodule Archethic.BeaconChain.Subset.P2PSampling do
"""
@spec get_p2p_views(list(Node.t())) :: list(p2p_view())
def get_p2p_views(nodes) when is_list(nodes) do
Task.Supervisor.async_stream_nolink(TaskSupervisor, nodes, &do_sample_p2p_view/1,
timeout: 2_000,
timeout = 1_000

Task.Supervisor.async_stream_nolink(TaskSupervisor, nodes, &do_sample_p2p_view(&1, timeout),
on_timeout: :kill_task
)
|> Enum.map(fn
{:ok, res} ->
res

{:exit, :timeout} ->
{false, 1_000}

{:error, _} ->
{false, 0}
{false, timeout}
end)
end

defp do_sample_p2p_view(node = %Node{}) do
defp do_sample_p2p_view(node = %Node{}, timeout) do
start_time = System.monotonic_time(:millisecond)

case P2P.send_message(node, %Ping{}, 1_000) do
case P2P.send_message(node, %Ping{}, timeout) do
{:ok, %Ok{}} ->
end_time = System.monotonic_time(:millisecond)
{true, end_time - start_time}

{:error, _} ->
{false, 1_000}
{false, timeout}
end
end
end
20 changes: 12 additions & 8 deletions lib/archethic/p2p.ex
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,11 @@ defmodule Archethic.P2P do
|> send_message!(message, timeout)
end

def send_message!(node = %Node{ip: ip, port: port}, message, timeout) do
def send_message!(
node = %Node{ip: ip, port: port},
message,
timeout
) do
case Client.send_message(node, message, timeout) do
{:ok, ref} ->
ref
Expand All @@ -225,15 +229,15 @@ defmodule Archethic.P2P do
| {:error, :not_found}
| {:error, :timeout}
| {:error, :closed}
def send_message(node, message, timeout \\ 5_000)
def send_message(node, message, timeout \\ 3_000)

def send_message(public_key, message, timeout) when is_binary(public_key) do
with {:ok, node} <- get_node_info(public_key),
{:ok, data} <- send_message(node, message, timeout) do
{:ok, data}
else
{:error, _} = e ->
e
case get_node_info(public_key) do
{:ok, node} ->
send_message(node, message, timeout)

{:error, :not_found} ->
{:error, :not_found}
end
end

Expand Down
Loading

0 comments on commit 731484a

Please sign in to comment.