Skip to content

Commit

Permalink
Merge pull request #157 from supabase/max-replication-lag-exceeded
Browse files Browse the repository at this point in the history
fix: drop replication slot when lag exceeds max replication lag value
  • Loading branch information
w3b6x9 authored Jun 1, 2021
2 parents 369cb3e + 28d8378 commit 7b2a3e1
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 50 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ PORT # {number} Port which you can connect your client/l
SECURE_CHANNELS # {string} (options: 'true'/'false') Enable/Disable channels authorization via JWT verification.
JWT_SECRET # {string} HS algorithm octet key (e.g. "95x0oR8jq9unl9pOIx"). Only required if SECURE_CHANNELS is set to true.
JWT_CLAIM_VALIDATORS # {string} Expected claim key/value pairs compared to JWT claims via equality checks in order to validate JWT. e.g. '{"iss": "Issuer", "nbf": 1610078130}'. This is optional but encouraged.
MAX_REPLICATION_LAG_MB # {number} If set, when the replication lag exceeds MAX_REPLICATION_LAG_MB (value must be a positive integer in megabytes), then replication slot is dropped, Realtime is restarted, and a new slot is created. Warning: setting MAX_REPLICATION_SLOT_MB could cause database changes to be lost when the replication slot is dropped.
```

**EXAMPLE: RUNNING SERVER WITH ALL OPTIONS**
Expand All @@ -216,6 +217,7 @@ docker run \
-e SECURE_CHANNELS='true' \
-e JWT_SECRET='SOMETHING_SUPER_SECRET' \
-e JWT_CLAIM_VALIDATORS='{"iss": "Issuer", "nbf": 1610078130}' \
-e MAX_REPLICATION_LAG_MB=1000 \
-p 4000:4000 \
supabase/realtime
```
Expand Down
8 changes: 7 additions & 1 deletion server/config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ publications = System.get_env("PUBLICATIONS", "[\"supabase_realtime\"]")
slot_name = System.get_env("SLOT_NAME") || :temporary
configuration_file = System.get_env("CONFIGURATION_FILE") || nil

# If the replication lag exceeds the set MAX_REPLICATION_LAG_MB (make sure the value is a positive integer in megabytes) value
# then replication slot named SLOT_NAME (e.g. "realtime") will be dropped and Realtime will
# restart with a new slot.
max_replication_lag_in_mb = String.to_integer(System.get_env("MAX_REPLICATION_LAG_MB", "0"))

# Channels are not secured by default in development and
# are secured by default in production.
secure_channels = System.get_env("SECURE_CHANNELS", "true") != "false"
Expand Down Expand Up @@ -67,7 +72,8 @@ config :realtime,
configuration_file: configuration_file,
secure_channels: secure_channels,
jwt_secret: jwt_secret,
jwt_claim_validators: jwt_claim_validators
jwt_claim_validators: jwt_claim_validators,
max_replication_lag_in_mb: max_replication_lag_in_mb

# Configures the endpoint
config :realtime, RealtimeWeb.Endpoint,
Expand Down
8 changes: 7 additions & 1 deletion server/config/releases.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ publications = System.get_env("PUBLICATIONS", "[\"supabase_realtime\"]")
slot_name = System.get_env("SLOT_NAME") || :temporary
configuration_file = System.get_env("CONFIGURATION_FILE")

# If the replication lag exceeds the set MAX_REPLICATION_LAG_MB (make sure the value is a positive integer in megabytes) value
# then replication slot named SLOT_NAME (e.g. "realtime") will be dropped and Realtime will
# restart with a new slot.
max_replication_lag_in_mb = String.to_integer(System.get_env("MAX_REPLICATION_LAG_MB", "0"))

# Channels are not secured by default in development and
# are secured by default in production.
secure_channels = System.get_env("SECURE_CHANNELS", "true") != "false"
Expand Down Expand Up @@ -60,7 +65,8 @@ config :realtime,
configuration_file: configuration_file,
secure_channels: secure_channels,
jwt_secret: jwt_secret,
jwt_claim_validators: jwt_claim_validators
jwt_claim_validators: jwt_claim_validators,
max_replication_lag_in_mb: max_replication_lag_in_mb

