Skip to content

Commit

Permalink
fix: revert wal sender handling (#1305)
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecabaco authored Feb 20, 2025
1 parent 39fd634 commit 2f9e783
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 190 deletions.
115 changes: 57 additions & 58 deletions README.md

Large diffs are not rendered by default.

29 changes: 0 additions & 29 deletions lib/realtime/database.ex
Original file line number Diff line number Diff line change
Expand Up @@ -312,35 +312,6 @@ defmodule Realtime.Database do
:ok
end

@doc """
Transforms database settings into keyword list to be used by Postgrex.
## Examples
iex> Database.opts(%Database{hostname: "localhost", port: 5432, database: "realtime", username: "postgres", password: "postgres", application_name: "test", backoff_type: :stop, pool_size: 10, queue_target: 10_000, socket_options: [:inet], ssl: true}) |> Enum.sort()
[
application_name: "test",
backoff_type: :stop,
database: "realtime",
hostname: "localhost",
max_restarts: nil,
password: "postgres",
pool_size: 10,
port: 5432,
queue_target: 10000,
socket_options: [:inet],
ssl: true,
username: "postgres"
]
"""

@spec opts(__MODULE__.t()) :: keyword()
def opts(%__MODULE__{} = settings) do
settings
|> Map.from_struct()
|> Map.to_list()
|> Keyword.new()
end

defp tenant_pool_requirements(settings) do
application_names = [
"realtime_subscription_manager",
Expand Down
4 changes: 0 additions & 4 deletions lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,6 @@ defmodule Realtime.Tenants.Connect do
{:noreply, %{state | broadcast_changes_pid: broadcast_changes_pid, listen_pid: listen_pid},
{:continue, :setup_connected_user_events}}
else
{:error, :max_wal_senders_reached} ->
log_error("ReplicationMaxWalSendersReached", "Tenant database has reached the maximum number of WAL senders")
{:stop, :shutdown, state}

{:error, error} ->
log_error("StartListenAndReplicationFailed", error)
{:stop, :shutdown, state}
Expand Down
1 change: 0 additions & 1 deletion lib/realtime/tenants/replication_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ defmodule Realtime.Tenants.ReplicationConnection do
{:ok, pid} -> {:ok, pid}
{:error, {:already_started, pid}} -> {:ok, pid}
{:error, {:bad_return_from_init, {:stop, error, _}}} -> {:error, error}
{:error, %Postgrex.Error{postgres: %{pg_code: "53300"}}} -> {:error, :max_wal_senders_reached}
error -> error
end
end
Expand Down
3 changes: 1 addition & 2 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.34.27",
version: "2.34.28",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down Expand Up @@ -76,7 +76,6 @@ defmodule Realtime.MixProject do
{:mock, "~> 0.3", only: :test},
{:floki, ">= 0.30.0", only: :test},
{:mint_web_socket, "~> 1.0", only: :test},
{:postgres_replication, git: "https://github.com/filipecabaco/postgres_replication.git", only: :test},
{:benchee, "~> 1.1.0", only: [:dev, :test]},
{:excoveralls, "~> 0.18", only: [:dev, :test], runtime: false},
{:sobelow, "~> 0.13", only: [:dev, :test], runtime: false},
Expand Down
1 change: 0 additions & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
"plug": {:hex, :plug, "1.16.1", "40c74619c12f82736d2214557dedec2e9762029b2438d6d175c5074c933edc9d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a13ff6b9006b03d7e33874945b2755253841b238c34071ed85b0e86057f8cddc"},
"plug_cowboy": {:hex, :plug_cowboy, "2.7.2", "fdadb973799ae691bf9ecad99125b16625b1c6039999da5fe544d99218e662e4", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "245d8a11ee2306094840c000e8816f0cbed69a23fc0ac2bcf8d7835ae019bb2f"},
"plug_crypto": {:hex, :plug_crypto, "2.1.0", "f44309c2b06d249c27c8d3f65cfe08158ade08418cf540fd4f72d4d6863abb7b", [:mix], [], "hexpm", "131216a4b030b8f8ce0f26038bc4421ae60e4bb95c5cf5395e1421437824c4fa"},
"postgres_replication": {:git, "https://github.com/filipecabaco/postgres_replication.git", "951ff8b2d114504d0819b4cffaa075bee5bd66a5", []},
"postgrex": {:hex, :postgrex, "0.19.3", "a0bda6e3bc75ec07fca5b0a89bffd242ca209a4822a9533e7d3e84ee80707e19", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "d31c28053655b78f47f948c85bb1cf86a9c1f8ead346ba1aa0d0df017fa05b61"},
"prom_ex": {:hex, :prom_ex, "1.9.0", "63e6dda6c05cdeec1f26c48443dcc38ffd2118b3665ae8d2bd0e5b79f2aea03e", [:mix], [{:absinthe, ">= 1.6.0", [hex: :absinthe, repo: "hexpm", optional: true]}, {:broadway, ">= 1.0.2", [hex: :broadway, repo: "hexpm", optional: true]}, {:ecto, ">= 3.5.0", [hex: :ecto, repo: "hexpm", optional: true]}, {:finch, "~> 0.15", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: false]}, {:oban, ">= 2.4.0", [hex: :oban, repo: "hexpm", optional: true]}, {:octo_fetch, "~> 0.3", [hex: :octo_fetch, repo: "hexpm", optional: false]}, {:phoenix, ">= 1.5.0", [hex: :phoenix, repo: "hexpm", optional: true]}, {:phoenix_live_view, ">= 0.14.0", [hex: :phoenix_live_view, repo: "hexpm", optional: true]}, {:plug, ">= 1.12.1", [hex: :plug, repo: "hexpm", optional: true]}, {:plug_cowboy, "~> 2.5", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:telemetry, ">= 1.0.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}, {:telemetry_metrics_prometheus_core, "~> 1.0", [hex: :telemetry_metrics_prometheus_core, repo: "hexpm", optional: false]}, {:telemetry_poller, "~> 1.0", [hex: :telemetry_poller, repo: "hexpm", optional: false]}], "hexpm", "01f3d4f69ec93068219e686cc65e58a29c42bea5429a8ff4e2121f19db178ee6"},
"ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"},
Expand Down
38 changes: 1 addition & 37 deletions test/realtime/tenants/connect_test.exs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
defmodule Realtime.Tenants.ConnectTest do
# async: false due to the fact that multiple operations against the database will use the same connection
use Realtime.DataCase, async: false
import ExUnit.CaptureLog

import Mock

alias Realtime.Database
alias Realtime.Tenants.Connect
alias Realtime.Tenants.Listen
alias Realtime.Tenants.ReplicationConnection
Expand Down Expand Up @@ -272,41 +271,6 @@ defmodule Realtime.Tenants.ConnectTest do
assert Listen.whereis(tenant.external_id) == nil
end

test "handles max_wal_senders by logging the correct operational code", %{tenant: tenant} do
opts = Database.from_tenant(tenant, "realtime_test", :stop) |> Database.opts()

# This creates a loop of errors that occupies all WAL senders and lets us test the error handling
pids =
for i <- 0..4 do
replication_slot_opts =
%PostgresReplication{
connection_opts: opts,
table: :all,
output_plugin: "pgoutput",
output_plugin_options: [],
handler_module: TestHandler,
publication_name: "test_#{i}_publication",
replication_slot_name: "test_#{i}_slot"
}

{:ok, pid} = PostgresReplication.start_link(replication_slot_opts)
pid
end

on_exit(fn ->
Enum.each(pids, &Process.exit(&1, :kill))
Process.sleep(2000)
end)

log =
capture_log(fn ->
assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
Process.sleep(3000)
end)

assert log =~ "ReplicationMaxWalSendersReached"
end

test "syn with no connection", %{tenant: tenant} do
with_mock :syn, [], lookup: fn _, _ -> {nil, %{conn: nil}} end do
assert {:error, :tenant_database_unavailable} =
Expand Down
62 changes: 4 additions & 58 deletions test/realtime/tenants/replication_connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do

on_exit(fn -> Application.put_env(:realtime, :slot_name_suffix, slot) end)

%{tenant: tenant}
:ok
end

test "fails if tenant connection is invalid" do
Expand Down Expand Up @@ -103,7 +103,9 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
assert_called_exactly(BatchBroadcast.broadcast(nil, tenant, :_, :_), total_messages)
end

test "pid is associated to the same pid for a given tenant and guarantees uniqueness", %{tenant: tenant} do
test "pid is associated to the same pid for a given tenant and guarantees uniqueness" do
tenant = tenant_fixture()

assert {:ok, pid} = ReplicationConnection.start(tenant, self())
assert {:ok, ^pid} = ReplicationConnection.start(tenant, self())
end
Expand All @@ -119,62 +121,6 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
ReplicationConnection.start(tenant2, self())
end

defmodule TestHandler do
@behaviour PostgresReplication.Handler
import PostgresReplication.Protocol
alias PostgresReplication.Protocol.KeepAlive

@impl true
def call(message, _metadata) when is_write(message) do
:noreply
end

def call(message, _metadata) when is_keep_alive(message) do
reply =
case parse(message) do
%KeepAlive{reply: :now, wal_end: wal_end} ->
wal_end = wal_end + 1
standby(wal_end, wal_end, wal_end, :now)

_ ->
hold()
end

{:reply, reply}
end

def call(_, _), do: :noreply
end

test "handle standby connections exceeds max_wal_senders", %{tenant: tenant} do
opts = Database.from_tenant(tenant, "realtime_test", :stop) |> Database.opts()

# This creates a loop of errors that occupies all WAL senders and lets us test the error handling
pids =
for i <- 0..4 do
replication_slot_opts =
%PostgresReplication{
connection_opts: opts,
table: :all,
output_plugin: "pgoutput",
output_plugin_options: [],
handler_module: TestHandler,
publication_name: "test_#{i}_publication",
replication_slot_name: "test_#{i}_slot"
}

{:ok, pid} = PostgresReplication.start_link(replication_slot_opts)
pid
end

on_exit(fn ->
Enum.each(pids, &Process.exit(&1, :kill))
Process.sleep(2000)
end)

assert {:error, :max_wal_senders_reached} = ReplicationConnection.start(tenant, self())
end

describe "whereis/1" do
test "returns pid if exists" do
tenant = tenant_fixture()
Expand Down

0 comments on commit 2f9e783

Please sign in to comment.