From 765a07beedfa63c8b5a4a71d55b9ab6d3bbef6fb Mon Sep 17 00:00:00 2001 From: Samuel Date: Mon, 20 Jun 2022 23:06:04 +0200 Subject: [PATCH 1/3] Add Broadway pipeline globally for system wide backpressure --- lib/archethic/p2p/listener.ex | 5 +-- lib/archethic/p2p/listener_protocol.ex | 20 +-------- .../listener_protocol/broadway_pipeline.ex | 43 ++++--------------- .../p2p/listener_protocol/message_producer.ex | 20 +++++---- .../p2p/listener_protocol/supervisor.ex | 21 --------- lib/archethic/p2p/supervisor.ex | 2 + 6 files changed, 25 insertions(+), 86 deletions(-) delete mode 100644 lib/archethic/p2p/listener_protocol/supervisor.ex diff --git a/lib/archethic/p2p/listener.ex b/lib/archethic/p2p/listener.ex index ece981a54..49854357a 100644 --- a/lib/archethic/p2p/listener.ex +++ b/lib/archethic/p2p/listener.ex @@ -4,7 +4,6 @@ defmodule Archethic.P2P.Listener do use GenServer alias Archethic.P2P.ListenerProtocol - alias Archethic.P2P.ListenerProtocol.Supervisor, as: ListenerProtocolSupervisor require Logger @@ -25,8 +24,6 @@ defmodule Archethic.P2P.Listener do transport end - {:ok, listener_protocol_sup} = ListenerProtocolSupervisor.start_link() - {:ok, listener_pid} = :ranch.start_listener( :archethic_p2p, @@ -38,6 +35,6 @@ defmodule Archethic.P2P.Listener do Logger.info("P2P #{transport} Endpoint running on port #{port}") - {:ok, %{listener_pid: listener_pid, listener_protocol_sup_pid: listener_protocol_sup}} + {:ok, %{listener_pid: listener_pid}} end end diff --git a/lib/archethic/p2p/listener_protocol.ex b/lib/archethic/p2p/listener_protocol.ex index 4b9282061..de55278f2 100644 --- a/lib/archethic/p2p/listener_protocol.ex +++ b/lib/archethic/p2p/listener_protocol.ex @@ -1,9 +1,7 @@ defmodule Archethic.P2P.ListenerProtocol do @moduledoc false - alias __MODULE__.BroadwayPipeline alias __MODULE__.MessageProducer - alias __MODULE__.MessageProducerRegistry require Logger @@ -20,23 +18,9 @@ defmodule Archethic.P2P.ListenerProtocol do {:ok, {ip, port}} = :inet.peername(socket) - {:ok, _pid} = - BroadwayPipeline.start_link( - socket: socket, - transport: transport, - conn_pid: self(), - ip: ip, - port: port - ) - - Process.sleep(100) - - [{producer_pid, _}] = Registry.lookup(MessageProducerRegistry, socket) - :gen_server.enter_loop(__MODULE__, [], %{ socket: socket, transport: transport, - producer_pid: producer_pid, ip: ip, port: port }) @@ -44,10 +28,10 @@ defmodule Archethic.P2P.ListenerProtocol do def handle_info( {_transport, socket, msg}, - state = %{producer_pid: producer_pid} + state = %{transport: transport} ) do :inet.setopts(socket, active: :once) - MessageProducer.new_message(producer_pid, msg) + MessageProducer.new_message({socket, transport, msg}) {:noreply, state} end diff --git a/lib/archethic/p2p/listener_protocol/broadway_pipeline.ex b/lib/archethic/p2p/listener_protocol/broadway_pipeline.ex index d1f33ce6d..883d2f1f2 100644 --- a/lib/archethic/p2p/listener_protocol/broadway_pipeline.ex +++ b/lib/archethic/p2p/listener_protocol/broadway_pipeline.ex @@ -3,7 +3,6 @@ defmodule Archethic.P2P.ListenerProtocol.BroadwayPipeline do alias Archethic.Crypto - alias Archethic.P2P.ListenerProtocol.BroadwayPipelineRegistry alias Archethic.P2P.ListenerProtocol.MessageProducer alias Archethic.P2P.MemTable alias Archethic.P2P.Message @@ -15,38 +14,20 @@ defmodule Archethic.P2P.ListenerProtocol.BroadwayPipeline do use Broadway - def start_link(arg) do - socket = Keyword.get(arg, :socket) - transport = Keyword.get(arg, :transport) - ip = Keyword.get(arg, :ip) - port = Keyword.get(arg, :port) - conn_pid = Keyword.get(arg, :conn_pid) - + def start_link(arg \\ []) do Broadway.start_link(__MODULE__, - name: {:via, Registry, {BroadwayPipelineRegistry, {ip, port, conn_pid}}}, - context: %{ - socket: socket, - transport: transport - }, + name: __MODULE__, producer: [ module: {MessageProducer, arg}, transformer: {__MODULE__, :transform, []}, concurrency: 1 ], processors: [ - default: [concurrency: 5, max_demand: 1] + default: [concurrency: System.schedulers_online() * 10, max_demand: 1] ] ) end - def process_name( - {:via, Registry, {BroadwayPipelineRegistry, {ip, port, conn_pid}}}, - base_name - ) do - pid_string = conn_pid |> :erlang.pid_to_list() |> :erlang.list_to_binary() - :"#{:inet.ntoa(ip)}:#{port}.#{pid_string}.Broadway.#{base_name}" - end - def transform(event, _) do %BroadwayMessage{ data: event, @@ -54,8 +35,10 @@ defmodule Archethic.P2P.ListenerProtocol.BroadwayPipeline do } end - def handle_message(_, message, %{socket: socket, transport: transport}) do - BroadwayMessage.update_data(message, fn data -> + def handle_message(_, message, _context) do + # start_time = System.monotonic_time(:millisecond) + + BroadwayMessage.update_data(message, fn {socket, transport, data} -> message = data |> decode() @@ -63,6 +46,8 @@ defmodule Archethic.P2P.ListenerProtocol.BroadwayPipeline do |> encode() transport.send(socket, message) + # end_time = System.monotnonic_time(:millisecond) + # Logger.debug("Request processed in #{end_time - start_time} ms") end) end @@ -73,11 +58,6 @@ defmodule Archethic.P2P.ListenerProtocol.BroadwayPipeline do sender_public_key: sender_public_key } = MessageEnvelop.decode(data) - # Logger.debug("Receive message #{Message.name(message)}", - # node: Base.encode16(sender_public_key), - # message_id: message_id - # ) - MemTable.increase_node_availability(sender_public_key) {System.monotonic_time(:millisecond), message_id, message, sender_public_key} end @@ -86,11 +66,6 @@ defmodule Archethic.P2P.ListenerProtocol.BroadwayPipeline do response = Message.process(message) # end_time = System.monotonic_time(:millisecond) - # Logger.debug("Message #{Message.name(message)} processed in #{end_time - start_time} ms", - # node: Base.encode16(sender_public_key), - # message_id: message_id - # ) - {message_id, response, sender_public_key} end diff --git a/lib/archethic/p2p/listener_protocol/message_producer.ex b/lib/archethic/p2p/listener_protocol/message_producer.ex index 77e9c5496..b55eb1278 100644 --- a/lib/archethic/p2p/listener_protocol/message_producer.ex +++ b/lib/archethic/p2p/listener_protocol/message_producer.ex @@ -2,24 +2,26 @@ defmodule Archethic.P2P.ListenerProtocol.MessageProducer do @moduledoc false use GenStage - alias Archethic.P2P.ListenerProtocol.MessageProducerRegistry + alias Archethic.P2P.ListenerProtocol.BroadwayPipeline.Broadway.Producer_0, as: Producer - def start_link(arg) do - GenStage.start_link(__MODULE__, arg) + def start_link(arg, opts \\ [name: __MODULE__]) do + GenStage.start_link(__MODULE__, arg, opts) end - def new_message(pid, message) do + def new_message(pid \\ Producer, message) do GenStage.cast(pid, {:new_message, message}) end - def init(arg) do - socket = Keyword.get(arg, :socket) - Registry.register(MessageProducerRegistry, socket, []) + def init(_arg) do {:producer, %{demand: 0, queue: :queue.new()}} end - def handle_cast({:new_message, message}, state = %{queue: queue, demand: pending_demand}) do - queue = :queue.in(message, queue) + def handle_cast( + {:new_message, {socket, transport, message}}, + state = %{queue: queue, demand: pending_demand} + ) do + queue = :queue.in({socket, transport, message}, queue) + dispatch_events(queue, pending_demand, [], state) end diff --git a/lib/archethic/p2p/listener_protocol/supervisor.ex b/lib/archethic/p2p/listener_protocol/supervisor.ex deleted file mode 100644 index ef309c674..000000000 --- a/lib/archethic/p2p/listener_protocol/supervisor.ex +++ /dev/null @@ -1,21 +0,0 @@ -defmodule Archethic.P2P.ListenerProtocol.Supervisor do - @moduledoc false - - alias Archethic.P2P.ListenerProtocol.BroadwayPipelineRegistry - alias Archethic.P2P.ListenerProtocol.MessageProducerRegistry - - use Supervisor - - def start_link(arg \\ []) do - Supervisor.start_link(__MODULE__, arg, name: __MODULE__) - end - - def init(_) do - children = [ - {Registry, name: BroadwayPipelineRegistry, keys: :unique}, - {Registry, name: MessageProducerRegistry, keys: :unique} - ] - - Supervisor.init(children, strategy: :one_for_all) - end -end diff --git a/lib/archethic/p2p/supervisor.ex b/lib/archethic/p2p/supervisor.ex index fcbe8c771..82ecb9695 100644 --- a/lib/archethic/p2p/supervisor.ex +++ b/lib/archethic/p2p/supervisor.ex @@ -5,6 +5,7 @@ defmodule Archethic.P2P.Supervisor do alias Archethic.P2P.Client.ConnectionRegistry alias Archethic.P2P.Client.ConnectionSupervisor alias Archethic.P2P.Listener + alias Archethic.P2P.ListenerProtocol.BroadwayPipeline alias Archethic.P2P.MemTable alias Archethic.P2P.MemTableLoader alias Archethic.P2P.GeoPatch.GeoIP.MaxMindDB @@ -30,6 +31,7 @@ defmodule Archethic.P2P.Supervisor do MaxMindDB, MemTable, MemTableLoader, + BroadwayPipeline, {Listener, Keyword.put(listener_conf, :port, port)}, {BootstrappingSeeds, bootstraping_seeds_conf} ] From a0f466bd7b4df1e5b3de7177833aa0073fdd18c5 Mon Sep 17 00:00:00 2001 From: Samuel Date: Mon, 20 Jun 2022 23:06:46 +0200 Subject: [PATCH 2/3] Async loading of beacon transaction --- lib/archethic/beacon_chain.ex | 66 ++++++++++++++++++++++++++--------- 1 file changed, 49 insertions(+), 17 deletions(-) diff --git a/lib/archethic/beacon_chain.ex b/lib/archethic/beacon_chain.ex index 4a82f4c95..e9293edf6 100644 --- a/lib/archethic/beacon_chain.ex +++ b/lib/archethic/beacon_chain.ex @@ -23,8 +23,11 @@ defmodule Archethic.BeaconChain do alias Archethic.P2P.Node alias Archethic.P2P.Message.RegisterBeaconUpdates + alias Archethic.TaskSupervisor + alias Archethic.TransactionChain alias Archethic.TransactionChain.Transaction + alias Archethic.TransactionChain.Transaction.ValidationStamp alias Archethic.TransactionChain.TransactionData alias Archethic.Utils @@ -106,37 +109,66 @@ defmodule Archethic.BeaconChain do @spec load_transaction(Transaction.t()) :: :ok | :error def load_transaction( tx = %Transaction{ + address: tx_address, type: :beacon, - data: %TransactionData{content: content} + data: %TransactionData{content: content}, + validation_stamp: %ValidationStamp{ + timestamp: timestamp + } } ) do with {%Slot{subset: subset, slot_time: slot_time} = slot, _} <- Slot.deserialize(content), - :ok <- validate_slot(tx, slot), - genesis_address <- - Crypto.derive_beacon_chain_address(subset, previous_summary_time(slot_time)), - :ok <- TransactionChain.write_transaction_at(tx, genesis_address) do - Logger.debug("New beacon transaction loaded - #{inspect(slot)}", - beacon_subset: Base.encode16(subset) - ) + :ok <- validate_beacon_address(subset, slot_time, tx_address), + slot_time <- SlotTimer.previous_slot(timestamp) do + Task.Supervisor.start_child(TaskSupervisor, fn -> + case validate_slot(slot) do + :ok -> + genesis_address = + Crypto.derive_beacon_chain_address(subset, previous_summary_time(slot_time)) + + :ok = TransactionChain.write_transaction_at(tx, genesis_address) + + Logger.debug("New beacon transaction loaded - #{inspect(slot)}", + beacon_subset: Base.encode16(subset) + ) + + SummaryCache.add_slot(subset, slot) - SummaryCache.add_slot(subset, slot) + {:error, reason} -> + Logger.error("Invalid beacon slot - #{inspect(reason)}") + end + end) + + :ok else - {:error, _} = e -> - Logger.error("Invalid beacon slot #{inspect(e)}") + {:error, :invalid_address} -> + Logger.error("Invalid beacon slot - Invalid tx address") + :error + + %DateTime{} -> + Logger.error("Invalid beacon slot - Invalid slot time") + :error + + _ -> + Logger.error("Invalid beacon slot - Unexpected serialized data") :error end end def load_transaction(_), do: :ok - defp validate_slot( - %Transaction{address: address}, - slot = %Slot{subset: subset, slot_time: slot_time} - ) do - cond do - address != Crypto.derive_beacon_chain_address(subset, slot_time) -> + defp validate_beacon_address(subset, slot_time, address) do + case Crypto.derive_beacon_chain_address(subset, slot_time) do + ^address -> + :ok + + _ -> {:error, :invalid_address} + end + end + defp validate_slot(slot = %Slot{}) do + cond do !SlotValidation.valid_transaction_attestations?(slot) -> {:error, :invalid_transaction_attestations} From b8ee9354a933f099dcb1b0e30b5ec9e2ee1771a7 Mon Sep 17 00:00:00 2001 From: Samuel Date: Thu, 23 Jun 2022 16:44:59 +0200 Subject: [PATCH 3/3] Fix test --- test/archethic/beacon_chain_test.exs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/test/archethic/beacon_chain_test.exs b/test/archethic/beacon_chain_test.exs index f5084ecbe..0e8c4f98d 100644 --- a/test/archethic/beacon_chain_test.exs +++ b/test/archethic/beacon_chain_test.exs @@ -75,8 +75,11 @@ defmodule Archethic.BeaconChainTest do authorization_date: DateTime.utc_now() |> DateTime.add(-10) }) + me = self() + MockDB |> expect(:write_transaction_at, fn _, _ -> + send(me, :tx_written) :ok end) @@ -95,6 +98,9 @@ defmodule Archethic.BeaconChainTest do } assert :ok = BeaconChain.load_transaction(tx) + assert_receive :tx_written + + Process.sleep(500) assert [%Slot{subset: <<0>>}] = SummaryCache.pop_slots(<<0>>) end