config :realtime, RealtimeWeb.Endpoint,
http: [:inet6, port: app_port],
Expand Down
141 changes: 94 additions & 47 deletions server/lib/adapters/postgres/epgsql_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ defmodule Realtime.Adapters.Postgres.EpgsqlServer do
epgsql_params: nil,
delays: [0],
publication_name: nil,
replication_epgsql_pid: nil,
select_epgsql_pid: nil,
epgsql_replication_pid: nil,
epgsql_select_pid: nil,
slot_config: nil,
wal_position: nil
wal_position: nil,
max_replication_lag_in_mb: 0
)
)

Expand Down Expand Up @@ -42,16 +43,18 @@ defmodule Realtime.Adapters.Postgres.EpgsqlServer do
epgsql_params: epgsql_params,
publications: publications,
slot_name: slot_name,
wal_position: {xlog, offset} = wal_position
wal_position: {xlog, offset} = wal_position,
max_replication_lag_in_mb: max_replication_lag_in_mb
)
when is_map(epgsql_params) and is_list(publications) and
(is_binary(slot_name) or is_atom(slot_name)) and is_binary(xlog) and
is_binary(offset) do
is_binary(offset) and is_number(max_replication_lag_in_mb) do
Process.flag(:trap_exit, true)

state = %State{
epgsql_params: epgsql_params,
wal_position: wal_position
wal_position: wal_position,
max_replication_lag_in_mb: max_replication_lag_in_mb
}

with publication_name when is_binary(publication_name) <-
Expand Down Expand Up @@ -82,39 +85,49 @@ defmodule Realtime.Adapters.Postgres.EpgsqlServer do
Enum.map([epgsql_replication_config, epgsql_select_config], fn epgsql_config ->
case :epgsql.connect(epgsql_config) do
{:ok, epgsql_pid} -> epgsql_pid
{:error, error} -> error
{:error, _} -> nil
end
end)

[replication_epgsql_pid, select_epgsql_pid] = epgsql_pids

updated_state = %{
state
| replication_epgsql_pid: replication_epgsql_pid,
select_epgsql_pid: select_epgsql_pid
}
[epgsql_replication_pid, epgsql_select_pid] = epgsql_pids

with true <- Enum.all?(epgsql_pids, &is_pid(&1)),
updated_state <- %{
state
| epgsql_replication_pid: epgsql_replication_pid,
epgsql_select_pid: epgsql_select_pid
},
:ok <- check_replication_lag_size(updated_state),
{:ok, updated_state} <- start_replication(updated_state) do
{:noreply, updated_state}
else
{:error, :replication_lag_exceeds_set_limit} = error ->
is_pid(epgsql_replication_pid) && :epgsql.close(epgsql_replication_pid)

is_pid(epgsql_select_pid) &&
maybe_drop_replication_slot(%{state | epgsql_select_pid: epgsql_select_pid}) &&
:epgsql.close(epgsql_select_pid)

{:stop, error, state}

error ->
:ok = Enum.each(epgsql_pids, &(is_pid(&1) && :epgsql.close(&1)))
{:stop, error, updated_state}
Enum.each(epgsql_pids, &(is_pid(&1) && :epgsql.close(&1)))

{:stop, error, state}
end
end

@impl true
def handle_call(
{:ack_lsn, {xlog, offset}},
_from,
%{replication_epgsql_pid: replication_epgsql_pid} = state
%{epgsql_replication_pid: epgsql_replication_pid} = state
)
when is_integer(xlog) and is_integer(offset) do
with <<last_processed_lsn::integer-64>> <- <<xlog::integer-32, offset::integer-32>>,
:ok <-
:epgsql.standby_status_update(
replication_epgsql_pid,
epgsql_replication_pid,
last_processed_lsn,
last_processed_lsn
) do
Expand All @@ -131,17 +144,17 @@ defmodule Realtime.Adapters.Postgres.EpgsqlServer do
def handle_info(
:start_replication,
%State{
replication_epgsql_pid: replication_epgsql_pid,
select_epgsql_pid: select_epgsql_pid
epgsql_replication_pid: epgsql_replication_pid,
epgsql_select_pid: epgsql_select_pid
} = state
) do
case start_replication(state) do
{:ok, updated_state} ->
{:noreply, updated_state}

