Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better p2p concurrency #391

Merged
3 commits merged into from
Jun 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 49 additions & 17 deletions lib/archethic/beacon_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ defmodule Archethic.BeaconChain do
alias Archethic.P2P.Node
alias Archethic.P2P.Message.RegisterBeaconUpdates

alias Archethic.TaskSupervisor

alias Archethic.TransactionChain
alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.Transaction.ValidationStamp
alias Archethic.TransactionChain.TransactionData

alias Archethic.Utils
Expand Down Expand Up @@ -106,37 +109,66 @@ defmodule Archethic.BeaconChain do
@spec load_transaction(Transaction.t()) :: :ok | :error
def load_transaction(
tx = %Transaction{
address: tx_address,
type: :beacon,
data: %TransactionData{content: content}
data: %TransactionData{content: content},
validation_stamp: %ValidationStamp{
timestamp: timestamp
}
}
) do
with {%Slot{subset: subset, slot_time: slot_time} = slot, _} <- Slot.deserialize(content),
:ok <- validate_slot(tx, slot),
genesis_address <-
Crypto.derive_beacon_chain_address(subset, previous_summary_time(slot_time)),
:ok <- TransactionChain.write_transaction_at(tx, genesis_address) do
Logger.debug("New beacon transaction loaded - #{inspect(slot)}",
beacon_subset: Base.encode16(subset)
)
:ok <- validate_beacon_address(subset, slot_time, tx_address),
slot_time <- SlotTimer.previous_slot(timestamp) do
Task.Supervisor.start_child(TaskSupervisor, fn ->
case validate_slot(slot) do
:ok ->
genesis_address =
Crypto.derive_beacon_chain_address(subset, previous_summary_time(slot_time))

:ok = TransactionChain.write_transaction_at(tx, genesis_address)

Logger.debug("New beacon transaction loaded - #{inspect(slot)}",
beacon_subset: Base.encode16(subset)
)

SummaryCache.add_slot(subset, slot)

SummaryCache.add_slot(subset, slot)
{:error, reason} ->
Logger.error("Invalid beacon slot - #{inspect(reason)}")
end
end)

:ok
else
{:error, _} = e ->
Logger.error("Invalid beacon slot #{inspect(e)}")
{:error, :invalid_address} ->
Logger.error("Invalid beacon slot - Invalid tx address")
:error

%DateTime{} ->
Logger.error("Invalid beacon slot - Invalid slot time")
:error

_ ->
Logger.error("Invalid beacon slot - Unexpected serialized data")
:error
end
end

def load_transaction(_), do: :ok

defp validate_slot(
%Transaction{address: address},
slot = %Slot{subset: subset, slot_time: slot_time}
) do
cond do
address != Crypto.derive_beacon_chain_address(subset, slot_time) ->
defp validate_beacon_address(subset, slot_time, address) do
case Crypto.derive_beacon_chain_address(subset, slot_time) do
^address ->
:ok

_ ->
{:error, :invalid_address}
end
end

defp validate_slot(slot = %Slot{}) do
cond do
!SlotValidation.valid_transaction_attestations?(slot) ->
{:error, :invalid_transaction_attestations}

Expand Down
5 changes: 1 addition & 4 deletions lib/archethic/p2p/listener.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ defmodule Archethic.P2P.Listener do
use GenServer

alias Archethic.P2P.ListenerProtocol
alias Archethic.P2P.ListenerProtocol.Supervisor, as: ListenerProtocolSupervisor

require Logger

Expand All @@ -25,8 +24,6 @@ defmodule Archethic.P2P.Listener do
transport
end

{:ok, listener_protocol_sup} = ListenerProtocolSupervisor.start_link()

