Skip to content

Commit

Permalink
fix: works when permanent slot already exists
Browse files Browse the repository at this point in the history
  • Loading branch information
w3b6x9 committed Nov 22, 2021
1 parent ff6c823 commit 268518c
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 56 deletions.
6 changes: 5 additions & 1 deletion server/lib/realtime/message_dispatcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ defmodule Realtime.MessageDispatcher do
@doc """
Hook invoked by Phoenix.PubSub dispatch.
"""
@spec dispatch([{pid, nil | {:fastlane | :user_fastlane, pid, atom, binary}}], pid, Phoenix.Socket.Broadcast.t()) :: :ok
@spec dispatch(
[{pid, nil | {:fastlane | :user_fastlane, pid, atom, binary}}],
pid,
Phoenix.Socket.Broadcast.t()
) :: :ok
def dispatch(subscribers, _from, %Broadcast{payload: payload} = msg) do
{is_rls_enabled, new_payload} = Map.pop(payload, :is_rls_enabled)
{users, new_payload} = Map.pop(new_payload, :users)
Expand Down
86 changes: 43 additions & 43 deletions server/lib/realtime/rls/replication_poller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -124,20 +124,20 @@ defmodule Realtime.RLS.ReplicationPoller do
end

def generate_record([
{"wal",
%{
"type" => "INSERT" = type,
"columns" => columns,
"commit_timestamp" => commit_timestamp,
"schema" => schema,
"table" => table,
"record" => record
}},
{"is_rls_enabled", is_rls_enabled},
{"users", users},
{"errors", _errors}
])
when is_boolean(is_rls_enabled) and is_list(users) do
{"wal",
%{
"type" => "INSERT" = type,
"columns" => columns,
"commit_timestamp" => commit_timestamp,
"schema" => schema,
"table" => table,
"record" => record
}},
{"is_rls_enabled", is_rls_enabled},
{"users", users},
{"errors", _errors}
])
when is_boolean(is_rls_enabled) and is_list(users) do
%NewRecord{
columns: columns,
commit_timestamp: commit_timestamp,
Expand All @@ -151,21 +151,21 @@ defmodule Realtime.RLS.ReplicationPoller do
end

def generate_record([
{"wal",
%{
"type" => "UPDATE" = type,
"columns" => columns,
"commit_timestamp" => commit_timestamp,
"schema" => schema,
"table" => table,
"record" => record,
"old_record" => old_record
}},
{"is_rls_enabled", is_rls_enabled},
{"users", users},
{"errors", _errors}
])
when is_boolean(is_rls_enabled) and is_list(users) do
{"wal",
%{
"type" => "UPDATE" = type,
"columns" => columns,
"commit_timestamp" => commit_timestamp,
"schema" => schema,
"table" => table,
"record" => record,
"old_record" => old_record
}},
{"is_rls_enabled", is_rls_enabled},
{"users", users},
{"errors", _errors}
])
when is_boolean(is_rls_enabled) and is_list(users) do
%UpdatedRecord{
columns: columns,
commit_timestamp: commit_timestamp,
Expand All @@ -180,20 +180,20 @@ defmodule Realtime.RLS.ReplicationPoller do
end

def generate_record([
{"wal",
%{
"type" => "DELETE" = type,
"columns" => columns,
"commit_timestamp" => commit_timestamp,
"schema" => schema,
"table" => table,
"old_record" => old_record
}},
{"is_rls_enabled", is_rls_enabled},
{"users", users},
{"errors", _errors}
])
when is_boolean(is_rls_enabled) and is_list(users) do
{"wal",
%{
"type" => "DELETE" = type,
"columns" => columns,
"commit_timestamp" => commit_timestamp,
"schema" => schema,
"table" => table,
"old_record" => old_record
}},
{"is_rls_enabled", is_rls_enabled},
{"users", users},
{"errors", _errors}
])
when is_boolean(is_rls_enabled) and is_list(users) do
%DeletedRecord{
columns: columns,
commit_timestamp: commit_timestamp,
Expand Down
21 changes: 11 additions & 10 deletions server/lib/realtime/rls/replications.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,23 @@ defmodule Realtime.RLS.Replications do
Multi.new()
|> Multi.run(:create_slot, fn _, _ ->
query(
"select 1 from pg_create_logical_replication_slot($1, 'wal2json', $2);",
"select
case when not exists (
select 1
from pg_replication_slots
where slot_name = $1
)
then (
select 1 from pg_create_logical_replication_slot($1, 'wal2json', $2)
)
else 1
end;",
[slot_name, temporary_slot]
)
|> case do
{:ok, %Postgrex.Result{rows: [[1]]}} ->
{:ok, slot_name}

{:error,
%Postgrex.Error{
postgres: %{
code: :duplicate_object,
routine: "ReplicationSlotCreate"
}
}} ->
{:ok, slot_name}

{_, error} ->
{:error, error}
end
Expand Down
1 change: 0 additions & 1 deletion server/lib/realtime/subscribers_notification.ex
Original file line number Diff line number Diff line change
Expand Up @@ -135,5 +135,4 @@ defmodule Realtime.SubscribersNotification do
end

defp is_valid_notification_key(_v), do: false

end
2 changes: 1 addition & 1 deletion server/test/realtime/rls_replications_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ defmodule Realtime.RlsReplicationsTest do

test "prepare_replication/2, try to create a slot with an existing name" do
assert match?(
{:error, %Postgrex.Error{}},
{:ok, %{create_slot: @slot_name, search_path: :set}},
prepare_replication(@slot_name, false)
)
end
Expand Down

0 comments on commit 268518c

Please sign in to comment.