{:error, error} ->
:ok = :epgsql.close(replication_epgsql_pid)
:ok = :epgsql.close(select_epgsql_pid)
:epgsql.close(epgsql_replication_pid)
:epgsql.close(epgsql_select_pid)
{:stop, error, state}
end
end
Expand All @@ -159,13 +172,13 @@ defmodule Realtime.Adapters.Postgres.EpgsqlServer do
where: _where_msg
]}}} = msg,
%{
replication_epgsql_pid: replication_epgsql_pid,
select_epgsql_pid: select_epgsql_pid
epgsql_replication_pid: epgsql_replication_pid,
epgsql_select_pid: epgsql_select_pid
} = state
) do
:ok = :epgsql.close(replication_epgsql_pid)
:ok = maybe_drop_replication_slot(state)
:ok = :epgsql.close(select_epgsql_pid)
:epgsql.close(epgsql_replication_pid)
maybe_drop_replication_slot(state)
:epgsql.close(epgsql_select_pid)

{:stop, msg, state}
end
Expand All @@ -191,24 +204,24 @@ defmodule Realtime.Adapters.Postgres.EpgsqlServer do
{:error, :error, "58P01", :undefined_file, error_msg,
[file: "walsender.c", line: _line, routine: "XLogRead", severity: "ERROR"]}}} = msg,
%{
replication_epgsql_pid: replication_epgsql_pid,
select_epgsql_pid: select_epgsql_pid
epgsql_replication_pid: epgsql_replication_pid,
epgsql_select_pid: epgsql_select_pid
} = state
)
when is_binary(error_msg) do
:ok = :epgsql.close(replication_epgsql_pid)
:epgsql.close(epgsql_replication_pid)

stop_msg =
case String.split(error_msg) do
["requested", "WAL", "segment", _, "has", "already", "been", "removed"] ->
:ok = maybe_drop_replication_slot(state)
maybe_drop_replication_slot(state)
{:error, {error_msg, :replication_slot_dropped}}

_ ->
msg
end

:ok = :epgsql.close(select_epgsql_pid)
:epgsql.close(epgsql_select_pid)

{:stop, stop_msg, state}
end
Expand All @@ -217,15 +230,49 @@ defmodule Realtime.Adapters.Postgres.EpgsqlServer do
def handle_info(
msg,
%{
replication_epgsql_pid: replication_epgsql_pid,
select_epgsql_pid: select_epgsql_pid
epgsql_replication_pid: epgsql_replication_pid,
epgsql_select_pid: epgsql_select_pid
} = state
) do
:ok = :epgsql.close(replication_epgsql_pid)
:ok = :epgsql.close(select_epgsql_pid)
:epgsql.close(epgsql_replication_pid)
:epgsql.close(epgsql_select_pid)
{:stop, msg, state}
end

defp check_replication_lag_size(%State{
epgsql_select_pid: epgsql_select_pid,
max_replication_lag_in_mb: max_replication_lag_in_mb,
slot_config: {expected_slot_name, _}
})
when is_pid(epgsql_select_pid) and max_replication_lag_in_mb > 0 do
case :epgsql.squery(
epgsql_select_pid,
"SELECT slot_name, pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) FROM pg_replication_slots"
) do
{:ok, _, results} ->
case Enum.find(results, fn {slot_name, _} -> slot_name == expected_slot_name end) do
nil ->
:ok

{_, lag_in_bytes} ->
if String.to_integer(lag_in_bytes) / 1_000_000 <= max_replication_lag_in_mb do
:ok
else
{:error, :replication_lag_exceeds_set_limit}
end
end

{:error, error} ->
{:error, error}
end
end

defp check_replication_lag_size(%State{max_replication_lag_in_mb: max_replication_lag_in_mb})
when max_replication_lag_in_mb == 0,
do: :ok

defp check_replication_lag_size(_), do: {:error, :check_replication_lag_size_error}

