From 2bff2ef4ce6fcea9f113fa36011021ee42a574dc Mon Sep 17 00:00:00 2001
From: Wen Bo Xie
Date: Fri, 21 May 2021 00:06:12 -0400
Subject: [PATCH] fix: drop replication slot when db deletes wal segment

---
 server/lib/adapters/postgres/epgsql_server.ex | 43 +++++++++++++++++++
 1 file changed, 43 insertions(+)

diff --git a/server/lib/adapters/postgres/epgsql_server.ex b/server/lib/adapters/postgres/epgsql_server.ex
index 116376d32..2818e7a14 100644
--- a/server/lib/adapters/postgres/epgsql_server.ex
+++ b/server/lib/adapters/postgres/epgsql_server.ex
@@ -170,6 +170,49 @@ defmodule Realtime.Adapters.Postgres.EpgsqlServer do
     {:stop, msg, state}
   end
 
+  @doc """
+  Stops the server when the epgsql replication process exits with Postgres error `58P01`
+  (`:undefined_file`). If the database reports that the requested WAL segment was already
+  removed (Realtime fell too far behind), `maybe_drop_replication_slot/1` is called first.
+
+  ## Example process exit message
+
+      {:EXIT, #PID<0.2324.0>,
+       {:error,
+        {:error, :error, "58P01", :undefined_file,
+         "requested WAL segment 00000001000000000000007F has already been removed",
+         [file: "walsender.c", line: "2447", routine: "XLogRead", severity: "ERROR"]}}}
+
+  """
+  @impl true
+  def handle_info(
+        {:EXIT, _pid,
+         {:error,
+          {:error, :error, "58P01", :undefined_file, error_msg,
+           [file: "walsender.c", line: _line, routine: "XLogRead", severity: "ERROR"]}}} = msg,
+        %{
+          replication_epgsql_pid: replication_epgsql_pid,
+          select_epgsql_pid: select_epgsql_pid
+        } = state
+      )
+      when is_binary(error_msg) do
+    :ok = :epgsql.close(replication_epgsql_pid)
+
+    stop_msg =
+      case String.split(error_msg) do
+        ["requested", "WAL", "segment", _, "has", "already", "been", "removed"] ->
+          :ok = maybe_drop_replication_slot(state)
+          {:error, {error_msg, :replication_slot_dropped}}
+
+        _ ->
+          msg
+      end
+
+    :ok = :epgsql.close(select_epgsql_pid)
+
+    {:stop, stop_msg, state}
+  end
+
   @impl true
   def handle_info(
         msg,