{:ok, listener_pid} =
:ranch.start_listener(
:archethic_p2p,
Expand All @@ -38,6 +35,6 @@ defmodule Archethic.P2P.Listener do

Logger.info("P2P #{transport} Endpoint running on port #{port}")

{:ok, %{listener_pid: listener_pid, listener_protocol_sup_pid: listener_protocol_sup}}
{:ok, %{listener_pid: listener_pid}}
end
end
20 changes: 2 additions & 18 deletions lib/archethic/p2p/listener_protocol.ex
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
defmodule Archethic.P2P.ListenerProtocol do
@moduledoc false

alias __MODULE__.BroadwayPipeline
alias __MODULE__.MessageProducer
alias __MODULE__.MessageProducerRegistry

require Logger

Expand All @@ -20,34 +18,20 @@ defmodule Archethic.P2P.ListenerProtocol do

{:ok, {ip, port}} = :inet.peername(socket)

{:ok, _pid} =
BroadwayPipeline.start_link(
socket: socket,
transport: transport,
conn_pid: self(),
ip: ip,
port: port
)

Process.sleep(100)

[{producer_pid, _}] = Registry.lookup(MessageProducerRegistry, socket)

:gen_server.enter_loop(__MODULE__, [], %{
socket: socket,
transport: transport,
producer_pid: producer_pid,
ip: ip,
port: port
})
end

def handle_info(
{_transport, socket, msg},
state = %{producer_pid: producer_pid}
state = %{transport: transport}
) do
:inet.setopts(socket, active: :once)
MessageProducer.new_message(producer_pid, msg)
MessageProducer.new_message({socket, transport, msg})
{:noreply, state}
end

Expand Down
43 changes: 9 additions & 34 deletions lib/archethic/p2p/listener_protocol/broadway_pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ defmodule Archethic.P2P.ListenerProtocol.BroadwayPipeline do

alias Archethic.Crypto

alias Archethic.P2P.ListenerProtocol.BroadwayPipelineRegistry
alias Archethic.P2P.ListenerProtocol.MessageProducer
alias Archethic.P2P.MemTable
alias Archethic.P2P.Message
Expand All @@ -15,54 +14,40 @@ defmodule Archethic.P2P.ListenerProtocol.BroadwayPipeline do

use Broadway

def start_link(arg) do
socket = Keyword.get(arg, :socket)
transport = Keyword.get(arg, :transport)
ip = Keyword.get(arg, :ip)
port = Keyword.get(arg, :port)
conn_pid = Keyword.get(arg, :conn_pid)

def start_link(arg \\ []) do
Broadway.start_link(__MODULE__,
name: {:via, Registry, {BroadwayPipelineRegistry, {ip, port, conn_pid}}},
context: %{
socket: socket,
transport: transport
},
name: __MODULE__,
producer: [
module: {MessageProducer, arg},
transformer: {__MODULE__, :transform, []},
concurrency: 1
],
processors: [
default: [concurrency: 5, max_demand: 1]
default: [concurrency: System.schedulers_online() * 10, max_demand: 1]
]
)
end

def process_name(
{:via, Registry, {BroadwayPipelineRegistry, {ip, port, conn_pid}}},
base_name
) do
pid_string = conn_pid |> :erlang.pid_to_list() |> :erlang.list_to_binary()
:"#{:inet.ntoa(ip)}:#{port}.#{pid_string}.Broadway.#{base_name}"
end

def transform(event, _) do
%BroadwayMessage{
data: event,
acknowledger: {Broadway.NoopAcknowledger, _ack_ref = nil, _ack_data = nil}
}
end

def handle_message(_, message, %{socket: socket, transport: transport}) do
BroadwayMessage.update_data(message, fn data ->
def handle_message(_, message, _context) do
# start_time = System.monotonic_time(:millisecond)

BroadwayMessage.update_data(message, fn {socket, transport, data} ->
message =
data
|> decode()
|> process()
|> encode()

transport.send(socket, message)
# end_time = System.monotnonic_time(:millisecond)
# Logger.debug("Request processed in #{end_time - start_time} ms")
end)
end

Expand All @@ -73,11 +58,6 @@ defmodule Archethic.P2P.ListenerProtocol.BroadwayPipeline do
sender_public_key: sender_public_key
} = MessageEnvelop.decode(data)

# Logger.debug("Receive message #{Message.name(message)}",
# node: Base.encode16(sender_public_key),
# message_id: message_id
# )

MemTable.increase_node_availability(sender_public_key)
{System.monotonic_time(:millisecond), message_id, message, sender_public_key}
end
Expand All @@ -86,11 +66,6 @@ defmodule Archethic.P2P.ListenerProtocol.BroadwayPipeline do
response = Message.process(message)
# end_time = System.monotonic_time(:millisecond)

# Logger.debug("Message #{Message.name(message)} processed in #{end_time - start_time} ms",
# node: Base.encode16(sender_public_key),
# message_id: message_id
# )

{message_id, response, sender_public_key}
end

Expand Down
20 changes: 11 additions & 9 deletions lib/archethic/p2p/listener_protocol/message_producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,26 @@ defmodule Archethic.P2P.ListenerProtocol.MessageProducer do
@moduledoc false
use GenStage

alias Archethic.P2P.ListenerProtocol.MessageProducerRegistry
alias Archethic.P2P.ListenerProtocol.BroadwayPipeline.Broadway.Producer_0, as: Producer

def start_link(arg) do
GenStage.start_link(__MODULE__, arg)
def start_link(arg, opts \\ [name: __MODULE__]) do
GenStage.start_link(__MODULE__, arg, opts)
end

def new_message(pid, message) do
def new_message(pid \\ Producer, message) do
GenStage.cast(pid, {:new_message, message})
end

def init(arg) do
socket = Keyword.get(arg, :socket)
Registry.register(MessageProducerRegistry, socket, [])
def init(_arg) do
{:producer, %{demand: 0, queue: :queue.new()}}
end

def handle_cast({:new_message, message}, state = %{queue: queue, demand: pending_demand}) do
queue = :queue.in(message, queue)
def handle_cast(
{:new_message, {socket, transport, message}},
state = %{queue: queue, demand: pending_demand}
) do
queue = :queue.in({socket, transport, message}, queue)

dispatch_events(queue, pending_demand, [], state)
end

Expand Down
21 changes: 0 additions & 21 deletions lib/archethic/p2p/listener_protocol/supervisor.ex

This file was deleted.

2 changes: 2 additions & 0 deletions lib/archethic/p2p/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Archethic.P2P.Supervisor do
alias Archethic.P2P.Client.ConnectionRegistry
alias Archethic.P2P.Client.ConnectionSupervisor
alias Archethic.P2P.Listener
alias Archethic.P2P.ListenerProtocol.BroadwayPipeline
alias Archethic.P2P.MemTable
alias Archethic.P2P.MemTableLoader
alias Archethic.P2P.GeoPatch.GeoIP.MaxMindDB
Expand All @@ -30,6 +31,7 @@ defmodule Archethic.P2P.Supervisor do
MaxMindDB,
MemTable,
MemTableLoader,
BroadwayPipeline,
{Listener, Keyword.put(listener_conf, :port, port)},
{BootstrappingSeeds, bootstraping_seeds_conf}
]
Expand Down
6 changes: 6 additions & 0 deletions test/archethic/beacon_chain_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,11 @@ defmodule Archethic.BeaconChainTest do
authorization_date: DateTime.utc_now() |> DateTime.add(-10)
})

me = self()

MockDB
|> expect(:write_transaction_at, fn _, _ ->
send(me, :tx_written)
:ok
end)

Expand All @@ -95,6 +98,9 @@ defmodule Archethic.BeaconChainTest do
}

assert :ok = BeaconChain.load_transaction(tx)
assert_receive :tx_written

Process.sleep(500)

assert [%Slot{subset: <<0>>}] = SummaryCache.pop_slots(<<0>>)
end
Expand Down