defp generate_publication_name(publications) when is_list(publications) do
with true <- Enum.all?(publications, fn pub -> is_binary(pub) end),
publication_name when publication_name != "" <-
Expand Down Expand Up @@ -263,7 +310,7 @@ defmodule Realtime.Adapters.Postgres.EpgsqlServer do
defp start_replication(
%State{
publication_name: publication_name,
replication_epgsql_pid: replication_epgsql_pid,
epgsql_replication_pid: epgsql_replication_pid,
slot_config: {slot_name, _command},
wal_position: {xlog, offset}
} = state
Expand All @@ -275,7 +322,7 @@ defmodule Realtime.Adapters.Postgres.EpgsqlServer do
Process.whereis(Replication),
:ok <-
:epgsql.start_replication(
replication_epgsql_pid,
epgsql_replication_pid,
slot_name,
replication_server_pid,
[],
Expand All @@ -288,7 +335,7 @@ defmodule Realtime.Adapters.Postgres.EpgsqlServer do
end

false ->
:ok = maybe_drop_replication_slot(state)
maybe_drop_replication_slot(state)
{delay, updated_state} = get_delay(state)
Process.send_after(__MODULE__, :start_replication, delay)
{:ok, updated_state}
Expand Down Expand Up @@ -318,7 +365,7 @@ defmodule Realtime.Adapters.Postgres.EpgsqlServer do

defp maybe_create_replication_slot(
%State{
replication_epgsql_pid: replication_epgsql_pid,
epgsql_replication_pid: epgsql_replication_pid,
slot_config: {_slot_name, create_replication_command}
} = state
) do
Expand All @@ -327,7 +374,7 @@ defmodule Realtime.Adapters.Postgres.EpgsqlServer do
:ok

false ->
case :epgsql.squery(replication_epgsql_pid, create_replication_command) do
case :epgsql.squery(epgsql_replication_pid, create_replication_command) do
{:ok, _, _} -> :ok
{:error, error} -> {:error, error}
end
Expand All @@ -338,42 +385,42 @@ defmodule Realtime.Adapters.Postgres.EpgsqlServer do
end

defp maybe_drop_replication_slot(%State{
select_epgsql_pid: select_epgsql_pid,
epgsql_select_pid: epgsql_select_pid,
slot_config: {slot_name, _command}
}) do
drop_replication_slot_command =
["SELECT pg_drop_replication_slot('", slot_name, "')"] |> IO.iodata_to_binary()

case :epgsql.squery(select_epgsql_pid, drop_replication_slot_command) do
case :epgsql.squery(epgsql_select_pid, drop_replication_slot_command) do
{:ok, _, _} -> :ok
{:error, _error} -> :ok
end
end

defp does_publication_exist(%State{
publication_name: publication_name,
select_epgsql_pid: select_epgsql_pid
epgsql_select_pid: epgsql_select_pid
}) do
publication_query =
["SELECT COUNT(*) = 1 FROM pg_publication WHERE pubname = '", publication_name, "'"]
|> IO.iodata_to_binary()

case :epgsql.squery(select_epgsql_pid, publication_query) do
case :epgsql.squery(epgsql_select_pid, publication_query) do
{:ok, _, [{"t"}]} -> true
{:ok, _, [{"f"}]} -> false
{:error, error} -> {:error, error}
end
end

defp does_replication_slot_exist(%State{
select_epgsql_pid: select_epgsql_pid,
epgsql_select_pid: epgsql_select_pid,
slot_config: {slot_name, _command}
}) do
replication_slot_query =
["SELECT COUNT(*) >= 1 FROM pg_replication_slots WHERE slot_name = '", slot_name, "'"]
|> IO.iodata_to_binary()

case :epgsql.squery(select_epgsql_pid, replication_slot_query) do
case :epgsql.squery(epgsql_select_pid, replication_slot_query) do
{:ok, _, [{"t"}]} -> true
{:ok, _, [{"f"}]} -> false
{:error, error} -> {:error, error}
Expand Down
5 changes: 4 additions & 1 deletion server/lib/realtime/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ defmodule Realtime.Application do
# `select * from pg_replication_slots`
slot_name = Application.get_env(:realtime, :slot_name)

max_replication_lag_in_mb = Application.fetch_env!(:realtime, :max_replication_lag_in_mb)

publications = Application.get_env(:realtime, :publications) |> Jason.decode!()

epgsql_params = %{
Expand Down Expand Up @@ -78,7 +80,8 @@ defmodule Realtime.Application do
epgsql_params: epgsql_params,
publications: publications,
slot_name: slot_name,
wal_position: {"0", "0"}
wal_position: {"0", "0"},
max_replication_lag_in_mb: max_replication_lag_in_mb
}
]

Expand Down

0 comments on commit 7b2a3e1

Please sign in to comment.